subscribe method

Future<EventStreamSubscription> subscribe(
  String streamId, {
  Position? position,
  bool resolveLinks = false,
  UserCredentials? userCredentials,
  SubscriptionResolvedEventCallback? onEventAppeared,
  SubscriptionDroppedCallback? onSubscriptionDropped,
  EventStoreClientOperationOptions? operationOptions,
})

Subscribes to ResolvedEvents from the given stream streamId.

Starts at the first event after the optional position (the event at position itself is not returned). When position is not given, all events from StreamPosition.start are returned.

Callback SubscriptionResolvedEventCallback is invoked and awaited when a new event is received over the subscription.

Callback SubscriptionDroppedCallback is invoked and awaited when the subscription is dropped.

Use resolveLinks to resolve links as ResolvedEvent (default is false).

Returns an EventStreamSubscription on the first response from the server.

Implementation

/// Subscribes to resolved events from the stream identified by [streamId].
///
/// When [streamId] equals [SystemStreams.AllStream] the call is delegated to
/// [subscribeToAll]. In that case [position] must be `null` or a
/// [LogPosition]; any other [Position] subtype throws an [ArgumentError].
///
/// For every other stream, [position] is cast to [StreamPosition], with
/// [StreamPosition.start] used when it is omitted. Passing a non-stream
/// position for a regular stream therefore fails the cast at runtime.
///
/// [resolveLinks] is forwarded to the read request (default is `false`).
/// [userCredentials] and [operationOptions] are forwarded to the call
/// options. [onEventAppeared] and [onSubscriptionDropped] are handed to the
/// resulting subscription.
Future<EventStreamSubscription> subscribe(
  String streamId, {
  Position? position,
  bool resolveLinks = false,
  UserCredentials? userCredentials,
  SubscriptionResolvedEventCallback? onEventAppeared,
  SubscriptionDroppedCallback? onSubscriptionDropped,
  EventStoreClientOperationOptions? operationOptions,
}) async {
  if (streamId == SystemStreams.AllStream) {
    if (position != null && position is! LogPosition) {
      throw ArgumentError.value(
        position,
        'position',
        'Stream \$all requires type LogPosition',
      );
    }
    return subscribeToAll(
      resolveLinks: resolveLinks,
      onEventAppeared: onEventAppeared,
      userCredentials: userCredentials,
      // The guard above lets `null` through, so the cast must target the
      // nullable type: `null as LogPosition` throws a TypeError under sound
      // null safety, which made subscribing to $all without a position fail.
      // NOTE(review): assumes subscribeToAll accepts a nullable position.
      position: position as LogPosition?,
      operationOptions: operationOptions,
      onSubscriptionDropped: onSubscriptionDropped,
    );
  }
  return $runRequest<EventStreamSubscription>(() async {
    // Regular streams are addressed by revision; default to the stream start
    // when the caller did not supply a position.
    final state = StreamState.any(
      streamId,
      revision:
          ((position ?? StreamPosition.start) as StreamPosition).toRevision(),
    );
    final request = _setSubscriptionReq(
      state.toReadReq(
        resolveLinks: resolveLinks,
      ),
    );
    final client = await $getClient();
    final resultStream = client.read(
      request,
      options: $getOptions(
        userCredentials: userCredentials,
        operationOptions: operationOptions,
        timeoutAfter: toSubscriptionTimeout(operationOptions),
      ),
    );
    return _toSubscriptionResult(
      state,
      resultStream,
      onEventAppeared: onEventAppeared,
      onSubscriptionDropped: onSubscriptionDropped,
    );
  });
}