onEventWithHistory method
Returns the stream for a given stream id which includes historical events.
If `stream` does not have event history collected, a parameter error is sent over the returned `Stream`.
Implementation
/// Returns the stream for [stream], preceded by that stream's historical
/// events.
///
/// Nothing is requested until the returned stream is listened to. At that
/// point the live stream from [onEvent] is subscribed to and the history is
/// fetched with [getStreamHistory]. Historical events are emitted first,
/// followed by the live events whose timestamp is later than that of the last
/// historical event.
///
/// If retrieving the history fails (for example because [stream] does not
/// have event history collected), the error is sent over the returned stream
/// and live events are still delivered afterwards.
Stream<Event> onEventWithHistory(String stream) {
  late StreamController<Event> controller;
  late StreamSubscription<Event> subscription;
  controller = StreamController<Event>(onListen: () async {
    var isFirstEvent = true;
    // Newest event replayed from history. Stays null when the history is
    // empty or could not be retrieved, in which case no live event is
    // filtered out.
    Event? lastHistoricalEvent;
    final historyRetrievedCompleter = Completer<void>();

    // Subscribe before requesting the history so that no live event emitted
    // while the request is in flight is missed.
    subscription = onEvent(stream).listen((event) async {
      // Every live event that arrives before the first one has been released
      // waits on the same future. The flag is only cleared after the wait so
      // that an event arriving right after completion still queues up behind
      // the ones already waiting instead of overtaking them.
      if (isFirstEvent) {
        await historyRetrievedCompleter.future;
        isFirstEvent = false;
      }
      // Copy to a local so the null check promotes the captured variable.
      final lastHistorical = lastHistoricalEvent;
      // In practice, it should be impossible for two distinct events to have
      // the same timestamp, so anything not strictly newer than the last
      // historical event was already delivered as part of the history.
      if (lastHistorical == null ||
          event.timestamp! > lastHistorical.timestamp!) {
        controller.sink.add(event);
      }
    }, onDone: () async {
      // Do not close while history is still being replayed: closing early
      // would make the pending adds throw and drop the waiting live events.
      await historyRetrievedCompleter.future;
      await controller.close();
    }, onError: (error, stackTrace) {
      controller.addError(error, stackTrace);
    });

    try {
      final history = (await getStreamHistory(stream)).history;
      if (history.isNotEmpty) {
        lastHistoricalEvent = history.last;
      }
      for (final historicalEvent in history) {
        controller.sink.add(historicalEvent);
      }
    } catch (error, stackTrace) {
      // Report the failure to the listener instead of letting it escape as
      // an unhandled asynchronous error.
      controller.addError(error, stackTrace);
    } finally {
      // Always release the waiting live events (and a pending close), even
      // when the history could not be retrieved.
      historyRetrievedCompleter.complete();
    }
  }, onCancel: () async {
    try {
      // Awaited so that a failure reported through the returned future is
      // actually seen by the catch clause below.
      await subscription.cancel();
    } on StateError {
      // Underlying stream may have already been cancelled.
    }
  });
  return controller.stream;
}