TransformingFunction<I, O> constructor

TransformingFunction<I, O>()

Implementation

/// Creates the broadcast output controller and wires it to the source stream.
///
/// No subscription to the source is made here. The controller's `onListen`
/// callback subscribes to [_stream] (if one is set), and its `onCancel`
/// callback cancels that subscription again.
TransformingFunction() {
  // Starts forwarding: every source event is passed to [onValue], and each
  // value it yields is added to the output controller.
  void startForwarding() {
    final Stream<I>? source = _stream;
    if (source == null) {
      // No source stream is set; there is nothing to subscribe to.
      return;
    }
    _subscription = source.listen(
      (event) {
        onValue(event).forEach(_controller.add);
      },
      onError: _controller.addError,
      onDone: () async {
        // Source finished: close the output and dispose, awaiting both.
        await Future.wait([_controller.close(), dispose()]);
      },
      // The source subscription is cancelled after the first error. That
      // path only forwards the error; it does not close the controller.
      // NOTE(review): confirm leaving the output open on error is intended.
      cancelOnError: true,
    );
  }

  // Stops forwarding: cancels the source subscription and clears the
  // stored reference.
  void stopForwarding() {
    _subscription?.cancel();
    _subscription = null;
  }

  _controller = StreamController<O>.broadcast(
    onListen: startForwarding,
    onCancel: stopForwarding,
  );
}