watch method

Future<Either<L, VerboseStream<L, R>>?> watch({
  required P params,
  bool cancelOnError = false,
})

Creates the stream to watch and starts listening to it. Any subscription left over from a previous watch call is cancelled first.

This first emits a StartWatching state, followed by either a StartWatchFailed or a StartWatchSuccess state.

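Afterwards, each event from the generated stream produces a further state: WatchDataReceived for data events, WatchErrorReceived for error events, and WatchDone once the stream has closed. If cancelOnError is true, the subscription to the stream is cancelled on the first error event.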
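The three kinds of stream event correspond to the three callbacks of a plain Dart Stream.listen call. The following is a minimal sketch using only dart:async; recordEvents and sampleStream are hypothetical helpers written for this illustration and are not part of the library.

```dart
import 'dart:async';

// Hypothetical helper (not part of the library): records which listen
// callback fired for each event of a plain Dart stream.
Future<List<String>> recordEvents(
  Stream<int> source, {
  bool cancelOnError = false,
}) {
  final log = <String>[];
  final finished = Completer<List<String>>();

  source.listen(
    // Data event: the point where a WatchDataReceived would be emitted.
    (data) => log.add('data: $data'),
    // Error event: the point where a WatchErrorReceived would be emitted.
    onError: (Object error) {
      log.add('error: $error');
      // With cancelOnError the subscription ends here and onDone never runs.
      if (cancelOnError) finished.complete(log);
    },
    // Done event: the point where a WatchDone would be emitted.
    onDone: () {
      log.add('done');
      finished.complete(log);
    },
    cancelOnError: cancelOnError,
  );

  return finished.future;
}

// Hypothetical source: one value, one error, another value, then close.
Stream<int> sampleStream() {
  final controller = StreamController<int>();
  controller
    ..add(1)
    ..addError('boom')
    ..add(2)
    ..close();
  return controller.stream;
}

Future<void> main() async {
  print(await recordEvents(sampleStream()));
  print(await recordEvents(sampleStream(), cancelOnError: true));
}
```

With cancelOnError left at false, events after the error are still delivered, including the done event. With cancelOnError set to true, delivery stops at the first error, so no done event is recorded.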

If successful, this returns a Right containing the VerboseStream of StartWatchSuccess, which gives you access to the stream. Otherwise, it returns a Left containing the leftValue of StartWatchFailed.

If the Watcher is closed, or a new watch call is made while this one is still in progress, this returns null.
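A caller therefore has three outcomes to handle: null, a left value, or a right value. The sketch below shows one way to branch on them with a Dart 3 switch expression. The Either, Left and Right classes here are simplified stand-ins defined only so the example is self-contained; the real types come from the library's dependencies, and a plain Stream<int> stands in for VerboseStream.

```dart
// Simplified stand-ins for illustration only, not the library's own types.
sealed class Either<L, R> {}

class Left<L, R> extends Either<L, R> {
  Left(this.value);
  final L value;
}

class Right<L, R> extends Either<L, R> {
  Right(this.value);
  final R value;
}

// Hypothetical helper: turns the result of a watch-style call into a message.
String describe(Either<String, Stream<int>>? outcome) => switch (outcome) {
      // The call was superseded by a newer one, or the owner was closed.
      null => 'superseded or closed',
      // Creating the stream failed; the left value carries the failure.
      Left(value: final failure) => 'failed: $failure',
      // The stream was created and is being listened to.
      Right() => 'watching',
    };

void main() {
  print(describe(null));
  print(describe(Left('no connection')));
  print(describe(Right(Stream.value(1))));
}
```

Treating null as "ignore this result" rather than as an error keeps a superseded call from overwriting what the newer call reports.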

Implementation

Future<Either<L, VerboseStream<L, R>>?> watch({
  required P params,
  bool cancelOnError = false,
}) async {
  final actionToken = requestNewActionToken();

  await (_streamSubscription?.cancel() ?? ensureAsync());

  if (isClosed) return null;

  if (distinctEmit(
        actionToken,
        () => StartWatching(actionToken),
      ) ==
      null) {
    return null;
  }

  final result = await onCall(params);

  if (isClosed) return null;

  distinctEmit(actionToken, () {
    setParamsAndValue(params, result);

    return result.fold(
      (l) => StartWatchFailed<L>(l, actionToken),
      (r) {
        _streamSubscription = r.listen(
          (data) {
            if (isClosed) return;

            distinctEmit(
              actionToken,
              () {
                event = Right(data);

                return WatchDataReceived<R>(data, actionToken);
              },
            );
          },
          onError: (error) {
            if (isClosed) return;

            distinctEmit(
              actionToken,
              () {
                event = Left(error);

                return WatchErrorReceived<L>(error, actionToken);
              },
            );
          },
          onDone: () {
            if (isClosed) return;

            distinctEmit(
              actionToken,
              () => WatchDone(actionToken),
            );
          },
          cancelOnError: cancelOnError,
        );

        return StartWatchSuccess(r, actionToken);
      },
    );
  });

  return result;
}
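The implementation above passes actionToken to every distinctEmit call so that a superseded call stops emitting. The bodies of requestNewActionToken and distinctEmit are not shown here, so the following is only a sketch of the general "latest call wins" idea under that assumption. LatestOnlyGuard and its members are hypothetical names and may differ from how the library actually does it.

```dart
// Hypothetical guard (an assumption, not the library's code): only the
// most recently issued token is allowed to run its action.
class LatestOnlyGuard {
  int _current = 0;

  // Issues a new token and invalidates every earlier one.
  int requestToken() => ++_current;

  // Runs the action and returns its result if the token is still the
  // latest; returns null without running it otherwise.
  T? runIfCurrent<T>(int token, T Function() action) =>
      token == _current ? action() : null;
}

void main() {
  final guard = LatestOnlyGuard();
  final first = guard.requestToken();
  final second = guard.requestToken(); // supersedes first

  print(guard.runIfCurrent(first, () => 'first ran'));
  print(guard.runIfCurrent(second, () => 'second ran'));
}
```

In this sketch a null result signals that the call was superseded, which mirrors how watch returns null when its first distinctEmit returns null.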