receiveFrame method

void receiveFrame(
  1. RSocketFrame frame
)

Implementation

void receiveFrame(RSocketFrame frame) {
  var header = frame.header;
  var streamId = header.streamId;
  switch (header.type) {
    case frame_types.PAYLOAD:
      var payloadFrame = frame as PayloadFrame;
      if (senders.containsKey(streamId)) {
        var subscriber = senders[streamId];
        var payload = payloadFrame.payload;
        if (payloadFrame.completed) {
          senders.remove(streamId);
          if (payload?.data != null) {
            subscriber!.onNext(payload);
          }
          subscriber!.onComplete();
        } else {
          if (payload?.data != null) {
            subscriber!.onNext(payload);
          }
        }
      }
      break;
    case frame_types.KEEPALIVE:
      var keepAliveFrame = frame as KeepAliveFrame;
      if (keepAliveFrame.respond) {
        connection.write(FrameCodec.encodeKeepAlive(
            false, keepAliveFrame.lastReceivedPosition));
      }
      break;
    case frame_types.ERROR:
      var errorFrame = frame as ErrorFrame;
      var streamId = header.streamId;
      var error = RSocketException(errorFrame.code, errorFrame.message);
      if (streamId == 0 && errorConsumer != null) {
        errorConsumer!(error);
      } else {
        if (senders.containsKey(streamId)) {
          var subscriber = senders[streamId]!;
          senders.remove(streamId);
          subscriber.onError(error);
        }
      }
      break;
    case frame_types.CANCEL:
      var streamId = header.streamId;
      if (senders.containsKey(streamId)) {
        //implement cancel
        //var subscriber = senders[streamId];
        //senders.remove(streamId);
      }
      break;
    case frame_types.REQUEST_RESPONSE:
      var requestResponseFrame = frame as RequestResponseFrame;
      if (responder != null && requestResponseFrame.payload != null) {
        responder!.requestResponse!(requestResponseFrame.payload)
            .then((payload) {
          connection.write(
              FrameCodec.encodePayloadFrame(header.streamId, true, payload));
        }).catchError((error) {
          var rsocketError = convertToRSocketException(error);
          connection.write(FrameCodec.encodeErrorFrame(
              header.streamId, rsocketError.code!, rsocketError.message));
        });
      }
      break;
    case frame_types.REQUEST_FNF:
      var fireAndForgetFrame = frame as RequestFNFFrame;
      if (responder != null && fireAndForgetFrame.payload != null) {
        responder!.fireAndForget!(fireAndForgetFrame.payload)
            .then((value) => {});
      }
      break;
    case frame_types.METADATA_PUSH:
      var metadataPushFrame = frame as MetadataPushFrame;
      if (responder != null && metadataPushFrame.payload != null) {
        responder!.metadataPush!(metadataPushFrame.payload)
            .then((value) => {});
      }
      break;
    case frame_types.REQUEST_STREAM:
      var requestStreamFrame = frame as RequestStreamFrame;
      var requesterStreamId = header.streamId;
      if (responder != null && requestStreamFrame.payload != null) {
        responder!.requestStream!(requestStreamFrame.payload).listen(
            (payload) {
          connection.write(FrameCodec.encodePayloadFrame(
              requesterStreamId, false, payload));
        }, onDone: () {
          connection.write(
              FrameCodec.encodePayloadFrame(requesterStreamId, true, null));
        }, onError: (Object error) {
          if (error is RSocketException) {
            var e = error;
            connection.write(FrameCodec.encodeErrorFrame(
                requesterStreamId, e.code!, e.message));
          } else {
            connection.write(FrameCodec.encodeErrorFrame(requesterStreamId,
                RSocketErrorCode.APPLICATION_ERROR, error.toString()));
          }
        });
      }
      break;
    default:
  }
}