logs method

LogStream<void> logs({
  1. required String containerId,
  2. bool follow = false,
})

Implementation

LogStream<void> logs({required String containerId, bool follow = false}) {
  final requestId = const Uuid().v4();
  final closeInput = Completer<void>();
  var inputClosed = false;

  void closeInputStream() {
    if (inputClosed) {
      return;
    }
    inputClosed = true;
    if (!closeInput.isCompleted) {
      closeInput.complete();
    }
  }

  Stream<Content> inputStream() async* {
    yield BinaryContent(
      data: Uint8List(0),
      headers: {'kind': 'start', 'request_id': requestId, 'container_id': containerId, 'follow': follow},
    );
    await closeInput.future;
  }

  final completer = Completer<void>();
  final controller = StreamController<String>(
    onCancel: () async {
      closeInputStream();
      await completer.future.catchError((_) {});
    },
  );
  final progress = StreamController<LogProgress>();

  final stream = LogStream._(completer, controller.stream, progress.stream, () async {
    closeInputStream();
    await completer.future.catchError((_) {});
  });

  room
      .invoke(toolkit: 'containers', tool: 'logs', input: ToolStreamInput(inputStream()))
      .then((output) async {
        if (output is! ToolStreamOutput) {
          throw _unexpectedResponseError(operation: 'logs');
        }

        await for (final chunk in output.stream) {
          if (chunk is ErrorContent) {
            throw RoomServerException(chunk.text, code: chunk.code);
          }
          if (chunk is! BinaryContent) {
            throw _unexpectedResponseError(operation: 'logs');
          }

          final rawChannel = chunk.headers['channel'];
          if (rawChannel is! int) {
            throw RoomServerException('containers.logs returned a chunk without a valid channel');
          }
          if (rawChannel != 1) {
            continue;
          }
          controller.add(utf8.decode(chunk.data));
        }
        closeInputStream();
        unawaited(controller.close());
        unawaited(progress.close());
        completer.complete();
      })
      .catchError((Object error) async {
        closeInputStream();
        unawaited(controller.close());
        unawaited(progress.close());
        if (!completer.isCompleted) {
          completer.completeError(error);
        }
      });

  return stream;
}