peekNewlyAvailable method
Up to limit keys whose availableAt is in (since, asOf] —
keys that crossed the born threshold since the caller's last
sweep watermark. Ascending-availableAt order; asOf
defaults to now.
The watermark pattern: born keys are persistent (unlike
expired keys, which a sweep drains), so "all born keys" is
useless as a trigger signal — every call would return the
same set. Callers MUST track the largest availableAt they
have already processed and pass it as since (exclusive).
The first sweep may pass DateTime(0) or a persisted
watermark. A key whose availableAt == since is NOT yielded;
one whose availableAt == asOf IS.
The watermark is deliberately caller-side state: it adds no per-entry server state and composes with multiple independent readers, each holding its own watermark. If a future caller needs server-side "processed" tracking instead, revisit this contract before building around it.
Implementation
@override
@server
@client
Future<Stream<String>> peekNewlyAvailable({
required DateTime since,
DateTime? asOf,
int? limit,
}) async {
final cutoff = asOf ?? DateTime.timestamp();
return Stream.fromIterable(_collectAscending(
availableAt, limit, (t) => t.isAfter(since) && !t.isAfter(cutoff)));
}