stream<T> method

Stream<T> stream<T>(
  1. int command, {
  2. List args = const [],
  3. CancellationToken? token,
  4. bool inspectRequest = false,
  5. bool inspectResponse = false,
})

Sends a streaming workload to the worker.

Implementation

Stream<T> stream<T>(int command,
    {List args = const [],
    CancellationToken? token,
    bool inspectRequest = false,
    bool inspectResponse = false}) {
  // update stats
  _workload++;
  if (_workload > _maxWorkload) {
    _maxWorkload = _workload;
  }

  bool done = false;
  void onDone() {
    if (!done) {
      done = true;
      _workload--;
      _totalWorkload++;
      _idle = DateTime.now().microsecondsSinceEpoch;
    }
  }

  if (_channel != null) {
    // worker is up and running: return the stream directly
    return _channel!.sendStreamingRequest<T>(
      command,
      args,
      onDone: onDone,
      token: token,
      inspectRequest: inspectRequest,
      inspectResponse: inspectResponse,
    );
  }

  // worker has not started yet: start it and forward the stream via a StreamController
  late final StreamController<T> controller;
  controller = StreamController<T>(onListen: () async {
    try {
      final channel = await start();
      await controller.addStream(channel.sendStreamingRequest<T>(
        command,
        args,
        onDone: onDone,
        token: token,
        inspectRequest: inspectRequest,
        inspectResponse: inspectResponse,
      ));
    } catch (ex, st) {
      controller.addError(SquadronException.from(ex, st), st);
    } finally {
      controller.close();
      onDone();
    }
  });
  return controller.stream;
}