stream<T> method
Invokes a streaming hub method on the server using the specified name and arguments.
Implementation
/// Invokes a streaming hub method on the server using the specified name and
/// arguments.
///
/// Returns a single-subscription [Stream] that emits the item of every
/// [StreamItemMessage] delivered for this invocation, cast to `T?`.
///
/// The stream is closed when a [CompletionMessage] arrives whose error is
/// `null` or empty. A completion carrying a non-empty error is reported as a
/// stream error wrapping that error text. Errors handed to the invocation
/// callback, and errors raised while sending the invocation, are forwarded
/// as stream errors as well.
///
/// When the subscription is cancelled, the invocation's callback is removed
/// and a cancel invocation is sent after the initial send has settled.
Stream<T?> stream<T>(String methodName, {List<dynamic>? args}) {
  final streamParameters = _replaceStreamParameters(args);
  final invocationDescriptor = _createStreamInvocation(
      methodName: methodName, args: args, streamIds: streamParameters.item2);

  // Assigned further down, before the stream is handed to the caller; the
  // cancel handler can only run after a listener has subscribed.
  late Future<void> futureQueue;

  final controller = StreamController<T?>()
    ..onCancel = () {
      final cancelInvocation =
          _createCancelInvocation(id: invocationDescriptor.invocationId);
      _callbacks.remove(invocationDescriptor.invocationId);
      // Chain on the initial send so the cancel goes out after it.
      futureQueue.then((value) {
        return _sendWithProtocol(cancelInvocation);
      });
    };

  _callbacks[invocationDescriptor.invocationId] =
      (HubMessage? invocationEvent, Exception? error) {
    if (error != null) {
      controller.addError(error);
      return;
    }
    if (invocationEvent == null) {
      return;
    }

    if (invocationEvent.type == MessageType.completion) {
      if (invocationEvent is CompletionMessage) {
        final completionError = invocationEvent.error;
        if (completionError != null && completionError.isNotEmpty) {
          // Surface the error text from the completion message. The
          // callback's own `error` argument is always null on this path.
          controller.addError(Exception(completionError));
        } else {
          controller.close();
        }
      }
    } else if (invocationEvent is StreamItemMessage) {
      controller.add(invocationEvent.item as T?);
    }
  };

  futureQueue = _sendWithProtocol(invocationDescriptor).catchError((Object e) {
    // The invocation never reached the server: report the failure to the
    // listener and drop the callback registered above.
    controller.addError(e);
    _callbacks.remove(invocationDescriptor.invocationId);
  });

  _launchStreams(streamParameters.item1, futureQueue);
  return controller.stream;
}