subscribe method

Future<PersistentSubscription> subscribe(
  String streamId,
  String groupName, {
  int bufferSize = 10,
  bool autoAck = true,
  UserCredentials? userCredentials,
  SubscriptionDroppedCallback? onSubscriptionDropped,
  EventStoreClientOperationOptions? operationOptions,
  PersistentSubscriptionResolvedEventCallback? onEventAppeared,
})

Subscribes to ResolvedEvents from a persistent subscription.

The streamId and groupName parameters identify the PersistentSubscription to subscribe to.

The onEventAppeared callback (a PersistentSubscriptionResolvedEventCallback) is invoked and awaited when a new event is received over the subscription.

The onSubscriptionDropped callback (a SubscriptionDroppedCallback) is invoked and awaited when the subscription is dropped.

Returns a PersistentSubscription on first response from the server.

Implementation

/// Subscribes to the persistent subscription group [groupName] on [streamId].
///
/// When [streamId] equals [SystemStreams.AllStream], the
/// [ApiFeature.PersistentSubscriptionOnAllStream] feature is verified up
/// front and the read options target the all-stream instead of a named
/// stream identifier.
///
/// The request callback throws a [GrpcOperationUnsupportedException] when
/// [supportsClientStreamingRpc] is false.
///
/// The returned [PersistentSubscription] receives the sink of the request
/// stream together with the enumerator produced by `_toEnumerator`;
/// [autoAck], [onEventAppeared] and [onSubscriptionDropped] are forwarded to
/// it unchanged. [userCredentials] and [operationOptions] are used to build
/// the call options of the read call.
Future<PersistentSubscription> subscribe(
  String streamId,
  String groupName, {
  int bufferSize = 10,
  bool autoAck = true,
  UserCredentials? userCredentials,
  SubscriptionDroppedCallback? onSubscriptionDropped,
  EventStoreClientOperationOptions? operationOptions,
  PersistentSubscriptionResolvedEventCallback? onEventAppeared,
}) async {
  // Evaluated once; it drives both the feature check and the read options.
  final isAllStream = streamId == SystemStreams.AllStream;
  if (isAllStream) {
    $verifyFeatureAllowed(ApiFeature.PersistentSubscriptionOnAllStream);
  }

  // Opens the read call and wraps it in a PersistentSubscription.
  Future<PersistentSubscription> openSubscription() async {
    if (!supportsClientStreamingRpc) {
      throw GrpcOperationUnsupportedException(
        'grpc-web does not support client streaming',
      );
    }

    final uuidOption = ReadReq_Options_UUIDOption()..string = Empty();
    final readOptions = ReadReq_Options();
    readOptions.bufferSize = bufferSize;
    readOptions.groupName = groupName;
    readOptions.uuidOption = uuidOption;
    if (isAllStream) {
      readOptions.all = Empty();
    } else {
      readOptions.streamIdentifier = toStreamIdentifier(streamId);
    }

    final client = await $getClient();

    // The options message is queued as the first request on the outgoing
    // stream; the sink is handed to the subscription below.
    final initialRequest = ReadReq()..options = readOptions;
    final outgoing = StreamController<ReadReq>();
    outgoing.add(initialRequest);

    final callOptions = $getOptions(
      userCredentials: userCredentials,
      operationOptions: operationOptions,
    );
    final responses = client.read(outgoing.stream, options: callOptions);
    final enumerator = await _toEnumerator(
      responses,
      StreamState.any(streamId),
    );

    return PersistentSubscription(
      outgoing.sink,
      enumerator,
      autoAck: autoAck,
      onEventAppeared: onEventAppeared,
      // Credentials for the leader are resolved on each replay, not here.
      onReplay: ([int? startAt]) => replayParked(
        streamId,
        groupName,
        leader,
        settings: settings,
        userCredentials: userCredentials,
        channelCredentials: $getOrAddCredentials(leader),
      ),
      onSubscriptionDropped: onSubscriptionDropped,
    );
  }

  return $runRequest<PersistentSubscription>(openSubscription);
}