receiveFrame method
Implementation
void receiveFrame(RSocketFrame frame) {
var header = frame.header;
var streamId = header.streamId;
switch (header.type) {
case frame_types.PAYLOAD:
var payloadFrame = frame as PayloadFrame;
if (senders.containsKey(streamId)) {
var subscriber = senders[streamId];
var payload = payloadFrame.payload;
if (payloadFrame.completed) {
senders.remove(streamId);
if (payload?.data != null) {
subscriber!.onNext(payload);
}
subscriber!.onComplete();
} else {
if (payload?.data != null) {
subscriber!.onNext(payload);
}
}
}
break;
case frame_types.KEEPALIVE:
var keepAliveFrame = frame as KeepAliveFrame;
if (keepAliveFrame.respond) {
connection.write(FrameCodec.encodeKeepAlive(
false, keepAliveFrame.lastReceivedPosition));
}
break;
case frame_types.ERROR:
var errorFrame = frame as ErrorFrame;
var streamId = header.streamId;
var error = RSocketException(errorFrame.code, errorFrame.message);
if (streamId == 0 && errorConsumer != null) {
errorConsumer!(error);
} else {
if (senders.containsKey(streamId)) {
var subscriber = senders[streamId]!;
senders.remove(streamId);
subscriber.onError(error);
}
}
break;
case frame_types.CANCEL:
var streamId = header.streamId;
if (senders.containsKey(streamId)) {
//implement cancel
//var subscriber = senders[streamId];
//senders.remove(streamId);
}
break;
case frame_types.REQUEST_RESPONSE:
var requestResponseFrame = frame as RequestResponseFrame;
if (responder != null && requestResponseFrame.payload != null) {
responder!.requestResponse!(requestResponseFrame.payload)
.then((payload) {
connection.write(
FrameCodec.encodePayloadFrame(header.streamId, true, payload));
}).catchError((error) {
var rsocketError = convertToRSocketException(error);
connection.write(FrameCodec.encodeErrorFrame(
header.streamId, rsocketError.code!, rsocketError.message));
});
}
break;
case frame_types.REQUEST_FNF:
var fireAndForgetFrame = frame as RequestFNFFrame;
if (responder != null && fireAndForgetFrame.payload != null) {
responder!.fireAndForget!(fireAndForgetFrame.payload)
.then((value) => {});
}
break;
case frame_types.METADATA_PUSH:
var metadataPushFrame = frame as MetadataPushFrame;
if (responder != null && metadataPushFrame.payload != null) {
responder!.metadataPush!(metadataPushFrame.payload)
.then((value) => {});
}
break;
case frame_types.REQUEST_STREAM:
var requestStreamFrame = frame as RequestStreamFrame;
var requesterStreamId = header.streamId;
if (responder != null && requestStreamFrame.payload != null) {
responder!.requestStream!(requestStreamFrame.payload).listen(
(payload) {
connection.write(FrameCodec.encodePayloadFrame(
requesterStreamId, false, payload));
}, onDone: () {
connection.write(
FrameCodec.encodePayloadFrame(requesterStreamId, true, null));
}, onError: (Object error) {
if (error is RSocketException) {
var e = error;
connection.write(FrameCodec.encodeErrorFrame(
requesterStreamId, e.code!, e.message));
} else {
connection.write(FrameCodec.encodeErrorFrame(requesterStreamId,
RSocketErrorCode.APPLICATION_ERROR, error.toString()));
}
});
}
break;
default:
}
}