blossom_upload_queue_shim_for_ndk 0.2.0 copy "blossom_upload_queue_shim_for_ndk: ^0.2.0" to clipboard
blossom_upload_queue_shim_for_ndk: ^0.2.0 copied to clipboard

Offline-first wrapper around the ndk package's Blossom upload use case. Persists per-upload metadata to a sembast queue, reads blob bytes from a blossom_cache, and retries until every targeted server [...]

blossom_upload_queue_shim_for_ndk #

Offline-first wrapper around the ndk package's Blossom upload use case.

NDK's blossom.uploadBlob ships a blob to a set of Blossom servers and reports per-server results. If every server is unreachable (flaky network, app backgrounded, process killed mid-upload), the blob is gone from the caller's perspective. This shim sits in front of ndk.blossom.uploadBlob and adds:

  • Local persistence first. The queue entry is committed to a sembast store and the bytes are pinned in a blossom_cache before any network attempt. upload() returns once persistence is durable; delivery happens in the background and survives restarts.
  • 100 % delivery guarantee. An entry is only marked delivered once every targeted server has returned success: true. Partial success keeps the entry pending and retries the missing servers.
  • Monotonic ack history. A server that has acked never un-acks. A delivered entry never silently flips back to pending due to a transient server outage.
  • No auto-deletion. Delivered entries stay in the store and can be re-uploaded later, for instance to a freshly discovered server.

Install #

dependencies:
  blossom_upload_queue_shim_for_ndk: ^0.1.0
  blossom_cache: ^0.1.0
  ndk: ^0.8.3
  sembast: ^3.8.7
  idb_shim: ^2.9.2

Quick start #

import 'dart:typed_data';

import 'package:blossom_cache/blossom_cache.dart';
import 'package:blossom_upload_queue_shim_for_ndk/blossom_upload_queue_shim_for_ndk.dart';
import 'package:idb_shim/idb_io.dart';
import 'package:ndk/ndk.dart';
import 'package:sembast/sembast_io.dart';

Future<void> main() async {
  final db = await databaseFactoryIo.openDatabase('blossom_uploads.db');
  final cache = await IdbBlossomCache.open(factory: idbFactorySembastIo);

  final ndk = Ndk(
    NdkConfig(eventVerifier: Bip340EventVerifier(), cache: MemCacheManager()),
  );

  final outbox = OfflineBlossomUpload.withNdk(ndk, cache: cache, db: db);
  outbox.start();

  // Bytes go in the cache first, with a caller-supplied sha256.
  await cache.put(mySha256, myBytes, type: 'image/png');

  // Returns as soon as the queue entry is persisted. Delivery is now the
  // shim's responsibility.
  await outbox.upload(
    sha256: mySha256,
    servers: const ['https://blossom.primal.net', 'https://cdn.satellite.earth'],
  );
}

Semantics #

upload(sha256:, servers:, contentType:) #

Persists a queue entry for the blob identified by sha256 and schedules an immediate attempt to push it to every URL in servers. The blob bytes must already live in the BlossomCache passed to the constructor. upload() looks them up via cache.head(sha256) and throws StateError if absent. The servers list is required; no automatic server-list lookup is performed. URLs are normalized (lowercased, trailing / stripped) before storage.

If a record with the same sha256 already exists, the server lists are merged. deliveredAt is preserved if every server in the merged list is already in the entry's ack set; otherwise the entry is demoted to pending so the missing servers get pushed.

If no contentType is provided, the shim reads it from the cache descriptor.

retryNow() #

Forces an immediate scan of due entries, bypassing the online check. Use it as an explicit override (e.g. when the user pulls to refresh).

Connectivity awareness #

OfflineBlossomUpload.withNdk() subscribes to ndk.connectivity.relayConnectivityChanges and pauses the periodic retry timer while no public relay is connected. Blossom has no dedicated connectivity stream, so the shim treats "a relay is reachable" as a proxy for "the device has internet." Loopback addresses, RFC1918 IPv4, ULA/link-local IPv6, and mDNS .local names are excluded from the "is online" computation so a local dev relay cannot mask a real outage.

For non-NDK setups, pass any Stream<bool> onlineSignal to the default constructor:

