writeEncodedBlobToNodes method

Future<Map<int, StorageConfirmation?>> writeEncodedBlobToNodes({
  1. required EncodedBlob encodedBlob,
  2. required CommitteeInfo committee,
  3. required bool deletable,
  4. required String blobObjectId,
  5. Duration nodeTimeout = const Duration(seconds: 30),
  6. int maxConcurrency = 20,
})

Write encoded slivers to all storage nodes in the committee.

Returns a map from node index to confirmation (null if the node failed or did not respond in time).

Mirrors the TS SDK's writeEncodedBlobToNodes().

Implementation

Future<Map<int, StorageConfirmation?>> writeEncodedBlobToNodes({
  required EncodedBlob encodedBlob,
  required CommitteeInfo committee,
  required bool deletable,
  required String blobObjectId,
  Duration nodeTimeout = const Duration(seconds: 30),
  int maxConcurrency = 20,
}) async {
  final confirmations = <int, StorageConfirmation?>{};
  var failures = 0;

  final shardIndices = committee.nodeByShardIndex.keys.toList()..sort();

  // Process in batches to limit concurrency (like TS SDK's Promise.all
  // with bounded parallelism).
  for (var i = 0; i < shardIndices.length; i += maxConcurrency) {
    final batchEnd = (i + maxConcurrency < shardIndices.length)
        ? i + maxConcurrency
        : shardIndices.length;
    final batch = shardIndices.sublist(i, batchEnd);

    final futures = <Future<MapEntry<int, StorageConfirmation?>>>[];

    for (final shardIndex in batch) {
      final node = committee.nodeByShardIndex[shardIndex];
      if (node == null) {
        failures++;
        confirmations[shardIndex] = null;
        continue;
      }

      final pairIndex = toPairIndex(
        shardIndex,
        encodedBlob.blobIdBytes,
        committee.numShards,
      );

      if (pairIndex >= encodedBlob.primarySlivers.length ||
          pairIndex >= encodedBlob.secondarySlivers.length) {
        failures++;
        confirmations[shardIndex] = null;
        continue;
      }

      futures.add(
        _writeEncodedBlobToNode(
          node: node,
          blobId: encodedBlob.blobId,
          metadata: encodedBlob.metadataBytes,
          primarySliver: encodedBlob.primarySlivers[pairIndex],
          secondarySliver: encodedBlob.secondarySlivers[pairIndex],
          pairIndex: pairIndex,
          deletable: deletable,
          blobObjectId: blobObjectId,
          timeout: nodeTimeout,
        ).then(
          (conf) => MapEntry(shardIndex, conf as StorageConfirmation?),
          onError: (_) =>
              MapEntry<int, StorageConfirmation?>(shardIndex, null),
        ),
      );
    }

    // Await all writes in this batch concurrently.
    final results = await Future.wait(futures);
    for (final entry in results) {
      confirmations[entry.key] = entry.value;
      if (entry.value == null) failures++;
    }

    // Abort early if too many failures.
    if (isAboveValidity(failures, committee.numShards)) {
      throw NotEnoughBlobConfirmationsError(
        'Too many storage node failures ($failures). '
        'Cannot achieve quorum with ${committee.numShards} shards.',
      );
    }
  }

  return confirmations;
}