writeEncodedBlobToNodes method
Future<Map<int, StorageConfirmation?> >
writeEncodedBlobToNodes({
- required EncodedBlob encodedBlob,
- required CommitteeInfo committee,
- required bool deletable,
- required String blobObjectId,
- Duration nodeTimeout = const Duration(seconds: 30),
- int maxConcurrency = 20,
Write encoded slivers to all storage nodes in the committee.
Returns a map from node index to confirmation (null if the node failed or did not respond in time).
Mirrors the TS SDK's writeEncodedBlobToNodes().
Implementation
Future<Map<int, StorageConfirmation?>> writeEncodedBlobToNodes({
required EncodedBlob encodedBlob,
required CommitteeInfo committee,
required bool deletable,
required String blobObjectId,
Duration nodeTimeout = const Duration(seconds: 30),
int maxConcurrency = 20,
}) async {
final confirmations = <int, StorageConfirmation?>{};
var failures = 0;
final shardIndices = committee.nodeByShardIndex.keys.toList()..sort();
// Process in batches to limit concurrency (like TS SDK's Promise.all
// with bounded parallelism).
for (var i = 0; i < shardIndices.length; i += maxConcurrency) {
final batchEnd = (i + maxConcurrency < shardIndices.length)
? i + maxConcurrency
: shardIndices.length;
final batch = shardIndices.sublist(i, batchEnd);
final futures = <Future<MapEntry<int, StorageConfirmation?>>>[];
for (final shardIndex in batch) {
final node = committee.nodeByShardIndex[shardIndex];
if (node == null) {
failures++;
confirmations[shardIndex] = null;
continue;
}
final pairIndex = toPairIndex(
shardIndex,
encodedBlob.blobIdBytes,
committee.numShards,
);
if (pairIndex >= encodedBlob.primarySlivers.length ||
pairIndex >= encodedBlob.secondarySlivers.length) {
failures++;
confirmations[shardIndex] = null;
continue;
}
futures.add(
_writeEncodedBlobToNode(
node: node,
blobId: encodedBlob.blobId,
metadata: encodedBlob.metadataBytes,
primarySliver: encodedBlob.primarySlivers[pairIndex],
secondarySliver: encodedBlob.secondarySlivers[pairIndex],
pairIndex: pairIndex,
deletable: deletable,
blobObjectId: blobObjectId,
timeout: nodeTimeout,
).then(
(conf) => MapEntry(shardIndex, conf as StorageConfirmation?),
onError: (_) =>
MapEntry<int, StorageConfirmation?>(shardIndex, null),
),
);
}
// Await all writes in this batch concurrently.
final results = await Future.wait(futures);
for (final entry in results) {
confirmations[entry.key] = entry.value;
if (entry.value == null) failures++;
}
// Abort early if too many failures.
if (isAboveValidity(failures, committee.numShards)) {
throw NotEnoughBlobConfirmationsError(
'Too many storage node failures ($failures). '
'Cannot achieve quorum with ${committee.numShards} shards.',
);
}
}
return confirmations;
}