peekNewlyAvailable method

@override
@server
@client
Future<Stream<String>> peekNewlyAvailable({
  required DateTime since,
  DateTime? asOf,
  int? limit,
})
override

Returns up to limit keys whose availableAt lies in the half-open interval (since, asOf]: the keys that crossed the born threshold since the caller's last sweep watermark. Keys are yielded in ascending availableAt order; asOf defaults to the current time.

The watermark pattern: born keys are persistent (unlike expired keys, which a sweep drains), so "all born keys" is useless as a trigger signal — every call would return the same set. Callers MUST track the largest availableAt they have already processed and pass it as since (exclusive). The first sweep may pass DateTime(0) or a persisted watermark. A key whose availableAt == since is NOT yielded; one whose availableAt == asOf IS.
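The watermark pattern can be sketched as follows. This is a minimal illustration, not the library's own code: FakeStore is a hypothetical in-memory stand-in that only mirrors the documented (since, asOf] semantics, and sweep and the direct availableAt lookup are assumptions made for the example, since the method itself yields keys only.

```dart
import 'dart:async';

/// Hypothetical in-memory stand-in for the real store. Only the
/// (since, asOf] semantics are taken from the documentation.
class FakeStore {
  final Map<String, DateTime> availableAt = {};

  Future<Stream<String>> peekNewlyAvailable({
    required DateTime since,
    DateTime? asOf,
    int? limit,
  }) async {
    final cutoff = asOf ?? DateTime.timestamp();
    final hits = availableAt.entries
        .where((e) => e.value.isAfter(since) && !e.value.isAfter(cutoff))
        .toList()
      ..sort((a, b) => a.value.compareTo(b.value));
    final keys = hits.map((e) => e.key);
    // Assumption: a null limit means "no cap".
    return Stream.fromIterable(limit == null ? keys : keys.take(limit));
  }
}

/// One sweep: records newly born keys and returns the advanced watermark,
/// i.e. the largest availableAt processed so far.
Future<DateTime> sweep(FakeStore store, DateTime watermark, DateTime asOf,
    List<String> processed) async {
  final keys = await store.peekNewlyAvailable(since: watermark, asOf: asOf);
  var next = watermark;
  await for (final key in keys) {
    processed.add(key);
    final t = store.availableAt[key]!; // hypothetical lookup of availableAt
    if (t.isAfter(next)) next = t;
  }
  return next;
}

Future<void> main() async {
  final store = FakeStore();
  store.availableAt['a'] = DateTime.utc(2024, 1, 1);
  store.availableAt['b'] = DateTime.utc(2024, 1, 2);

  // First sweep starts from DateTime(0).
  final first = <String>[];
  var watermark =
      await sweep(store, DateTime(0), DateTime.utc(2024, 1, 3), first);
  print(first); // [a, b]

  store.availableAt['c'] = DateTime.utc(2024, 1, 2); // == since: not yielded
  store.availableAt['d'] = DateTime.utc(2024, 1, 4); // == asOf: yielded
  final second = <String>[];
  watermark =
      await sweep(store, watermark, DateTime.utc(2024, 1, 4), second);
  print(second); // [d]
}
```

In a real caller the returned watermark would be persisted between sweeps, so that a restart resumes from the last processed availableAt rather than from DateTime(0).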

The watermark is deliberately caller-side state: it adds no per-entry server state and composes with multiple independent readers, each holding its own watermark. If a future caller needs server-side "processed" tracking instead, revisit this contract before building around it.
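To illustrate how caller-side watermarks compose, here is a hedged sketch with two independent readers sweeping the same data on different schedules. The peek function is a simplified, hypothetical stand-in for peekNewlyAvailable, and Reader is an invented helper class; neither is part of the documented API.

```dart
import 'dart:async';

// Hypothetical data: key -> availableAt. Stands in for the real store.
final availableAt = <String, DateTime>{
  'a': DateTime.utc(2024, 1, 1),
  'b': DateTime.utc(2024, 1, 2),
  'c': DateTime.utc(2024, 1, 3),
};

// Simplified stand-in: keys in (since, asOf], ascending by availableAt.
Future<Stream<String>> peek({
  required DateTime since,
  required DateTime asOf,
}) async {
  final hits = availableAt.entries
      .where((e) => e.value.isAfter(since) && !e.value.isAfter(asOf))
      .toList()
    ..sort((a, b) => a.value.compareTo(b.value));
  return Stream.fromIterable(hits.map((e) => e.key));
}

/// A reader owns its watermark; the store keeps no per-reader state.
class Reader {
  Reader(this.name);

  final String name;
  DateTime watermark = DateTime(0);
  final List<String> seen = [];

  Future<void> sweep(DateTime asOf) async {
    await for (final key in await peek(since: watermark, asOf: asOf)) {
      seen.add(key);
      final t = availableAt[key]!;
      if (t.isAfter(watermark)) watermark = t;
    }
  }
}

Future<void> main() async {
  final indexer = Reader('indexer');
  final notifier = Reader('notifier');

  // The indexer sweeps twice; the notifier sweeps once, later.
  await indexer.sweep(DateTime.utc(2024, 1, 1));
  await indexer.sweep(DateTime.utc(2024, 1, 3));
  await notifier.sweep(DateTime.utc(2024, 1, 3));

  print('${indexer.name}: ${indexer.seen}'); // indexer: [a, b, c]
  print('${notifier.name}: ${notifier.seen}'); // notifier: [a, b, c]
}
```

Each reader sees every key exactly once regardless of the other's schedule, because the only state that distinguishes them is the watermark each one holds.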

Implementation

@override
@server
@client
Future<Stream<String>> peekNewlyAvailable({
  required DateTime since,
  DateTime? asOf,
  int? limit,
}) async {
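  // asOf defaults to the current UTC time.
  final cutoff = asOf ?? DateTime.timestamp();
  // Keep keys with since < availableAt <= cutoff, in ascending availableAt order.
  return Stream.fromIterable(_collectAscending(
      availableAt, limit, (t) => t.isAfter(since) && !t.isAfter(cutoff)));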
}
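The _collectAscending helper is not shown on this page. As a rough guess at its shape, inferred only from the call site above, the sketch below filters a key-to-timestamp map with the range predicate, sorts by timestamp, and applies the limit. The name collectAscending, the Map<String, DateTime> argument type, and the treatment of a null limit as "no cap" are all assumptions.

```dart
/// Hypothetical sketch of a helper like _collectAscending.
/// Assumes times maps each key to its timestamp and that a null
/// limit means "return every match".
Iterable<String> collectAscending(
  Map<String, DateTime> times,
  int? limit,
  bool Function(DateTime) inRange,
) {
  final hits = times.entries.where((e) => inRange(e.value)).toList()
    ..sort((a, b) => a.value.compareTo(b.value));
  final keys = hits.map((e) => e.key);
  return limit == null ? keys : keys.take(limit);
}

void main() {
  final times = {
    'late': DateTime.utc(2024, 1, 3),
    'early': DateTime.utc(2024, 1, 1),
    'mid': DateTime.utc(2024, 1, 2),
  };
  final since = DateTime.utc(2024, 1, 1);
  final cutoff = DateTime.utc(2024, 1, 3);

  // Same predicate as the implementation: since < t <= cutoff.
  bool inRange(DateTime t) => t.isAfter(since) && !t.isAfter(cutoff);

  print(collectAscending(times, null, inRange).toList()); // [mid, late]
  print(collectAscending(times, 1, inRange).toList()); // [mid]
}
```

Here 'early' is excluded because its timestamp equals since, while 'late' is included because its timestamp equals the cutoff, matching the boundary rules stated in the description.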