AsyncValueStream<T> constructor
AsyncValueStream<T> ({
- String? debugName,
- T? initialValue,
- bool isUnique = true,
- ClearingHouseMode mode = ClearingHouseMode.AllowSynchronousValues,
Implementation
AsyncValueStream({
String? debugName,
T? initialValue,
this.isUnique = true,
this.mode = ClearingHouseMode.AllowSynchronousValues,
}) : debugName = debugName ?? "asyncValueStream.$T",
log = Logger(debugName ?? "asyncValueStream.$T"),
_after = StreamController.broadcast() {
/// Listen to requests stream but process requests in chunks
_requests.stream.asyncMapBuffer((requests) async {
final latestRequest = (this._inflight = requests.max());
/// Cancel any requests that aren't the latest
await requests
.where((req) => req != latestRequest)
.map((req) => req.cancel())
.awaitAll();
if (latestRequest.requestId < _accepted) {
log.warning(
"Skipping ${latestRequest.requestId} because it was too stale");
// _nextFrame
// ..start()
// ..complete(_current)
// ..reset();
return;
} else {
/// Start that request
final thisRequestId = latestRequest.requestId;
_nextFrame.start();
final cancellable = latestRequest.start();
try {
/// And wait for either completion or cancellation
final result = await cancellable
?.valueOrCancellation(const UpdateResult.cancelled());
if (result?.isCompleted != true) {
log.info("Ignoring cancelled request ${latestRequest.requestId}");
} else {
log.fine(
"Request ${latestRequest.requestId} completed with ${result?.value}");
final mostRecentRequest = _requestId - 1;
if (mostRecentRequest <= thisRequestId) {
log.fine(mostRecentRequest < thisRequestId
? "Older request $mostRecentRequest will be superceded by $thisRequestId"
: "Request $thisRequestId is the latest");
_inflight = null;
_internalUpdate(latestRequest.requestId, result?.value);
_nextFrame
..start()
..complete(current)
..reset();
} else {
log.fine(
"Expecting a newer value $mostRecentRequest than $thisRequestId, so we're not going to complete. Logging result as _current");
_current = result!.value;
_isResolved = true;
}
}
} catch (e, stack) {
log.severe("Error updating result: $e", e, stack);
_nextFrame
..start()
..complete(current)
..reset();
}
}
}).autodispose(this);
if (initialValue != null) {
this.syncUpdate(initialValue);
}
}