pushToStream method

Resolvable<Option<Object>> pushToStream(
  1. Result<TData> data, {
  2. bool eagerError = false,
})

Forwards a data event into the broadcast controller and runs every listener from provideOnPushToStreamListeners in arrival order via the single per-service _pushSequencer (so emissions never interleave with each other's listener chains).

Drops the push if the service is disposed or if the stream was restarted after this call captured its epoch. With eagerError true, an erroring listener short-circuits the rest of the chain for this emission.

Implementation

Resolvable<Option> pushToStream(
  Result<TData> data, {
  bool eagerError = false,
}) {
  // Capture epoch at call-time so a push initiated against a stream that
  // has since been restarted gets dropped instead of landing in the new
  // controller / completer.
  final epochAtCall = _streamEpoch;
  return _pushSequencer.then((prev1) {
    assert(!state.didDispose());
    if (state.didDispose() || epochAtCall != _streamEpoch) {
      return Sync.result(prev1);
    }
    _pushSequencer.then((_) {
      return Resolvable(() {
        if (epochAtCall != _streamEpoch) return const None();
        if (_streamController case Some(value: final ctrl)
            when !ctrl.isClosed) {
          ctrl.add(data);
        }
        return _initDataCompleter.map(
          (e) => e.resolve(Sync.result(data)).value,
        );
      });
    }).end();
    for (final listener in provideOnPushToStreamListeners()) {
      _pushSequencer.then((prev2) {
        if (epochAtCall != _streamEpoch) {
          return Sync.result(prev2);
        }
        if (prev2 case Err(:final error)) {
          Log.err(
            '$runtimeType.pushToStream: listener chain error: $error',
          );
          if (eagerError) {
            return Sync.result(prev2);
          }
        }
        return listener(data).then((e) => prev2).flatten2();
      }).end();
    }
    return Sync.result(prev1);
  });
}