watchTable method

Stream<DatasetTableWatchEvent> watchTable({
  1. required String table,
  2. List<String>? namespace,
  3. String? branch,
  4. double pollIntervalSeconds = 0.5,
})

Implementation

Stream<DatasetTableWatchEvent> watchTable({
  required String table,
  List<String>? namespace,
  String? branch,
  double pollIntervalSeconds = 0.5,
}) async* {
  final input = _DatasetArrowReadInputStream(
    start: {"kind": "start", "table": table, "namespace": namespace, "branch": branch, "poll_interval_seconds": pollIntervalSeconds},
  );
  final output = await _invokeStream("watch_table", input.inputStream());
  input.requestNext();
  try {
    await for (final chunk in output) {
      if (chunk is ErrorContent) {
        throw RoomServerException(chunk.text, code: chunk.code);
      }
      if (chunk is ControlContent) {
        if (chunk.method == "close") {
          return;
        }
        throw RoomServerException("unexpected return type from datasets.watch_table call");
      }
      if (chunk is BinaryContent && chunk.headers["kind"] == "data") {
        final batch = ArrowRecordBatch(chunk.data);
        yield DatasetTableWatchEvent(
          kind: chunk.headers["watch_event"] ?? "data",
          phase: _datasetWatchPhase(chunk.headers["phase"]),
          batch: batch,
          changeType: chunk.headers["change_type"],
          version: _intHeader(chunk.headers["version"]),
          beginVersion: _intHeader(chunk.headers["begin_version"]),
          endVersion: _intHeader(chunk.headers["end_version"]),
          transactions: chunk.headers["watch_event"] == "transactions" ? _transactionsFromWatchBatch(batch) : null,
        );
        input.requestNext();
        continue;
      }
      if (chunk is JsonContent) {
        final json = chunk.json;
        final rawTransactions = json["transactions"];
        final rawTransaction = json["transaction"];
        yield DatasetTableWatchEvent(
          kind: json["kind"]?.toString() ?? "event",
          phase: _datasetWatchPhase(json["phase"]),
          version: _intValue(json["version"]),
          beginVersion: _intValue(json["begin_version"]),
          endVersion: _intValue(json["end_version"]),
          transactions: rawTransactions is List
              ? rawTransactions.whereType<Map>().map((value) => Map<String, Object?>.from(value)).toList(growable: false)
              : null,
          deletePredicate: json["predicate"]?.toString(),
          transaction: rawTransaction is Map ? Map<String, Object?>.from(rawTransaction) : null,
        );
        input.requestNext();
        continue;
      }
      throw RoomServerException("unexpected return type from datasets.watch_table call");
    }
  } finally {
    input.close();
  }
}