open method

Future<MeshDocument> open(
  String path, {
  bool create = true,
  Map<String, dynamic>? initialJson,
  MeshSchema? schema,
})

Implementation

/// Opens the synchronized document at [path] and returns it.
///
/// The path is normalized first, and all bookkeeping is keyed by the
/// normalized path. If a close or another open for the same path is in
/// flight, this call waits for it before continuing. When the document is
/// already connected, its reference count is incremented and the existing
/// document is returned without opening a new stream.
///
/// Otherwise the `open` operation is invoked with a streamed input built from
/// [create], [initialJson] and the JSON form of [schema]. The first chunk of
/// the response stream must be a binary chunk carrying the initial document
/// state; the returned document uses the schema reported in that chunk's
/// headers. The remaining stream is handed to a background consumer, and the
/// call completes once the document's `synchronized` future completes.
///
/// Throws a [RoomServerException] when the response stream ends before the
/// initial state arrives, when the first chunk is an error chunk, or when the
/// reported path does not match the requested one. Throws the error produced
/// by `_unexpectedResponseError` when the response is not a stream or the
/// first chunk is not binary. If a concurrent open for the same path fails,
/// its error is thrown here as well.
Future<MeshDocument> open(String path, {bool create = true, Map<String, dynamic>? initialJson, MeshSchema? schema}) async {
  final normalizedPath = _normalizeSyncPath(path);

  // Let an in-flight close of this path finish before inspecting open state.
  final closing = _closingDocuments[normalizedPath];
  if (closing != null) {
    await closing;
  }

  // Join an open that is already in progress; if that open failed, its error
  // propagates to this caller through the await.
  final pending = _connectingDocuments[normalizedPath];
  if (pending != null) {
    await pending;
  }

  final existing = _connectedDocuments[normalizedPath];
  if (existing != null) {
    existing.count++;
    return existing.ref;
  }

  // todo: add support for state vector / partial updates
  // todo: initial bytes loading

  final c = Completer<_RefCount<MeshDocument>>();
  // Usually nobody is listening on this future. Mark it as handled so that a
  // failed open is reported through this call's rethrow only, not a second
  // time as an unhandled asynchronous error. Callers that do await the future
  // still receive the error.
  c.future.ignore();
  _connectingDocuments[normalizedPath] = c.future;

  _SyncOpenStreamState? streamState;
  StreamIterator<Content>? iterator;
  // Set once the document has been written into the lookup maps, so that a
  // later failure knows there is something to roll back.
  _RefCount<MeshDocument>? registered;
  try {
    final openState = _SyncOpenStreamState(
      path: normalizedPath,
      create: create,
      vector: null,
      schemaJson: schema?.toJson(),
      schemaPath: null,
      initialJson: initialJson,
    );
    streamState = openState;

    final output = await _invoke("open", ToolStreamInput(openState.inputStream()));
    if (output is! ToolStreamOutput) {
      throw _unexpectedResponseError(operation: "open");
    }

    iterator = StreamIterator(output.stream);
    if (!await iterator.moveNext()) {
      throw RoomServerException("sync.open stream closed before the initial document state was returned");
    }
    final firstChunk = iterator.current;
    if (firstChunk is ErrorContent) {
      throw RoomServerException(firstChunk.text, code: firstChunk.code);
    }
    if (firstChunk is! BinaryContent) {
      throw _unexpectedResponseError(operation: "open");
    }

    final stateHeaders = _SyncOpenStateChunkHeaders.fromHeaders(firstChunk.headers);
    if (_normalizeSyncPath(stateHeaders.path) != normalizedPath) {
      throw RoomServerException("sync.open stream returned a mismatched path");
    }
    // The schema reported in the response headers is the one the document uses.
    final resolvedSchema = MeshSchema.fromJson(stateHeaders.schemaJson);

    final doc = MeshDocument(
      schema: resolvedSchema,
      sendChangesToBackend: (base64) {
        // Local changes arrive as a base64 string and are queued as its UTF-8
        // bytes. Failures are deliberately ignored (best effort).
        // NOTE(review): presumably this covers an already closed stream - confirm.
        try {
          openState.queueSync(Uint8List.fromList(utf8.encode(base64)));
        } catch (_) {}
      },
    );
    final rc = _RefCount(doc);
    _connectedDocuments[normalizedPath] = rc;
    _documentStreams[normalizedPath] = openState;
    registered = rc;

    _applySyncPayload(rc, firstChunk.data);
    openState.attachTask(_consumeOpenStream(path: normalizedPath, doc: rc, iterator: iterator, streamState: openState));
    notifyListeners();

    // Publish the document to concurrent openers before waiting for the
    // synchronized signal.
    c.complete(rc);
    await doc.synchronized;
    return doc;
  } catch (err, stackTrace) {
    final published = c.isCompleted;
    if (!published && registered != null) {
      // The open failed after the document was registered but before it was
      // handed to anyone. Remove the entries so a later open does not return
      // a document whose stream is already closed.
      if (identical(_connectedDocuments[normalizedPath], registered)) {
        _connectedDocuments.remove(normalizedPath);
      }
      if (identical(_documentStreams[normalizedPath], streamState)) {
        _documentStreams.remove(normalizedPath);
      }
    }
    // NOTE(review): when the failure happens after publication, the entries
    // are left in place because other callers may already hold the document.
    // Confirm that the stream consumer tears them down.

    streamState?.closeInputStream();
    if (iterator != null) {
      await iterator.cancel();
    }
    // Completing twice would throw a StateError and hide the original error,
    // so only fail the completer if it has not been completed yet.
    if (!published) {
      c.completeError(err, stackTrace);
    }
    rethrow;
  } finally {
    _connectingDocuments.remove(normalizedPath);
  }
}