blossom_upload_queue_shim_for_ndk 0.2.0
blossom_upload_queue_shim_for_ndk: ^0.2.0 copied to clipboard
Offline-first wrapper around the ndk package's Blossom upload use case. Persists per-upload metadata to a sembast queue, reads blob bytes from a blossom_cache, and retries until every targeted server [...]
blossom_upload_queue_shim_for_ndk #
Offline-first wrapper around the ndk package's
Blossom upload use case.
NDK's blossom.uploadBlob ships a blob to a set of Blossom servers and reports
per-server results. If every server is unreachable (flaky network, app
backgrounded, process killed mid-upload), the blob is gone from the caller's
perspective. This shim sits in front of ndk.blossom.uploadBlob and adds:
- Local persistence first. The queue entry is committed to a sembast store
and the bytes are pinned in a
blossom_cachebefore any network attempt.upload()returns once persistence is durable; delivery happens in the background and survives restarts. - 100 % delivery guarantee. An entry is only marked
deliveredonce every targeted server has returnedsuccess: true. Partial success keeps the entry pending and retries the missing servers. - Monotonic ack history. A server that has acked never un-acks. A delivered entry never silently flips back to pending due to a transient server outage.
- No auto-deletion. Delivered entries stay in the store and can be re-uploaded later, for instance to a freshly discovered server.
Install #
dependencies:
blossom_upload_queue_shim_for_ndk: ^0.1.0
blossom_cache: ^0.1.0
ndk: ^0.8.3
sembast: ^3.8.7
idb_shim: ^2.9.2
Quick start #
import 'dart:typed_data';
import 'package:blossom_cache/blossom_cache.dart';
import 'package:blossom_upload_queue_shim_for_ndk/blossom_upload_queue_shim_for_ndk.dart';
import 'package:idb_shim/idb_io.dart';
import 'package:ndk/ndk.dart';
import 'package:sembast/sembast_io.dart';
Future<void> main() async {
final db = await databaseFactoryIo.openDatabase('blossom_uploads.db');
final cache = await IdbBlossomCache.open(factory: idbFactorySembastIo);
final ndk = Ndk(
NdkConfig(eventVerifier: Bip340EventVerifier(), cache: MemCacheManager()),
);
final outbox = OfflineBlossomUpload.withNdk(ndk, cache: cache, db: db);
outbox.start();
// Bytes go in the cache first, with a caller-supplied sha256.
await cache.put(mySha256, myBytes, type: 'image/png');
// Returns as soon as the queue entry is persisted. Delivery is now the
// shim's responsibility.
await outbox.upload(
sha256: mySha256,
servers: const ['https://blossom.primal.net', 'https://cdn.satellite.earth'],
);
}
Semantics #
upload(sha256:, servers:, contentType:) #
Persists a queue entry for the blob identified by sha256 and schedules an
immediate attempt to push it to every URL in servers. The blob bytes must
already live in the BlossomCache passed to the constructor. upload()
looks them up via cache.head(sha256) and throws StateError if absent. The
servers list is required; no automatic server-list lookup is performed.
URLs are normalized (lowercased, trailing / stripped) before storage.
If a record with the same sha256 already exists, the server lists are
merged. deliveredAt is preserved if every server in the merged list is
already in the entry's ack set; otherwise the entry is demoted to pending so
the missing servers get pushed.
If no contentType is provided, the shim reads it from the cache descriptor.
retryNow() #
Forces an immediate scan of due entries, bypassing the online check. Use it as an explicit override (e.g. when the user pulls to refresh).
Connectivity awareness #
OfflineBlossomUpload.withNdk() subscribes to
ndk.connectivity.relayConnectivityChanges and pauses the periodic retry
timer while no public relay is connected. Blossom has no dedicated
connectivity stream, so the shim treats "a relay is reachable" as a proxy for
"the device has internet." Loopback addresses, RFC1918 IPv4, ULA/link-local
IPv6, and mDNS .local names are excluded from the "is online" computation
so a local dev relay cannot mask a real outage.
For non-NDK setups, pass any Stream<bool> onlineSignal to the default
constructor:
OfflineBlossomUpload(
uploadFn: ...,
cache: cache,
db: db,
onlineSignal: yourConnectivityStream, // true while online, false otherwise
);
If you don't pass anything, the shim assumes it is always online and the periodic timer runs unconditionally.
reupload(sha256, {String? server}) #
ackedServers and deliveredAt are monotonic. reupload never rewrites the
past; it queues a one-shot push via a transient forcedServers override that
the next attempt consumes.
reupload(sha256): schedules an immediate push to every server in the entry'sserverslist, including those that already acked. Useful when you suspect a server dropped your blob. Acks anddeliveredAtare preserved regardless of the new attempt's outcome.reupload(sha256, server: s): pushes to that single server. Ifsis new to the entry, it joins the target list and the entry is demoted to pending untilsacks. Ifswas already there, the historical state is preserved.
Pin ownership #
The shim pins the blob in the BlossomCache when it takes ownership of an
upload, and releases the pin on delivered. It only releases pins it
applied itself. If you pinned the blob before calling upload() (e.g.
because the same blob is also referenced by something else in your app), the
shim records pinnedByShim: false on the queue entry and leaves your pin
alone, both during and after delivery.
A blob whose bytes are deleted from the cache while still pending will have
its next attempt fail with lastErrors[server] = 'blob bytes missing from cache'. The entry stays in the store; the network call is not made.
What "success" means #
The full target set must ack. The shim uses UploadStrategy.allSimultaneous
internally so each server is attempted independently; per-server failures do
not affect the others. firstSuccess and mirrorAfterSuccess are
deliberately not exposed: they would either fail to cover every target or
introduce server-to-server dependencies that complicate the model.
What the shim does NOT do #
- It never signs. Whatever bytes you pass are forwarded as-is to
ndk.blossom.uploadBlob, which signs the BUD-01 authorization event using theEventSignerconfigured on NDK. The shim has no opinion on signing. - It never hashes. The caller provides the sha256. The cache stores by that key. The shim queues by that key.
- It never requests server-side media optimisation.
serverMediaOptimisationwould re-encode the blob server-side and change the resulting sha256, breaking content-addressing. Hardcodedfalse. - It never deletes records. Even after full delivery, the entry stays in the database. If you want retention, prune sembast directly.
- It does not give up. Without a
maxAttemptsknob, a deterministically rejected upload (size too big for the server, blob type not allowed, etc.) will retry forever with exponential backoff. InspectQueuedBlobUpload.lastErrorsand remove manually if needed.
Tuning #
OfflineBlossomUpload.withNdk(
ndk,
cache: cache,
db: db,
storeName: 'blob_uploads', // sembast store name
tickInterval: const Duration(seconds: 30), // periodic retry scan
initialBackoff: const Duration(seconds: 5), // backoff floor
maxBackoff: const Duration(minutes: 30), // backoff ceiling
perAttemptTimeout: const Duration(minutes: 5), // give up on a single NDK call after this
);
Testing your integration #
OfflineBlossomUpload is fully unit-testable without NDK. Pass a custom
BlobUploadFn to the default constructor:
final outbox = OfflineBlossomUpload(
uploadFn:
({
required data,
required serverUrls,
required precomputedSha256,
contentType,
}) async => [
for (final s in serverUrls)
BlobUploadResult(serverUrl: s, success: true),
],
cache: await IdbBlossomCache.open(factory: newIdbFactoryMemory()),
db: await newDatabaseFactoryMemory().openDatabase('test.db'),
);
The package's own test suite uses exactly this approach; see
test/offline_blossom_upload_test.dart.
License #
MIT