pushToStream method
Forwards a data event into the broadcast controller and runs every
listener from provideOnPushToStreamListeners in arrival order via the
single per-service _pushSequencer (so emissions never interleave with
each other's listener chains).
Drops the push if the service is disposed or if the stream was restarted
after this call captured its epoch. With eagerError true, an erroring
listener short-circuits the rest of the chain for this emission.
Implementation
/// Pushes [data] into the stream controller and then invokes every listener
/// returned by `provideOnPushToStreamListeners()`, all chained through
/// `_pushSequencer`.
///
/// The push is skipped when `state.didDispose()` is true or when
/// `_streamEpoch` no longer equals the value captured at call time.
///
/// When [eagerError] is true, a listener link that receives an `Err` from the
/// previous link passes that result on without calling its listener.
///
/// Returns the `Resolvable` produced by the outer `_pushSequencer.then` call,
/// whose callback yields the sequencer's previous value unchanged.
Resolvable<Option> pushToStream(
Result<TData> data, {
bool eagerError = false,
}) {
// Capture epoch at call-time so a push initiated against a stream that
// has since been restarted gets dropped instead of landing in the new
// controller / completer.
final epochAtCall = _streamEpoch;
return _pushSequencer.then((prev1) {
// NOTE(review): with asserts enabled this fails before the drop guard
// below can run; with asserts disabled the guard silently drops the push.
// Confirm that difference is intended.
assert(!state.didDispose());
if (state.didDispose() || epochAtCall != _streamEpoch) {
// Dropped: hand the previous sequencer value through untouched.
return Sync.result(prev1);
}
// First link: deliver the event to the controller and the init completer.
_pushSequencer.then((_) {
return Resolvable(() {
// Re-check the epoch here because this callback is a separate sequencer
// link from the guard above.
if (epochAtCall != _streamEpoch) return const None();
// Only add when a controller is present and still open.
if (_streamController case Some(value: final ctrl)
when !ctrl.isClosed) {
ctrl.add(data);
}
// If an init-data completer is present, resolve it with this data.
return _initDataCompleter.map(
(e) => e.resolve(Sync.result(data)).value,
);
});
}).end(); // presumably discards the link's result — TODO confirm `end()`.
// Following links: one per listener, enqueued in iteration order.
for (final listener in provideOnPushToStreamListeners()) {
_pushSequencer.then((prev2) {
// Skip this listener if the epoch changed since the call was made.
if (epochAtCall != _streamEpoch) {
return Sync.result(prev2);
}
// An Err from the previous link is always logged; it only stops this
// listener from running when eagerError is set.
if (prev2 case Err(:final error)) {
Log.err(
'$runtimeType.pushToStream: listener chain error: $error',
);
if (eagerError) {
return Sync.result(prev2);
}
}
// The value passed to the `then` callback is ignored; prev2 is what is
// handed on to the next link.
return listener(data).then((e) => prev2).flatten2();
}).end();
}
// The outer link itself yields the previous value; the links enqueued above
// are not part of this return expression.
return Sync.result(prev1);
});
}