executeStreaming method
Runs command on nodeId and forwards its output live: each stdout/stderr
chunk is delivered to onStdout/onStderr as it arrives, rather than being
buffered until the command exits. Returns the process exit code.
Use this for long-running commands where progress should appear in real time; execute is the buffered variant built on top of this.
Implementation
/// Runs [command] on [nodeId] and forwards its output as it arrives.
///
/// Opens an exec-mode session via `openSession`, passing [args], [env],
/// [cwd] and [shellFamily] through unchanged. Every stdout chunk is handed
/// to [onStdout] and every stderr chunk to [onStderr] as soon as it is
/// received; nothing is buffered here.
///
/// Returns the session's exit code once it is available and both output
/// streams have finished. Errors raised while draining the output (a stream
/// error, or an exception thrown by one of the callbacks) are deliberately
/// ignored so that they do not mask the exit code; a failing stream simply
/// stops being forwarded.
Future<int> executeStreaming({
  required String nodeId,
  required String command,
  List<String> args = const [],
  Map<String, String> env = const {},
  String? cwd,
  ShellFamily? shellFamily,
  required void Function(List<int> chunk) onStdout,
  required void Function(List<int> chunk) onStderr,
}) async {
  final session = await openSession(
    nodeId: nodeId,
    mode: SessionMode.exec,
    command: command,
    args: args,
    env: env,
    cwd: cwd,
    shellFamily: shellFamily,
  );

  // Replenish the node's send window for the bytes we consume. The channel
  // grants a finite credit (256 KiB shared across stdout+stderr); without
  // this, a command producing more than that drains the credit and its output
  // stalls permanently. The node's credit counter is shared, so the same
  // grant call is used for both stdout and stderr bytes.
  void Function(List<int>) forwardTo(void Function(List<int> chunk) sink) {
    return (chunk) {
      if (chunk.isNotEmpty) session.grantWindow(chunk.length);
      sink(chunk);
    };
  }

  final outDone = session.stdout.forEach(forwardTo(onStdout));
  final errDone = session.stderr.forEach(forwardTo(onStderr));

  // Attach the error handler *before* awaiting the exit code. A future that
  // completes with an error while it has no listener is reported to the
  // zone's uncaught-error handler, so installing the handler only after the
  // exit code arrives would let an early stream/callback failure escape as an
  // unhandled async error instead of being ignored as intended.
  final drained =
      Future.wait([outDone, errDone]).catchError((_) => const <void>[]);

  final code = await session.exitCode;
  // Make sure all output has been delivered before reporting completion.
  await drained;
  return code;
}