getBuildLogs method
Implementation
/// Streams the logs of the build identified by [buildId].
///
/// Invokes the `get_build_logs` tool of the `containers` toolkit with a
/// streaming input whose first message is a `start` request carrying a fresh
/// request id, [buildId] and [follow]. The input is then held open until the
/// server finishes, the call fails, or the caller cancels.
///
/// [follow] is forwarded to the server unchanged; presumably it asks the
/// server to keep streaming while the build is still producing output
/// (TODO: confirm against the server contract).
///
/// The returned [LogStream] emits the decoded text of every channel `1`
/// chunk. Its result completes with the decoded status payload of the first
/// channel `3` chunk, with `null` when the server closes the stream without
/// sending a status, or with the error (and stack trace) that ended the call.
LogStream<int?> getBuildLogs({required String buildId, bool follow = true}) {
  final requestId = const Uuid().v4();
  final closeInput = Completer<void>();

  // Releases the input stream so the request side of the call can end.
  // Safe to call any number of times.
  void closeInputStream() {
    if (!closeInput.isCompleted) {
      closeInput.complete();
    }
  }

  // Sends the single `start` message, then keeps the input open until
  // [closeInputStream] is called.
  Stream<Content> inputStream() async* {
    yield BinaryContent(
      data: Uint8List(0),
      headers: {'kind': 'start', 'request_id': requestId, 'build_id': buildId, 'follow': follow},
    );
    await closeInput.future;
  }

  final completer = Completer<int?>();

  // Ends the request and waits for the call to settle. A failure of the call
  // is swallowed here; it is still reported through `completer.future`.
  Future<void> cancelAndSettle() async {
    closeInputStream();
    await completer.future.catchError((_) => null);
  }

  final controller = StreamController<String>(onCancel: cancelAndSettle);
  // No progress events are produced by this method; the controller only
  // exists to satisfy [LogStream] and is closed together with the log stream.
  final progress = StreamController<LogProgress>();
  final stream = LogStream._(completer, controller.stream, progress.stream, () async {
    await cancelAndSettle();
  });

  // Shared teardown: end the request and close both output streams. The
  // close futures are deliberately not awaited because they only complete
  // once a listener has received the done event.
  void closeOutputs() {
    closeInputStream();
    unawaited(controller.close());
    unawaited(progress.close());
  }

  // Started outside the `try` below so that a synchronous failure of
  // `invoke` still surfaces directly to the caller of [getBuildLogs].
  final pending = room.invoke(toolkit: 'containers', tool: 'get_build_logs', input: ToolStreamInput(inputStream()));

  // Forwards server chunks to the log stream until the call ends.
  Future<void> pump() async {
    try {
      final output = await pending;
      if (output is! ToolStreamOutput) {
        throw _unexpectedResponseError(operation: 'get_build_logs');
      }
      await for (final chunk in output.stream) {
        if (chunk is ErrorContent) {
          throw RoomServerException(chunk.text, code: chunk.code);
        }
        if (chunk is ControlContent) {
          if (chunk.method != 'close') {
            throw _unexpectedResponseError(operation: 'get_build_logs');
          }
          // Server-initiated close without a status payload.
          closeOutputs();
          if (!completer.isCompleted) {
            completer.complete(null);
          }
          return;
        }
        if (chunk is! BinaryContent) {
          throw _unexpectedResponseError(operation: 'get_build_logs');
        }
        final rawChannel = chunk.headers['channel'];
        if (rawChannel is! int) {
          throw RoomServerException('containers.get_build_logs returned a chunk without a valid channel');
        }
        if (rawChannel == 1) {
          // Log text.
          controller.add(_decodeContainerUtf8(chunk.data, operation: 'get_build_logs'));
          continue;
        }
        if (rawChannel == 3) {
          // Final status: close the outputs first, then publish the result.
          closeOutputs();
          if (!completer.isCompleted) {
            completer.complete(_decodeContainerStatusPayload(chunk.data, operation: 'get_build_logs'));
          }
          return;
        }
        // Chunks on any other channel are ignored.
      }
      // The server ended the stream without a status or a close message.
      closeOutputs();
      if (!completer.isCompleted) {
        completer.complete(null);
      }
    } catch (error, stackTrace) {
      closeOutputs();
      if (!completer.isCompleted) {
        // Keep the stack trace so the failure can be traced to its origin.
        completer.completeError(error, stackTrace);
      }
    }
  }

  // Fire-and-forget: every outcome is reported through `completer`.
  unawaited(pump());
  return stream;
}