stream method

Stream<Object?> stream(
  String methodName,
  List<Object> args
)

Invokes a streaming hub method on the server using the specified name and arguments.

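- methodName: The name of the server method to invoke.
- args: The arguments used to invoke the server method.
- Returns: A stream that yields results from the server as they are received. Items are typed as `Object?`; unlike the generic `T` (the type of the items returned by the server) used by other SignalR clients, this Dart method declares no type parameter.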

Implementation

/// Invokes a streaming hub method on the server using the specified name and
/// arguments.
///
/// [methodName] is the name of the server method to invoke and [args] are the
/// arguments used to invoke it.
///
/// Returns a stream that emits the `item` of every [StreamItemMessage]
/// delivered to this invocation's callback. When a [CompletionMessage]
/// arrives the stream is closed; if that message carries an error, the error
/// is first emitted as a [GeneralError] error event. Errors passed directly to
/// the callback, and errors from sending the invocation, are emitted as error
/// events without closing the stream.
///
/// When the subscription ends, the invocation's callback is removed and a
/// cancel invocation is sent after the initial send has finished.
Stream<Object?> stream(String methodName, List<Object> args) {
  // NOTE(review): `item1` is handed to _launchStreams and `item2` to
  // _createStreamInvocation; presumably the client-side streams found in
  // `args` and their ids — confirm against _replaceStreamingParams.
  final t = _replaceStreamingParams(args);
  final invocationDescriptor =
      _createStreamInvocation(methodName, args, t.item2);
  final invocationId = invocationDescriptor.invocationId;

  // Assigned below, before the stream is returned; `onCancel` can only run
  // once a listener has subscribed, so the late read is safe.
  late Future<void> promiseQueue;

  // Let the element type be inferred from the constructor instead of
  // declaring a raw `StreamController` (which would be `<dynamic>`).
  final streamController = StreamController<Object?>(
    onCancel: () {
      final cancelInvocation = _createCancelInvocation(invocationId);
      _callbacks.remove(invocationId);

      // Chain on the initial send so the cancel is never sent before the
      // invocation itself.
      return promiseQueue.then((_) => _sendWithProtocol(cancelInvocation));
    },
  );

  _callbacks[invocationId] =
      (HubMessageBase? invocationEvent, Exception? error) {
    if (error != null) {
      streamController.addError(error);
      return;
    }
    if (invocationEvent == null) {
      return;
    }

    if (invocationEvent is CompletionMessage) {
      if (invocationEvent.error != null) {
        streamController.addError(GeneralError(invocationEvent.error));
      }
      // A completion message is terminal whether or not it carries an error.
      // `addError` alone does not end a Dart stream, so close explicitly to
      // let listeners observe `onDone` in both cases.
      streamController.close();
    } else if (invocationEvent is StreamItemMessage) {
      streamController.add(invocationEvent.item);
    }
  };

  promiseQueue = _sendWithProtocol(invocationDescriptor).catchError((e) {
    // The invocation never reached the server: surface the failure to the
    // listener and drop the callback, since no response can arrive for it.
    streamController.addError(e);
    _callbacks.remove(invocationId);
  });

  _launchStreams(t.item1, promiseQueue);

  return streamController.stream;
}