onEventWithHistory method

Stream<Event> onEventWithHistory(String stream)

Returns the stream for a given stream id which includes historical events.

If `stream` does not have event history collected, a parameter error is sent over the returned `Stream`.

Implementation

/// Returns the stream for the given [stream] id, with historical events
/// emitted ahead of live ones.
///
/// When the returned stream is first listened to, the history reported by
/// [getStreamHistory] is emitted, followed by the live events from [onEvent].
/// History entries stamped later than the first live event are skipped,
/// because the live subscription already covers that period.
///
/// If [getStreamHistory] fails (for example because [stream] has no event
/// history collected), the error is sent over the returned stream, which is
/// then closed.
Stream<Event> onEventWithHistory(String stream) {
  late StreamController<Event> controller;
  late StreamQueue<Event> streamEvents;
  var cancelled = false;

  // Releases the live-event queue. `immediate: true` also completes a still
  // pending `peek` instead of waiting for the next live event to arrive.
  void cancelLiveEvents() {
    try {
      streamEvents.cancel(immediate: true);
    } on StateError {
      // The queue was already cancelled, or was handed over through `rest`.
    }
  }

  controller = StreamController<Event>(onListen: () async {
    streamEvents = StreamQueue<Event>(onEvent(stream));

    // Request the first live event *before* fetching the history. The queue
    // only subscribes to its source once a request is made, so this starts
    // buffering live events right away, and it gives the callback a chance to
    // run while the history request is in flight. Registering it after the
    // await would leave `firstStreamEvent` null for the whole loop below,
    // because `then` callbacks never run synchronously.
    Event? firstStreamEvent;
    unawaited(streamEvents.peek.then<void>((e) {
      firstStreamEvent = e;
    }, onError: (Object _) {
      // No live event was available (source ended or the queue was
      // cancelled). `peek` does not consume events, so an error emitted by
      // the source itself is still expected to be delivered through `rest`.
    }));

    final Iterable<Event> history;
    try {
      history = (await getStreamHistory(stream)).history;
    } catch (error, stackTrace) {
      // Surface the failure to the listener instead of leaving it as an
      // unhandled asynchronous error inside this callback.
      if (!cancelled) {
        cancelLiveEvents();
        controller.addError(error, stackTrace);
        unawaited(controller.close());
      }
      return;
    }

    // The listener went away while the history was being fetched; the queue
    // has been cancelled, so `rest` must not be touched.
    if (cancelled) return;

    // Snapshot the value: it cannot change during this synchronous loop.
    final firstLive = firstStreamEvent;
    for (final event in history) {
      // Stop at the first history entry newer than the first live event.
      // Entries sharing that timestamp are still emitted, as before.
      if (firstLive != null && event.timestamp! > firstLive.timestamp!) {
        break;
      }
      controller.sink.add(event);
    }
    unawaited(controller.sink.addStream(streamEvents.rest));
  }, onCancel: () {
    cancelled = true;
    cancelLiveEvents();
  });

  return controller.stream;
}