subscribe method
Future<PersistentSubscription> subscribe(
  String streamId,
  String groupName, {
  int bufferSize = 10,
  bool autoAck = true,
  UserCredentials? userCredentials,
  SubscriptionDroppedCallback? onSubscriptionDropped,
  EventStoreClientOperationOptions? operationOptions,
  PersistentSubscriptionResolvedEventCallback? onEventAppeared,
})
Subscribes to ResolvedEvents from a persistent subscription.
Parameters streamId and groupName identify the PersistentSubscription to subscribe to.
Callback PersistentSubscriptionResolvedEventCallback is invoked and awaited when a new event is received over the subscription.
Callback SubscriptionDroppedCallback is invoked and awaited when the subscription is dropped.
Returns a PersistentSubscription on the first response from the server.
Implementation
/// Subscribes to events from the persistent subscription identified by
/// [streamId] and [groupName].
///
/// When [streamId] equals [SystemStreams.AllStream], the
/// [ApiFeature.PersistentSubscriptionOnAllStream] feature is verified first
/// and the request targets all streams instead of a single stream
/// identifier.
///
/// [bufferSize] is forwarded to the server in the read options. [autoAck],
/// [onEventAppeared] and [onSubscriptionDropped] are handed unchanged to the
/// returned [PersistentSubscription]. [userCredentials] and
/// [operationOptions] are used to build the call options for the request.
///
/// Throws a [GrpcOperationUnsupportedException] from within the request when
/// `supportsClientStreamingRpc` is false.
Future<PersistentSubscription> subscribe(
  String streamId,
  String groupName, {
  int bufferSize = 10,
  bool autoAck = true,
  UserCredentials? userCredentials,
  SubscriptionDroppedCallback? onSubscriptionDropped,
  EventStoreClientOperationOptions? operationOptions,
  PersistentSubscriptionResolvedEventCallback? onEventAppeared,
}) async {
  // Evaluated once; used for both the feature check and the request target.
  final targetsAllStream = streamId == SystemStreams.AllStream;

  if (targetsAllStream) {
    $verifyFeatureAllowed(ApiFeature.PersistentSubscriptionOnAllStream);
  }

  return $runRequest<PersistentSubscription>(() async {
    // The subscription keeps an outgoing request stream open, which needs
    // client-streaming support.
    if (!supportsClientStreamingRpc) {
      throw GrpcOperationUnsupportedException(
        'grpc-web does not support client streaming',
      );
    }

    final readOptions = ReadReq_Options();
    readOptions.bufferSize = bufferSize;
    readOptions.groupName = groupName;
    readOptions.uuidOption = ReadReq_Options_UUIDOption()..string = Empty();

    if (targetsAllStream) {
      readOptions.all = Empty();
    } else {
      readOptions.streamIdentifier = toStreamIdentifier(streamId);
    }

    final grpcClient = await $getClient();

    // The first message on the outgoing stream carries the read options; the
    // sink is handed to the subscription so it can send further requests.
    final outgoing = StreamController<ReadReq>();
    outgoing.add(ReadReq()..options = readOptions);

    final callOptions = $getOptions(
      userCredentials: userCredentials,
      operationOptions: operationOptions,
    );
    final incoming = grpcClient.read(outgoing.stream, options: callOptions);

    final enumerator = await _toEnumerator(
      incoming,
      StreamState.any(streamId),
    );

    return PersistentSubscription(
      outgoing.sink,
      enumerator,
      autoAck: autoAck,
      onEventAppeared: onEventAppeared,
      // NOTE(review): `startAt` is accepted but not forwarded to
      // `replayParked`; confirm whether that is intentional.
      onReplay: ([int? startAt]) => replayParked(
        streamId,
        groupName,
        leader,
        settings: settings,
        userCredentials: userCredentials,
        channelCredentials: $getOrAddCredentials(leader),
      ),
      onSubscriptionDropped: onSubscriptionDropped,
    );
  });
}