subscribe method
Future<EventStreamSubscription>
subscribe(
- String streamId, {
- Position? position,
- bool resolveLinks = false,
- UserCredentials? userCredentials,
- SubscriptionResolvedEventCallback? onEventAppeared,
- SubscriptionDroppedCallback? onSubscriptionDropped,
- EventStoreClientOperationOptions? operationOptions,
Subscribes to ResolvedEvents from the given stream streamId.
Start at first event after optional position
(event at position is
not returned). When position
is not given, all events from
StreamPosition.start are returned.
Callback SubscriptionResolvedEventCallback is invoked and awaited when a new event is received over the subscription.
Callback SubscriptionDroppedCallback is invoked and awaited when the subscription is dropped.
Use resolveLinks
to resolve links as ResolvedEvent (default is false).
Returns an EventStreamSubscription
on first response from the server.
Implementation
/// Subscribes to [ResolvedEvent]s from the stream identified by [streamId].
///
/// The subscription starts at the first event after [position] (the event
/// at [position] itself is not returned). When [position] is omitted, a
/// regular stream is read from [StreamPosition.start].
///
/// When [streamId] equals [SystemStreams.AllStream] the call is delegated to
/// [subscribeToAll]. In that case [position] must be a [LogPosition] or
/// `null`; any other type throws an [ArgumentError]. For every other stream
/// [position] is cast to [StreamPosition], so passing a different [Position]
/// subtype fails on that cast.
///
/// [onEventAppeared] is invoked when a new event is received over the
/// subscription and [onSubscriptionDropped] when the subscription is
/// dropped. Set [resolveLinks] to `true` to resolve link events (defaults to
/// `false`). [userCredentials] and [operationOptions] are forwarded to the
/// underlying read call.
///
/// Returns the [EventStreamSubscription] built from the server's response
/// stream.
Future<EventStreamSubscription> subscribe(
  String streamId, {
  Position? position,
  bool resolveLinks = false,
  UserCredentials? userCredentials,
  SubscriptionResolvedEventCallback? onEventAppeared,
  SubscriptionDroppedCallback? onSubscriptionDropped,
  EventStoreClientOperationOptions? operationOptions,
}) async {
  if (streamId == SystemStreams.AllStream) {
    if (position != null && position is! LogPosition) {
      throw ArgumentError.value(
        position,
        'position',
        'Stream \$all requires type LogPosition',
      );
    }
    return subscribeToAll(
      resolveLinks: resolveLinks,
      onEventAppeared: onEventAppeared,
      userCredentials: userCredentials,
      // The guard above allows a null position, so the cast must be to the
      // nullable type: `null as LogPosition` throws a TypeError under sound
      // null safety, which made subscribing to $all without a position fail.
      // NOTE(review): relies on subscribeToAll accepting `LogPosition?` —
      // confirm against its signature.
      position: position as LogPosition?,
      operationOptions: operationOptions,
      onSubscriptionDropped: onSubscriptionDropped,
    );
  }
  return $runRequest<EventStreamSubscription>(() async {
    // Regular streams are addressed by revision; a missing position falls
    // back to the start of the stream.
    final state = StreamState.any(
      streamId,
      revision:
          ((position ?? StreamPosition.start) as StreamPosition).toRevision(),
    );
    final request = _setSubscriptionReq(
      state.toReadReq(
        resolveLinks: resolveLinks,
      ),
    );
    final client = await $getClient();
    final resultStream = client.read(
      request,
      options: $getOptions(
        userCredentials: userCredentials,
        operationOptions: operationOptions,
        timeoutAfter: toSubscriptionTimeout(operationOptions),
      ),
    );
    return _toSubscriptionResult(
      state,
      resultStream,
      onEventAppeared: onEventAppeared,
      onSubscriptionDropped: onSubscriptionDropped,
    );
  });
}