watchTable method
Implementation
Stream<DatasetTableWatchEvent> watchTable({
required String table,
List<String>? namespace,
String? branch,
double pollIntervalSeconds = 0.5,
}) async* {
final input = _DatasetArrowReadInputStream(
start: {"kind": "start", "table": table, "namespace": namespace, "branch": branch, "poll_interval_seconds": pollIntervalSeconds},
);
final output = await _invokeStream("watch_table", input.inputStream());
input.requestNext();
try {
await for (final chunk in output) {
if (chunk is ErrorContent) {
throw RoomServerException(chunk.text, code: chunk.code);
}
if (chunk is ControlContent) {
if (chunk.method == "close") {
return;
}
throw RoomServerException("unexpected return type from datasets.watch_table call");
}
if (chunk is BinaryContent && chunk.headers["kind"] == "data") {
final batch = ArrowRecordBatch(chunk.data);
yield DatasetTableWatchEvent(
kind: chunk.headers["watch_event"] ?? "data",
phase: _datasetWatchPhase(chunk.headers["phase"]),
batch: batch,
changeType: chunk.headers["change_type"],
version: _intHeader(chunk.headers["version"]),
beginVersion: _intHeader(chunk.headers["begin_version"]),
endVersion: _intHeader(chunk.headers["end_version"]),
transactions: chunk.headers["watch_event"] == "transactions" ? _transactionsFromWatchBatch(batch) : null,
);
input.requestNext();
continue;
}
if (chunk is JsonContent) {
final json = chunk.json;
final rawTransactions = json["transactions"];
final rawTransaction = json["transaction"];
yield DatasetTableWatchEvent(
kind: json["kind"]?.toString() ?? "event",
phase: _datasetWatchPhase(json["phase"]),
version: _intValue(json["version"]),
beginVersion: _intValue(json["begin_version"]),
endVersion: _intValue(json["end_version"]),
transactions: rawTransactions is List
? rawTransactions.whereType<Map>().map((value) => Map<String, Object?>.from(value)).toList(growable: false)
: null,
deletePredicate: json["predicate"]?.toString(),
transaction: rawTransaction is Map ? Map<String, Object?>.from(rawTransaction) : null,
);
input.requestNext();
continue;
}
throw RoomServerException("unexpected return type from datasets.watch_table call");
}
} finally {
input.close();
}
}