open method
Future<MeshDocument>
open(
- String path, {
- bool create = true,
- Map<
String, dynamic> ? initialJson, - MeshSchema? schema,
Implementation
Future<MeshDocument> open(String path, {bool create = true, Map<String, dynamic>? initialJson, MeshSchema? schema}) async {
final normalizedPath = _normalizeSyncPath(path);
final closing = _closingDocuments[normalizedPath];
if (closing != null) {
await closing;
}
final pending = _connectingDocuments[normalizedPath];
if (pending != null) {
await pending;
}
if (_connectedDocuments[normalizedPath] != null) {
final connectedDoc = _connectedDocuments[normalizedPath];
connectedDoc!.count++;
return connectedDoc.ref;
}
// todo: add support for state vector / partial updates
// todo: initial bytes loading
final c = Completer<_RefCount<MeshDocument>>();
_connectingDocuments[normalizedPath] = c.future;
_SyncOpenStreamState? streamState;
StreamIterator<Content>? iterator;
try {
streamState = _SyncOpenStreamState(
path: normalizedPath,
create: create,
vector: null,
schemaJson: schema?.toJson(),
schemaPath: null,
initialJson: initialJson,
);
final output = await _invoke("open", ToolStreamInput(streamState.inputStream()));
if (output is! ToolStreamOutput) {
throw _unexpectedResponseError(operation: "open");
}
iterator = StreamIterator(output.stream);
if (!await iterator.moveNext()) {
throw RoomServerException("sync.open stream closed before the initial document state was returned");
}
final firstChunk = iterator.current;
if (firstChunk is ErrorContent) {
throw RoomServerException(firstChunk.text, code: firstChunk.code);
}
if (firstChunk is! BinaryContent) {
throw _unexpectedResponseError(operation: "open");
}
final stateHeaders = _SyncOpenStateChunkHeaders.fromHeaders(firstChunk.headers);
if (_normalizeSyncPath(stateHeaders.path) != normalizedPath) {
throw RoomServerException("sync.open stream returned a mismatched path");
}
schema = MeshSchema.fromJson(stateHeaders.schemaJson);
final doc = MeshDocument(
schema: schema,
sendChangesToBackend: (base64) {
try {
streamState!.queueSync(Uint8List.fromList(utf8.encode(base64)));
} catch (_) {}
},
);
final rc = _RefCount(doc);
_connectedDocuments[normalizedPath] = rc;
_documentStreams[normalizedPath] = streamState;
_applySyncPayload(rc, firstChunk.data);
streamState.attachTask(_consumeOpenStream(path: normalizedPath, doc: rc, iterator: iterator, streamState: streamState));
notifyListeners();
c.complete(rc);
await doc.synchronized;
return doc;
} catch (err) {
streamState?.closeInputStream();
if (iterator != null) {
await iterator.cancel();
}
c.completeError(err);
rethrow;
} finally {
_connectingDocuments.remove(normalizedPath);
}
}