run method

@override
Future<dynamic> run(Map<String, dynamic> shared)

Executes the streaming batch flow.

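This method expects the flow's parameters to contain a list of items under the key "items", and throws an ArgumentError if the key is missing or its value is not a list. It copies the given shared state, stores the list in the copy under "items", and runs the flow's nodes against that copy.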

Returns the result of the last node in the flow, which is expected to be the final processed batch of items.
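
The contract described above can be illustrated with a minimal, self-contained sketch. It does not use the real flow class: `SketchBatchFlow` and its `steps` list are hypothetical stand-ins, assumed here only to show the batch being read from `params`, copied into the shared state, and rewritten step by step.

```dart
// Hypothetical stand-in for the real flow class. Only the contract described
// above is modeled: items come from params and travel through the shared map.
class SketchBatchFlow<TIn> {
  SketchBatchFlow(this.steps);

  /// Flow-level parameters; the batch is expected under the key 'items'.
  final Map<String, dynamic> params = {};

  /// Hypothetical node stand-ins: each reads and rewrites shared['items'].
  final List<void Function(Map<String, dynamic> shared)> steps;

  Future<dynamic> run(Map<String, dynamic> shared) async {
    if (params['items'] is! List) {
      throw ArgumentError('requires a list under the key "items" in params');
    }
    // Work on a shallow copy so the caller's map is left untouched.
    final working = Map<String, dynamic>.from(shared);
    working['items'] = List<TIn>.from(params['items'] as List);
    for (final step in steps) {
      step(working);
    }
    return working['items'];
  }
}

Future<void> main() async {
  final flow = SketchBatchFlow<int>([
    // Double every item.
    (shared) => shared['items'] =
        (shared['items'] as List<int>).map((n) => n * 2).toList(),
    // Keep only the items greater than 2.
    (shared) => shared['items'] =
        (shared['items'] as List<int>).where((n) => n > 2).toList(),
  ]);
  flow.params['items'] = [1, 2, 3];

  final result = await flow.run({});
  print(result); // [4, 6]
}
```

In this sketch, calling `run` before setting `params['items']` throws an `ArgumentError`, mirroring the check in the implementation below.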

Implementation

/// Executes the streaming batch flow.
///
/// This method expects the flow's parameters to contain a list of items under
/// the key "items", and throws an [ArgumentError] if the key is missing or its
/// value is not a list. It copies the given shared state, stores the list in
/// the copy under "items", and runs the flow's nodes against that copy.
///
/// Returns the result of the last node in the flow, which is expected to be
/// the final processed batch of items.
@override
Future<dynamic> run(Map<String, dynamic> shared) async {
  // Before the flow starts, the initial batch of items is expected to be in
  // the 'items' parameter of the flow itself.
  if (!params.containsKey('items') || params['items'] is! List) {
    throw ArgumentError(
      'StreamingBatchFlow requires a list of items under the key "items" in '
      'its params.',
    );
  }

  // The initial `shared` storage is populated with the batch of items.
  // Each subsequent node in the flow is responsible for reading from and
  // writing to the 'items' key in the `shared` map.
  final initialShared = Map<String, dynamic>.from(shared);
  initialShared['items'] = List<TIn>.from(params['items'] as List);

  // Remove 'items' from flow params so it doesn't override shared['items']
  // in subsequent nodes. Each node should read from shared['items'] which
  // gets updated by the previous node.
  final originalItems = params.remove('items');

  try {
    // The `super.run()` method executes the flow of nodes. Each node will
    // read and write to the `initialShared` map.
    return await super.run(initialShared);
  } finally {
    // Restore the items param
    params['items'] = originalItems;
  }
}
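
The remove-then-restore step in the implementation relies on `try`/`finally` so that the parameter comes back even when a node throws. The following is a rough sketch of that pattern in isolation. The helper `withoutParam` is hypothetical and not part of the library. Unlike the implementation above, it restores the key only if it was present to begin with.

```dart
/// Hypothetical helper: runs [body] with [key] temporarily removed from
/// [params], then restores the entry whether or not [body] throws.
Future<T> withoutParam<T>(
  Map<String, dynamic> params,
  String key,
  Future<T> Function() body,
) async {
  final hadKey = params.containsKey(key);
  final saved = params.remove(key);
  try {
    return await body();
  } finally {
    // Runs on both normal completion and on error.
    if (hadKey) params[key] = saved;
  }
}

Future<void> main() async {
  final params = <String, dynamic>{
    'items': [1, 2, 3],
    'mode': 'fast',
  };

  try {
    await withoutParam<void>(params, 'items', () async {
      print(params.containsKey('items')); // false while the body runs
      throw StateError('node failed');
    });
  } on StateError {
    // The failure propagates, but the finally block has already run.
  }

  print(params['items']); // [1, 2, 3]
}
```

Because the map is mutated in place for the duration of the call, this pattern assumes the same `params` map is not used by another run at the same time.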