receive method

@override
Stream<AbstractMessage> receive()

Annotations: @override

Implementation

/// Returns the stream of messages read from the raw socket.
///
/// Before the stream is returned, handlers are attached to the socket's
/// `done` future:
///
/// * If the socket finishes while no goodbye was sent or received and
///   [_onDisconnect] is still pending, [_onConnectionLost] is completed.
/// * Otherwise [_onDisconnect] is completed if it is still pending.
/// * If `done` fails under the same "no goodbye" condition,
///   [_onConnectionLost] is completed with the error object as its value.
///
/// Every incoming chunk is prefixed with [_inboundBuffer] and is dropped
/// from the stream when it is consumed by [_negotiateProtocol], rejected by
/// [_assertValidMessage], still shorter than the announced payload (it is
/// then stored in [_inboundBuffer]), or longer than [_messageLength] (a
/// protocol error is sent). Chunks that pass are expanded through
/// [_handleMessage].
@override
Stream<AbstractMessage> receive() {
  _socket!.done.then((done) {
    if (_onDisconnect!.isCompleted) {
      return;
    }
    if (!_goodbyeSent && !_goodbyeReceived) {
      // `Completer.complete` throws a StateError when called twice, and that
      // error would surface as an unhandled async error from this callback,
      // so only complete the completer while it is still pending.
      if (!_onConnectionLost!.isCompleted) {
        _onConnectionLost!.complete();
      }
    } else {
      _onDisconnect!.complete();
    }
  }, onError: (error) {
    final closedUnexpectedly =
        !_goodbyeSent && !_goodbyeReceived && !_onDisconnect!.isCompleted;
    // Same double-completion guard as in the success handler above.
    if (closedUnexpectedly && !_onConnectionLost!.isCompleted) {
      _onConnectionLost!.complete(error);
    }
  });
  // TODO set keep alive to true
  //_socket.setOption(RawSocketOption.fromBool(??, SO_KEEPALIVE, true), true)
  return _socket!.where((List<int> chunk) {
    // Work on a typed local instead of reassigning the parameter; this also
    // removes the need for an `as Uint8List` cast.
    // NOTE(review): `where` forwards the original chunk, not this combined
    // buffer, so `_handleMessage` presumably re-reads `_inboundBuffer` —
    // confirm.
    final buffered = Uint8List.fromList(_inboundBuffer + chunk);
    if (_negotiateProtocol(buffered) || !_assertValidMessage(buffered)) {
      return false;
    }
    final finalMessageLength = buffered.length - headerLength;
    final payloadLength =
        SocketHelper.getPayloadLength(buffered, headerLength);
    if (finalMessageLength < payloadLength) {
      // Fewer bytes than the header announces: keep them for the next chunk.
      _inboundBuffer = buffered;
      return false;
    }
    if (finalMessageLength > _messageLength!) {
      _sendProtocolError(SocketHelper.errorMessageLengthExceeded);
      _logger.fine(
          'Closed raw socket channel because the message length exceeded the max value of $_messageLength');
      return false;
    }
    return true;
  }).expand((Uint8List message) => _handleMessage(message));
}