OfflineBlossomUpload(
  uploadFn: ...,
  cache: cache,
  db: db,
  onlineSignal: yourConnectivityStream, // true while online, false otherwise
);

If you don't pass anything, the shim assumes it is always online and the periodic timer runs unconditionally.

reupload(sha256, {String? server}) #

ackedServers and deliveredAt are monotonic. reupload never rewrites the past; it queues a one-shot push via a transient forcedServers override that the next attempt consumes.

  • reupload(sha256): schedules an immediate push to every server in the entry's servers list, including those that already acked. Useful when you suspect a server dropped your blob. Acks and deliveredAt are preserved regardless of the new attempt's outcome.
  • reupload(sha256, server: s): pushes to that single server. If s is new to the entry, it joins the target list and the entry is demoted to pending until s acks. If s was already there, the historical state is preserved.

Pin ownership #

The shim pins the blob in the BlossomCache when it takes ownership of an upload, and releases the pin on delivered. It only releases pins it applied itself. If you pinned the blob before calling upload() (e.g. because the same blob is also referenced by something else in your app), the shim records pinnedByShim: false on the queue entry and leaves your pin alone, both during and after delivery.

A blob whose bytes are deleted from the cache while still pending will have its next attempt fail with lastErrors[server] = 'blob bytes missing from cache'. The entry stays in the store; the network call is not made.

What "success" means #

The full target set must ack. The shim uses UploadStrategy.allSimultaneous internally so each server is attempted independently; per-server failures do not affect the others. firstSuccess and mirrorAfterSuccess are deliberately not exposed: they would either fail to cover every target or introduce server-to-server dependencies that complicate the model.

What the shim does NOT do #

  • It never signs. Whatever bytes you pass are forwarded as-is to ndk.blossom.uploadBlob, which signs the BUD-01 authorization event using the EventSigner configured on NDK. The shim has no opinion on signing.
  • It never hashes. The caller provides the sha256. The cache stores by that key. The shim queues by that key.
  • It never requests server-side media optimisation. serverMediaOptimisation would re-encode the blob server-side and change the resulting sha256, breaking content-addressing. Hardcoded false.
  • It never deletes records. Even after full delivery, the entry stays in the database. If you want retention, prune sembast directly.
  • It does not give up. Without a maxAttempts knob, a deterministically rejected upload (size too big for the server, blob type not allowed, etc.) will retry forever with exponential backoff. Inspect QueuedBlobUpload.lastErrors and remove manually if needed.

Tuning #

OfflineBlossomUpload.withNdk(
  ndk,
  cache: cache,
  db: db,
  storeName: 'blob_uploads',                     // sembast store name
  tickInterval: const Duration(seconds: 30),     // periodic retry scan
  initialBackoff: const Duration(seconds: 5),    // backoff floor
  maxBackoff: const Duration(minutes: 30),       // backoff ceiling
  perAttemptTimeout: const Duration(minutes: 5), // give up on a single NDK call after this
);

Testing your integration #

OfflineBlossomUpload is fully unit-testable without NDK. Pass a custom BlobUploadFn to the default constructor:

final outbox = OfflineBlossomUpload(
  uploadFn:
      ({
        required data,
        required serverUrls,
        required precomputedSha256,
        contentType,
      }) async => [
        for (final s in serverUrls)
          BlobUploadResult(serverUrl: s, success: true),
      ],
  cache: await IdbBlossomCache.open(factory: newIdbFactoryMemory()),
  db: await newDatabaseFactoryMemory().openDatabase('test.db'),
);

The package's own test suite uses exactly this approach; see test/offline_blossom_upload_test.dart.

License #

MIT

0
likes
0
points
240
downloads

Publisher

unverified uploader

Weekly Downloads

Offline-first wrapper around the ndk package's Blossom upload use case. Persists per-upload metadata to a sembast queue, reads blob bytes from a blossom_cache, and retries until every targeted server has acknowledged delivery.

Repository (GitHub)
View/report issues

Topics

#nostr #ndk #blossom #offline-first #queue

License

unknown (license)

Dependencies

blossom_cache, ndk, sembast

More

Packages that depend on blossom_upload_queue_shim_for_ndk