AsyncValueStream<T> constructor

AsyncValueStream<T>({
  String? debugName,
  T? initialValue,
  bool isUnique = true,
  ClearingHouseMode mode = ClearingHouseMode.AllowSynchronousValues,
})

Implementation

/// Creates the stream and wires up its request-processing pipeline.
///
/// [debugName] falls back to `asyncValueStream.$T` when omitted and is also
/// used as the name of the [Logger]. A non-null [initialValue] is applied via
/// [syncUpdate] after the pipeline has been set up.
AsyncValueStream({
  String? debugName,
  T? initialValue,
  this.isUnique = true,
  this.mode = ClearingHouseMode.AllowSynchronousValues,
})  : debugName = debugName ?? "asyncValueStream.$T",
      log = Logger(debugName ?? "asyncValueStream.$T"),
      _after = StreamController.broadcast() {
  // Runs one start/complete/reset cycle on the next frame, completing it
  // with whatever `current` holds at the time of the call.
  void publishCurrent() {
    _nextFrame
      ..start()
      ..complete(current)
      ..reset();
  }

  // Requests are consumed in batches rather than one at a time; only the
  // `max()` request of each batch is actually started.
  _requests.stream.asyncMapBuffer((batch) async {
    final newest = batch.max();
    _inflight = newest;

    // Every other request in the batch is cancelled before continuing.
    await batch
        .where((candidate) => candidate != newest)
        .map((superseded) => superseded.cancel())
        .awaitAll();

    // Guard: a request older than what has already been accepted is dropped.
    // No frame is started or completed on this path.
    if (newest.requestId < _accepted) {
      log.warning("Skipping ${newest.requestId} because it was too stale");
      return;
    }

    final startedId = newest.requestId;
    _nextFrame.start();
    final operation = newest.start();
    try {
      // Wait until the request either completes or is cancelled; the
      // cancelled sentinel stands in for the result on cancellation.
      final outcome = await operation
          ?.valueOrCancellation(const UpdateResult.cancelled());

      // Guard: nothing to record when there is no completed result.
      if (outcome == null || outcome.isCompleted != true) {
        log.info("Ignoring cancelled request ${newest.requestId}");
        return;
      }

      log.fine("Request ${newest.requestId} completed with ${outcome.value}");

      final newestIssuedId = _requestId - 1;
      if (newestIssuedId > startedId) {
        // A newer request has been issued since this one started: store the
        // value but leave the frame for that newer request to complete.
        log.fine(
            "Expecting a newer value $newestIssuedId than $startedId, so we're not going to complete.  Logging result as _current");
        _current = outcome.value;
        _isResolved = true;
        return;
      }

      log.fine(newestIssuedId < startedId
          ? "Older request $newestIssuedId will be superceded by $startedId"
          : "Request $startedId is the latest");
      _inflight = null;
      _internalUpdate(newest.requestId, outcome.value);
      publishCurrent();
    } catch (error, trace) {
      // On failure the frame is still cycled with the current value.
      log.severe("Error updating result: $error", error, trace);
      publishCurrent();
    }
  }).autodispose(this);

  if (initialValue != null) {
    syncUpdate(initialValue);
  }
}