stream<T> method
Stream<T> stream<T>(
  int command, {
  List args = const [],
  CancellationToken? token,
  bool inspectRequest = false,
  bool inspectResponse = false,
})
Sends a streaming workload to the worker.
Implementation
/// Sends a streaming workload identified by [command] to the worker.
///
/// [args], [token], [inspectRequest] and [inspectResponse] are forwarded
/// unchanged to the channel's `sendStreamingRequest`.
///
/// If the worker's channel is already available, the channel's stream is
/// returned directly. Otherwise the returned stream starts the worker once it
/// is listened to and then relays the channel's stream; an exception caught
/// while doing so is emitted on the stream as a [SquadronException].
Stream<T> stream<T>(int command,
    {List args = const [],
    CancellationToken? token,
    bool inspectRequest = false,
    bool inspectResponse = false}) {
  // Statistics: one more workload in flight; keep track of the peak value.
  _workload++;
  if (_maxWorkload < _workload) {
    _maxWorkload = _workload;
  }

  // Completion bookkeeping. The flag makes it run at most once even though it
  // is handed to the channel and, on the deferred path, also invoked locally.
  var completed = false;
  void markDone() {
    if (completed) return;
    completed = true;
    _workload--;
    _totalWorkload++;
    _idle = DateTime.now().microsecondsSinceEpoch;
  }

  // Worker is up and running: hand back the channel's stream as is.
  final activeChannel = _channel;
  if (activeChannel != null) {
    return activeChannel.sendStreamingRequest<T>(
      command,
      args,
      onDone: markDone,
      token: token,
      inspectRequest: inspectRequest,
      inspectResponse: inspectResponse,
    );
  }

  // Worker has not started yet: expose a controller-backed stream that starts
  // the worker when it gets a listener and pipes the channel's stream through.
  late final StreamController<T> relay;

  Future<void> startAndForward() async {
    try {
      final startedChannel = await start();
      final source = startedChannel.sendStreamingRequest<T>(
        command,
        args,
        onDone: markDone,
        token: token,
        inspectRequest: inspectRequest,
        inspectResponse: inspectResponse,
      );
      await relay.addStream(source);
    } catch (ex, st) {
      relay.addError(SquadronException.from(ex, st), st);
    } finally {
      // The close future is not awaited; bookkeeping runs right after.
      relay.close();
      markDone();
    }
  }

  relay = StreamController<T>(onListen: startAndForward);
  return relay.stream;
}