subscribe method
This subscribes the session to a topic
. The subscriber may pass options
while subscribing. The resulting events are passed to the Subscribed.eventStream.
The subscriber should therefore subscribe to that stream to receive the events.
Implementation
Future<Subscribed> subscribe(String topic,
{SubscribeOptions? options}) async {
var subscribe = Subscribe(nextSubscribeId++, topic, options: options);
_transport.send(subscribe);
AbstractMessage subscribed = await _openSessionStreamController.stream
.where((message) =>
(message is Subscribed &&
message.subscribeRequestId == subscribe.requestId) ||
(message is Error &&
message.requestTypeId == MessageTypes.codeSubscribe &&
message.requestId == subscribe.requestId))
.first;
if (subscribed is Subscribed) {
subscriptions[subscribed.subscriptionId] = subscribed;
subscribed.eventStream =
_openSessionStreamController.stream.where((message) {
if (message is Unsubscribed &&
message.details?.subscription == subscribed.subscriptionId) {
subscriptions.remove(subscribed.subscriptionId);
subscribed.revoke(message.details!.reason);
return false;
}
return message is Event &&
subscriptions[subscribed.subscriptionId] != null &&
message.subscriptionId == subscribed.subscriptionId;
}).map((event) {
var eventUpdated = event;
if (event.details.pptScheme == 'wamp') {
// It's E2EE payload
var e2eePayload =
E2EEPayload.unpackE2EEPayload(event.arguments, event.details);
event.arguments = e2eePayload.arguments;
event.argumentsKeywords = e2eePayload.argumentsKeywords;
} else if (event.details.pptScheme != null) {
// It's some variation of PPT
var pptPayload =
PPTPayload.unpackPPTPayload(event.arguments, event.details);
event.arguments = pptPayload.arguments;
event.argumentsKeywords = pptPayload.argumentsKeywords;
}
return eventUpdated;
}).cast();
return subscribed;
} else {
throw subscribed;
}
}