SplitStreams<K, E> constructor

SplitStreams<K, E>(
  1. Stream<E> source,
  2. Iterable<K> keys,
  3. K classify(
    1. E
    ),
  4. bool isLastInSubstream(
    1. E
    ),
)

Implementation

SplitStreams(
  final Stream<E> source,
  final Iterable<K> keys,
  final K Function(E) classify,
  final bool Function(E) isLastInSubstream,
) {
  for (final key in keys) {
    _controllers[key] = StreamController<E>();
  }

  source.listen(
    (final event) {
      final key = classify(event);
      final ctrl = _controllers[key];
      if (ctrl == null) {
        throw StateError('Unknown key: $key');
      }
      ctrl.add(event);
      if (isLastInSubstream(event)) {
        ctrl.close();
      }
    },
    onError: (final e, final st) {
      for (final ctrl in _controllers.values) {
        if (!ctrl.isClosed) ctrl.addError(e, st);
      }
    },
    onDone: () {
      for (final ctrl in _controllers.values) {
        if (!ctrl.isClosed) ctrl.close();
      }
    },
  );
}