stream<T> method

Stream<T?> stream<T>(
  String methodName, {
  List? args,
})

Invokes a streaming hub method on the server using the specified name and arguments.

Implementation

/// Invokes a streaming hub method on the server using the specified name and
/// arguments.
///
/// [methodName] is the name of the server method to invoke and [args] are the
/// arguments forwarded with the invocation.
///
/// Returns a single-subscription stream that:
///  * emits the `item` of every [StreamItemMessage] delivered for this
///    invocation, cast to `T?`;
///  * emits an error when the invocation callback is handed an exception,
///    when the initial send fails, or when the server completes the
///    invocation with a non-empty error message;
///  * closes when the server completes the invocation without an error.
///
/// Cancelling the subscription unregisters the invocation callback and, once
/// the initial send has finished, sends a cancel invocation for it.
Stream<T?> stream<T>(String methodName, {List<dynamic>? args}) {
  // NOTE(review): `item1` / `item2` presumably hold the client-side streams
  // found in `args` and their ids — confirm against
  // `_replaceStreamParameters`.
  final streamParameters = _replaceStreamParameters(args);
  final invocationDescriptor = _createStreamInvocation(
      methodName: methodName, args: args, streamIds: streamParameters.item2);

  // Assigned further down, after the callback has been registered. The cancel
  // handler below only reads it when a listener cancels, which cannot happen
  // before this method has returned the stream.
  late Future<void> futureQueue;
  final controller = StreamController<T?>()
    ..onCancel = () {
      final cancelInvocation =
          _createCancelInvocation(id: invocationDescriptor.invocationId);

      _callbacks.remove(invocationDescriptor.invocationId);

      // Chain on the initial send so the cancel message is never written
      // before the invocation it refers to.
      futureQueue.then((value) {
        return _sendWithProtocol(cancelInvocation);
      });
    };

  _callbacks[invocationDescriptor.invocationId] =
      (HubMessage? invocationEvent, Exception? error) {
    if (error != null) {
      controller.addError(error);
      return;
    }
    if (invocationEvent == null) {
      return;
    }

    if (invocationEvent.type == MessageType.completion) {
      if (invocationEvent is CompletionMessage) {
        final completionError = invocationEvent.error;
        if (completionError != null && completionError.isNotEmpty) {
          // Surface the error text carried by the completion message. The
          // callback's own `error` argument is always null on this path, so
          // wrapping it would discard the server's message.
          controller.addError(Exception(completionError));
        } else {
          controller.close();
        }
      }
    } else if (invocationEvent is StreamItemMessage) {
      controller.add(invocationEvent.item as T?);
    }
  };

  futureQueue = _sendWithProtocol(invocationDescriptor).catchError((e) {
    // The send failed: report it to the listener and drop the callback, since
    // no messages are expected for this invocation.
    if (e is Object) {
      controller.addError(e);
    }
    _callbacks.remove(invocationDescriptor.invocationId);
  });

  _launchStreams(streamParameters.item1, futureQueue);

  return controller.stream;
}