streamFromInputStream method

@override
Stream<Map<String, dynamic>> streamFromInputStream(
  Stream<RunInput> inputStream, {
  RunnableOptions? options,
})

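Streams the output of invoking the Runnable on the given inputStream. Every step receives the full input stream, and each emitted chunk is a single-entry map from a step's key to that step's output.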

  • inputStream - the input stream to invoke the Runnable on.
  • options - the options to use when invoking the Runnable.

Implementation

@override
Stream<Map<String, dynamic>> streamFromInputStream(
  final Stream<RunInput> inputStream, {
  final RunnableOptions? options,
}) {
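  // Buffer the input in a ReplaySubject so that every step can subscribe
  // and still receive all events from the beginning.
  final subject = ReplaySubject<RunInput>();
  inputStream.listen(
    subject.add,
    onError: subject.addError,
    onDone: subject.close,
  );

  // Run each step on the shared input, tag its output with the step's key,
  // and merge the per-step streams into one broadcast stream.
  return StreamGroup.merge(
    steps.entries.map((final entry) {
      return entry.value
          .streamFromInputStream(
            subject.stream,
            options: entry.value.getCompatibleOptions(options),
          )
          .map((final output) => {entry.key: output});
    }),
  ).asBroadcastStream();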
}
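
The fan-out-and-merge pattern used by the implementation above can be sketched with dart:async alone. This is a simplified illustration, not the library's code: the names Step and fanOutAndMerge are hypothetical, steps are modelled as plain functions rather than Runnables, and one buffered StreamController per step stands in for the ReplaySubject and StreamGroup.merge used by the real method.

```dart
import 'dart:async';

/// Hypothetical stand-in for a step: maps an input stream to an output stream.
typedef Step = Stream<Object?> Function(Stream<String> input);

/// Feeds every input event to every step and merges the keyed outputs.
Stream<Map<String, Object?>> fanOutAndMerge(
  Stream<String> input,
  Map<String, Step> steps,
) {
  if (steps.isEmpty) return Stream<Map<String, Object?>>.empty();

  final output = StreamController<Map<String, Object?>>();
  // One controller per step, so each step sees the complete input.
  final inputs = {
    for (final key in steps.keys) key: StreamController<String>(),
  };

  var remaining = steps.length;
  for (final entry in steps.entries) {
    entry.value(inputs[entry.key]!.stream).listen(
      // Tag each output with the key of the step that produced it.
      (value) => output.add({entry.key: value}),
      onError: output.addError,
      onDone: () {
        // Close the merged stream once every step has finished.
        remaining -= 1;
        if (remaining == 0) output.close();
      },
    );
  }

  // Forward data, errors, and completion to all per-step controllers.
  input.listen(
    (event) {
      for (final c in inputs.values) {
        c.add(event);
      }
    },
    onError: (Object error, StackTrace stackTrace) {
      for (final c in inputs.values) {
        c.addError(error, stackTrace);
      }
    },
    onDone: () {
      for (final c in inputs.values) {
        c.close();
      }
    },
  );

  return output.stream;
}

Future<void> main() async {
  final merged = fanOutAndMerge(
    Stream.fromIterable(['a', 'bc']),
    {
      'upper': (input) => input.map((s) => s.toUpperCase()),
      'length': (input) => input.map((s) => s.length),
    },
  );
  // Each chunk is a single-entry map such as {upper: A} or {length: 1}.
  await for (final chunk in merged) {
    print(chunk);
  }
}
```

Outputs from different steps may interleave in any order, so a consumer should rely on the key in each chunk rather than on the position of the chunk in the stream.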