broadcast_queue_shim_for_ndk 0.2.1 copy "broadcast_queue_shim_for_ndk: ^0.2.1" to clipboard
broadcast_queue_shim_for_ndk: ^0.2.1 copied to clipboard

Offline-first wrapper around the ndk package's broadcast use case. Persists Nostr events to a local sembast queue and retries until every targeted relay has acknowledged delivery.

broadcast_queue_shim_for_ndk #

Offline-first wrapper around the ndk package's broadcast use case.

NDK's broadcast sends a Nostr event to a set of relays and reports per-relay results. If every relay is unreachable (flaky network, app backgrounded, process killed), the event is gone from the caller's perspective. This shim sits in front of ndk.broadcast and adds:

  • Local persistence first. The event is committed to a sembast store before any network attempt. broadcast() returns once persistence is durable; delivery happens in the background and survives restarts.
  • 100 % delivery guarantee. An entry is only marked delivered once every targeted relay has returned broadcastSuccessful: true. Partial success keeps the entry pending and retries the missing relays.
  • Monotonic ack history. A relay that has acked never un-acks. A delivered entry never silently flips back to pending due to a transient relay outage.
  • No auto-deletion. Delivered entries stay in the store and can be re-broadcast later, for instance to a freshly discovered relay.

Install #

dependencies:
  broadcast_queue_shim_for_ndk: ^0.1.0
  ndk: ^0.8.3
  sembast: ^3.8.7

Quick start #

import 'package:broadcast_queue_shim_for_ndk/broadcast_queue_shim_for_ndk.dart';
import 'package:ndk/ndk.dart';
import 'package:sembast/sembast_io.dart';

Future<void> main() async {
  final db = await databaseFactoryIo.openDatabase('broadcasts.db');

  final ndk = Ndk(
    NdkConfig(
      eventVerifier: Bip340EventVerifier(),
      cache: MemCacheManager(),
    ),
  );

  final outbox = OfflineBroadcast.withNdk(ndk, db: db);
  outbox.start();

  final event = Nip01Event(
    pubKey: myPubKey,
    kind: 1,
    tags: const [],
    content: 'hello from a flaky network',
  );

  // Returns as soon as the event is persisted. Delivery is now the shim's
  // responsibility.
  await outbox.broadcast(
    event,
    relays: const ['wss://relay.damus.io', 'wss://nos.lol'],
  );
}

Semantics #

broadcast(event, relays: [...]) #

Persists event and schedules an immediate attempt to push it to every URL in relays. The list is required: gossip-based relay selection is never used. URLs are normalized (lowercased, trailing / stripped) before storage.

If a record with the same event.id already exists, the relay lists are merged. deliveredAt is preserved if every relay in the merged list is already in the entry's ack set; otherwise the entry is demoted to pending so the missing relays get pushed.

retryNow() #

Forces an immediate scan of due entries, bypassing the online check. Use it as an explicit override (e.g. when the user pulls to refresh).

Connectivity awareness #

OfflineBroadcast.withNdk() subscribes to ndk.connectivity.relayConnectivityChanges and pauses the periodic retry timer while no public relay is connected. As soon as a public relay comes online, the shim replays everything that's due. Loopback addresses, RFC1918 IPv4, ULA/link-local IPv6, and mDNS .local names are excluded from the "is online" computation so a local dev relay cannot mask a real outage.

For non-NDK setups, pass any Stream<bool> onlineSignal to the default constructor:

OfflineBroadcast(
  broadcastFn: ...,
  db: db,
  onlineSignal: yourConnectivityStream, // true while online, false otherwise
);

If you don't pass anything, the shim assumes it is always online and the periodic timer runs unconditionally (pre-0.2 behavior).

rebroadcast(id, {String? relay}) #

ackedRelays and deliveredAt are monotonic. rebroadcast never rewrites the past; it queues a one-shot push via a transient forcedRelays override that the next attempt consumes.

  • rebroadcast(id): schedules an immediate push to every relay in the entry's relays list, including those that already acked. Useful when you suspect a relay dropped your event. Acks and deliveredAt are preserved regardless of the new attempt's outcome.
  • rebroadcast(id, relay: r): pushes to that single relay. If r is new to the entry, it joins the target list and the entry is demoted to pending until r acks. If r was already there, the historical state is preserved.

What "success" means #

The full target set must ack. NDK's own considerDonePercent knob is not used as a delivery threshold; it only governs when the underlying future completes, which is a different question.

What the shim does NOT do #

  • It never signs. Whatever event you pass is forwarded as-is to ndk.broadcast.broadcast. If the event is unsigned, NDK signs it using its configured EventSigner. The shim has no opinion on signing.
  • It never deletes records. Even after full delivery, the entry stays in the database. If you want retention, prune it yourself by clearing records from sembast directly.
  • It does not give up. Without a maxAttempts knob, a deterministically rejected event (POW too low, kind not allowed by a relay, etc.) will retry forever with exponential backoff. Inspect QueuedBroadcast.lastErrors and manually remove if needed.

Tuning #

OfflineBroadcast.withNdk(
  ndk,
  db: db,
  storeName: 'broadcasts',                       // sembast store name
  tickInterval: const Duration(seconds: 30),     // periodic retry scan
  initialBackoff: const Duration(seconds: 5),    // backoff floor
  maxBackoff: const Duration(minutes: 30),       // backoff ceiling
  perAttemptTimeout: const Duration(seconds: 10),// gives up on a single NDK call after this
);

Architecture in one diagram #

caller.broadcast(event, relays)
        │
        ▼
  sembast write ──── durable, returns to caller here
        │
        ▼
  ndk.broadcast.broadcast(event, specificRelays: remaining)
        │
        ▼
  await broadcastDoneFuture
        │
        ▼
  per-relay union into ackedRelays  ────►  delivered when ⊇ relays
  per-relay error into lastErrors          (otherwise schedule backoff)

A Timer.periodic scans findDue every tickInterval and replays whatever is overdue. retryNow() runs the same scan immediately.

Testing your integration #

OfflineBroadcast is fully unit-testable without NDK. Pass a custom BroadcastFn to the default constructor:

final outbox = OfflineBroadcast(
  broadcastFn: (event, relays) => NdkBroadcastResponse(
    publishEvent: event,
    broadcastDoneStream: Stream.value([
      for (final r in relays)
        RelayBroadcastResponse(
          relayUrl: r,
          okReceived: true,
          broadcastSuccessful: true,
        ),
    ]),
  ),
  db: await newDatabaseFactoryMemory().openDatabase('test.db'),
);

The package's own test suite uses exactly this approach; see test/offline_broadcast_test.dart.

License #

MIT

0
likes
160
points
91
downloads

Documentation

API reference

Publisher

unverified uploader

Weekly Downloads

Offline-first wrapper around the ndk package's broadcast use case. Persists Nostr events to a local sembast queue and retries until every targeted relay has acknowledged delivery.

Repository (GitHub)
View/report issues

Topics

#nostr #ndk #offline-first #queue

License

MIT (license)

Dependencies

ndk, sembast

More

Packages that depend on broadcast_queue_shim_for_ndk