onEventWithHistory method
Returns the stream for a given stream id which includes historical events.
If `stream` does not have event history collected, a parameter error is
sent over the returned Stream.
Implementation
/// Returns the stream for a given stream id which includes historical events.
///
/// On listen, the history obtained from [getStreamHistory] is emitted first,
/// followed by the live events from [onEvent]. History events whose timestamp
/// is later than the first live event observed while the history was being
/// fetched are skipped, since they are expected to arrive on the live stream.
///
/// If fetching the history for [stream] fails (for example because no event
/// history is collected for it), the error is sent over the returned [Stream],
/// which is then closed.
Stream<Event> onEventWithHistory(String stream) {
  late StreamController<Event> controller;
  late StreamQueue<Event> streamEvents;

  // Set once the listener cancels, so the asynchronous `onListen` body stops
  // touching the (already cancelled) queue when it resumes after its `await`.
  var cancelled = false;

  // Best-effort cancellation of the live event queue.
  void cancelLiveEvents() {
    try {
      // `immediate` so that a still-pending `peek` does not keep the source
      // subscription alive until the next event shows up.
      streamEvents.cancel(immediate: true);
    } on StateError {
      // Underlying stream may have already been cancelled.
    }
  }

  controller = StreamController<Event>(onListen: () async {
    streamEvents = StreamQueue<Event>(onEvent(stream));

    // Start watching for the first live event *before* awaiting the history.
    // The loop below is synchronous, so a callback registered after the
    // `await` could never have run by the time the loop inspects the result.
    Event? firstStreamEvent;
    unawaited(streamEvents.peek.then<void>((e) {
      firstStreamEvent = e;
    }, onError: (Object _) {
      // The live stream ended, failed, or was cancelled before producing an
      // event, so there is nothing to compare the history against. `peek`
      // does not consume, so a source error is still forwarded via `rest`.
    }));

    try {
      final history = (await getStreamHistory(stream)).history;
      if (cancelled) return;

      final firstLiveTimestamp = firstStreamEvent?.timestamp;
      for (final event in history) {
        final timestamp = event.timestamp;
        if (firstLiveTimestamp != null &&
            timestamp != null &&
            timestamp > firstLiveTimestamp) {
          // Everything from here on is expected on the live stream.
          break;
        }
        controller.sink.add(event);
      }
      unawaited(controller.sink.addStream(streamEvents.rest));
    } catch (error, stackTrace) {
      if (cancelled) return;
      // Surface the failure to the listener instead of leaving it as an
      // unhandled asynchronous error, then end the stream so the listener
      // is not left waiting for events that will never be forwarded.
      controller.addError(error, stackTrace);
      cancelLiveEvents();
      unawaited(controller.close());
    }
  }, onCancel: () {
    cancelled = true;
    cancelLiveEvents();
  });

  return controller.stream;
}