CompiledStreamGraph constructor

CompiledStreamGraph(
  1. DirectedGraph<GraphNode> graph,
  2. Map<SourceNode, Stream> binding, {
  3. StreamTransformer<T, T> transformStream<T>(
    1. StreamNode<T> node
    )?,
  4. void doOnData(
    1. dynamic o,
    2. StreamNode
    )?,
})

Implementation

CompiledStreamGraph(this.graph, Map<SourceNode, Stream> binding,
    {this.transformStream, this.doOnData}) {
  nodesByName = {
    for (var e
        in this.graph.vertices.where((element) => element.name != null))
      e.name!: e
  };
  final givenStreams = {...binding};
  final sourceNodes =
      graph.data.keys.whereType<SourceNode>().toList(growable: false);
  startStreams = Map.fromEntries(sourceNodes
      .map((node) => MapEntry(node, node.transformStreams(givenStreams))));
  startStreams.forEach((key, value) {
    _addStreamForNode(value.key.stream, key);
  });
  final sortedNodes = graph.sortedTopologicalOrdering!
      .whereType<StreamNode>()
      .toList(growable: false);
  sortedNodes.forEach((node) {
    Stream? newStream;
    if (node is SourceNode) {
      newStream = startStreams[node]!.key.stream;
    } else if (node is TransformNode) {
      newStream = node.transformStreams(streams);
    } else if (node is ScheduleNode) {
      newStream = node.transformStreams(streams);
    } else if (node is LifecycleScheduleNode) {
      newStream = node.transformStreams(streams);
    } else if (node is FilterNode) {
      newStream = node.transformStreams(streams);
    } else if (node is CombineAllNode) {
      newStream = node.transformStreams(streams);
    } else if (node is Combine2Node) {
      newStream = node.transformStreams(streams);
    } else if (node is CopyNode) {
      newStream = node.stream;
    } else if (node is ConversionNode || node is Partitioning) {
    } else {
      throw UnimplementedError('$node');
    }
    if (newStream != null) {
      _addStreamForNode(newStream, node);
    }
  });

  final mappings = graph
      .whereType<CopyNode>()
      .map((node) => MapEntry<StreamNode,
              MapEntry<StreamController, StreamSubscription>>(
          node, node.attach(streams)))
      .toList(growable: false);
  final copyStreams = Map<StreamNode,
      MapEntry<StreamController, StreamSubscription>>.fromEntries(mappings);
  copyStreams.forEach((key, value) {
    _addStreamForNode(value.key.stream, key);
  });
  startStreams.addAll(copyStreams);
}