receive method
Implementation
/// Returns the stream of messages decoded from the underlying socket.
///
/// Before the stream is returned, a listener is attached to the socket's
/// `done` future so that [_onConnectionLost] or [_onDisconnect] is completed
/// once the socket finishes, depending on whether a goodbye was sent or
/// received.
///
/// Every chunk emitted by the socket is appended to [_inboundBuffer] and
/// checked; only chunks that pass the checks are handed to [_handleMessage],
/// whose results are flattened into the returned stream.
@override
Stream<AbstractMessage> receive() {
  _socket!.done.then((_) {
    final goodbyeExchanged = _goodbyeSent || _goodbyeReceived;
    final disconnectCompleter = _onDisconnect!;
    // Nothing to signal once the disconnect has already been reported.
    if (!disconnectCompleter.isCompleted) {
      if (goodbyeExchanged) {
        // A goodbye was exchanged, so this is a regular disconnect.
        disconnectCompleter.complete();
      } else {
        // The socket finished without any goodbye: the connection was lost.
        _onConnectionLost!.complete();
      }
    }
  }, onError: (error) {
    final lostUnexpectedly =
        !(_goodbyeSent || _goodbyeReceived || _onDisconnect!.isCompleted);
    if (lostUnexpectedly) {
      _onConnectionLost!.complete(error);
    }
  });

  // TODO set keep alive to true
  //_socket.setOption(RawSocketOption.fromBool(??, SO_KEEPALIVE, true), true)

  // Decides whether the data received so far should be passed on.
  //
  // Note that the stream forwards the raw [chunk] to the next stage, not the
  // concatenated buffer built here.
  bool isCompleteFrame(List<int> chunk) {
    final Uint8List buffered = Uint8List.fromList(_inboundBuffer + chunk);

    // Protocol negotiation is tried first; validation only runs when the
    // data was not consumed by the negotiation.
    if (_negotiateProtocol(buffered)) {
      return false;
    }
    if (!_assertValidMessage(buffered)) {
      return false;
    }

    final receivedLength = buffered.length - headerLength;
    final expectedLength =
        SocketHelper.getPayloadLength(buffered, headerLength);

    if (receivedLength < expectedLength) {
      // Payload is still incomplete: keep what we have and wait for more.
      _inboundBuffer = buffered;
      return false;
    }

    if (receivedLength > _messageLength!) {
      _sendProtocolError(SocketHelper.errorMessageLengthExceeded);
      _logger.fine(
          'Closed raw socket channel because the message length exceeded the max value of $_messageLength');
      return false;
    }

    return true;
  }

  final acceptedChunks = _socket!.where(isCompleteFrame);
  return acceptedChunks.expand((Uint8List chunk) {
    return _handleMessage(chunk);
  });
}