onEventWithHistory method

Stream<Event> onEventWithHistory(String stream)

Returns the stream for a given stream id which includes historical events.

If stream does not have event history collected, a parameter error is sent over the returned Stream.

Implementation

/// Returns the events of [stream], preceded by that stream's recorded history.
///
/// When the returned stream is listened to, this subscribes to the live
/// events from [onEvent] first and only then requests the history through
/// [getStreamHistory], so events published while the request is in flight are
/// not lost. Live events are held back until the history has been forwarded.
/// A live event whose timestamp is not newer than that of the last historical
/// event is dropped, because the history already contained it.
///
/// If the history cannot be retrieved, the failure is added to the returned
/// stream as an error and the live events are then forwarded unfiltered.
/// Errors from the live stream are forwarded as well, and the returned stream
/// closes once the live stream is done.
Stream<Event> onEventWithHistory(String stream) {
  late StreamController<Event> controller;
  late StreamSubscription<Event> subscription;

  controller = StreamController<Event>(onListen: () async {
    var isFirstEvent = true;
    // Timestamp of the newest historical event; stays null when there is no
    // history (or no timestamp) to de-duplicate live events against.
    int? lastHistoricalTimestamp;
    final historyRetrievedCompleter = Completer<void>();

    // Subscribe before requesting the history so nothing published in the
    // meantime is missed.
    subscription = onEvent(stream).listen((event) async {
      if (isFirstEvent) {
        // Hold live events back until the history has been forwarded.
        await historyRetrievedCompleter.future;
        isFirstEvent = false;
      }
      final cutoff = lastHistoricalTimestamp;
      final timestamp = event.timestamp;
      // In practice, it should be impossible for two distinct events to have
      // the same timestamp. Events that cannot be compared are forwarded
      // rather than dropped.
      if (cutoff == null || timestamp == null || timestamp > cutoff) {
        controller.add(event);
      }
    }, onDone: () async {
      // Let the history and any held-back events through before closing;
      // closing earlier would make those later adds throw.
      await historyRetrievedCompleter.future;
      await controller.close();
    }, onError: (Object error, StackTrace stackTrace) {
      controller.addError(error, stackTrace);
    });

    try {
      final history = (await getStreamHistory(stream)).history;
      if (history.isNotEmpty) {
        lastHistoricalTimestamp = history.last.timestamp;
      }
      for (final historicalEvent in history) {
        controller.add(historicalEvent);
      }
    } on Object catch (error, stackTrace) {
      // Report the failure to the listener instead of leaving it as an
      // unhandled asynchronous error.
      controller.addError(error, stackTrace);
    } finally {
      // Always release the held-back live events, even when the history
      // request failed; otherwise they would wait forever.
      historyRetrievedCompleter.complete();
    }
  }, onCancel: () async {
    try {
      await subscription.cancel();
    } on StateError {
      // Underlying stream may have already been cancelled.
    }
  });

  return controller.stream;
}