logs method
Implementation
LogStream<void> logs({required String containerId, bool follow = false}) {
final requestId = const Uuid().v4();
final closeInput = Completer<void>();
var inputClosed = false;
void closeInputStream() {
if (inputClosed) {
return;
}
inputClosed = true;
if (!closeInput.isCompleted) {
closeInput.complete();
}
}
Stream<Content> inputStream() async* {
yield BinaryContent(
data: Uint8List(0),
headers: {'kind': 'start', 'request_id': requestId, 'container_id': containerId, 'follow': follow},
);
await closeInput.future;
}
final completer = Completer<void>();
final controller = StreamController<String>(
onCancel: () async {
closeInputStream();
await completer.future.catchError((_) {});
},
);
final progress = StreamController<LogProgress>();
final stream = LogStream._(completer, controller.stream, progress.stream, () async {
closeInputStream();
await completer.future.catchError((_) {});
});
room
.invoke(toolkit: 'containers', tool: 'logs', input: ToolStreamInput(inputStream()))
.then((output) async {
if (output is! ToolStreamOutput) {
throw _unexpectedResponseError(operation: 'logs');
}
await for (final chunk in output.stream) {
if (chunk is ErrorContent) {
throw RoomServerException(chunk.text, code: chunk.code);
}
if (chunk is! BinaryContent) {
throw _unexpectedResponseError(operation: 'logs');
}
final rawChannel = chunk.headers['channel'];
if (rawChannel is! int) {
throw RoomServerException('containers.logs returned a chunk without a valid channel');
}
if (rawChannel != 1) {
continue;
}
controller.add(utf8.decode(chunk.data));
}
closeInputStream();
unawaited(controller.close());
unawaited(progress.close());
completer.complete();
})
.catchError((Object error) async {
closeInputStream();
unawaited(controller.close());
unawaited(progress.close());
if (!completer.isCompleted) {
completer.completeError(error);
}
});
return stream;
}