SplitStreams<K, E> constructor
SplitStreams<K, E> ()
Implementation
/// Splits [source] into one substream per entry of [keys].
///
/// A fresh [StreamController] is registered in `_controllers` for every key,
/// and [source] is subscribed to right away. Each incoming event is handed to
/// [classify] to select its controller; the event is added to that controller,
/// which is then closed if [isLastInSubstream] returns true for the event.
///
/// Errors emitted by [source] are forwarded to every controller that is still
/// open, and any controller still open when [source] is done gets closed.
///
/// A [StateError] is thrown from inside the event callback (not from this
/// constructor call) when [classify] returns a key that has no controller.
SplitStreams(
  final Stream<E> source,
  final Iterable<K> keys,
  final K Function(E) classify,
  final bool Function(E) isLastInSubstream,
) {
  // One controller per key; a repeated key keeps only the last controller.
  _controllers.addAll({
    for (final key in keys) key: StreamController<E>(),
  });

  // Routes a single source event to the substream chosen by `classify`.
  void route(final E event) {
    final key = classify(event);
    final target =
        _controllers[key] ?? (throw StateError('Unknown key: $key'));
    // NOTE(review): no closed-check here, so an event arriving for a substream
    // that was already closed is still passed to `add` — confirm intended.
    target.add(event);
    if (isLastInSubstream(event)) {
      target.close();
    }
  }

  // Fans a source error out to every substream that has not been closed.
  void forwardError(final Object error, final StackTrace trace) {
    for (final open in _controllers.values.where((final c) => !c.isClosed)) {
      open.addError(error, trace);
    }
  }

  // Closes whatever substreams are still open once the source finishes.
  void closeRemaining() {
    for (final open in _controllers.values.where((final c) => !c.isClosed)) {
      open.close();
    }
  }

  source.listen(route, onError: forwardError, onDone: closeRemaining);
}