CompiledStreamGraph constructor
CompiledStreamGraph(
  DirectedGraph<GraphNode> graph,
  Map<SourceNode, Stream> binding, {
  StreamTransformer<T, T> transformStream<T>(StreamNode<T> node)?,
  void doOnData(dynamic o, StreamNode …)?,
})
Implementation
/// Compiles [graph] into concrete streams, using [binding] for the sources.
///
/// The constructor performs these steps in order:
///
/// 1. Indexes every vertex whose `name` is non-null into [nodesByName].
/// 2. Calls `transformStreams` on each [SourceNode] key of `graph.data` with
///    a copy of [binding], stores the results in [startStreams], and passes
///    each resulting controller stream to `_addStreamForNode`.
/// 3. Visits the [StreamNode]s of the topological ordering, derives a stream
///    for each supported node type, and passes it to `_addStreamForNode`.
/// 4. Calls `attach` on every [CopyNode] in the graph, passes the resulting
///    controller streams to `_addStreamForNode`, and merges the results into
///    [startStreams].
///
/// [transformStream] and [doOnData] are optional and are only stored on the
/// instance here; this constructor body does not call them directly.
///
/// Throws an [ArgumentError] if [graph] has no topological ordering, and an
/// [UnimplementedError] if the ordering contains a [StreamNode] of a type
/// this constructor does not handle.
CompiledStreamGraph(this.graph, Map<SourceNode, Stream> binding,
    {this.transformStream, this.doOnData}) {
  nodesByName = {
    for (final vertex in graph.vertices)
      if (vertex.name != null) vertex.name!: vertex
  };

  // The source nodes receive a copy of the binding rather than the caller's
  // map (presumably so `transformStreams` cannot mutate it; confirm).
  final givenStreams = {...binding};
  final sourceNodes =
      graph.data.keys.whereType<SourceNode>().toList(growable: false);
  startStreams = {
    for (final node in sourceNodes) node: node.transformStreams(givenStreams)
  };
  // NOTE(review): source-node streams are handed to `_addStreamForNode` here
  // and again in the ordered pass below whenever the ordering contains the
  // source node. Confirm that the repeated call is intended.
  for (final entry in startStreams.entries) {
    _addStreamForNode(entry.value.key.stream, entry.key);
  }

  // Fail with a descriptive error instead of a bare null-check failure when
  // the graph cannot be ordered.
  final ordering = graph.sortedTopologicalOrdering;
  if (ordering == null) {
    throw ArgumentError(
        'Cannot compile the stream graph: it has no topological ordering '
        '(the graph probably contains a cycle).');
  }
  final sortedNodes =
      ordering.whereType<StreamNode>().toList(growable: false);

  for (final node in sortedNodes) {
    Stream? newStream;
    // Each branch needs its own `is` check so that `node` is promoted to the
    // concrete type before its members are used.
    if (node is SourceNode) {
      newStream = startStreams[node]!.key.stream;
    } else if (node is TransformNode) {
      newStream = node.transformStreams(streams);
    } else if (node is ScheduleNode) {
      newStream = node.transformStreams(streams);
    } else if (node is LifecycleScheduleNode) {
      newStream = node.transformStreams(streams);
    } else if (node is FilterNode) {
      newStream = node.transformStreams(streams);
    } else if (node is CombineAllNode) {
      newStream = node.transformStreams(streams);
    } else if (node is Combine2Node) {
      newStream = node.transformStreams(streams);
    } else if (node is CopyNode) {
      newStream = node.stream;
    } else if (node is ConversionNode || node is Partitioning) {
      // No stream is derived for these node types in this pass.
    } else {
      throw UnimplementedError('$node');
    }
    if (newStream != null) {
      _addStreamForNode(newStream, node);
    }
  }

  // All copy nodes are attached before any of their streams is registered,
  // matching the original evaluation order.
  final copyStreams =
      <StreamNode, MapEntry<StreamController, StreamSubscription>>{
    for (final node in graph.whereType<CopyNode>()) node: node.attach(streams)
  };
  for (final entry in copyStreams.entries) {
    _addStreamForNode(entry.value.key.stream, entry.key);
  }
  startStreams.addAll(copyStreams);
}