flush method

  1. @override
Future flush([
  1. bool oneway = false
])
override

Flush any pending data out of a transport buffer. Throws TTransportError if there was an error writing out data.

Implementation

@override
Future flush([bool oneway = false]) async {
  try {
    TApplicationError? tae;
    Int8List buf = writeBuffer_.get();
    // writeBuffer_.clear();
    int len = writeBuffer_.len();
    if (len >= 2 &&
        buf[0] == TCompactProtocol.PROTOCOL_ID &&
        ((buf[1] >> TCompactProtocol.TYPE_SHIFT_AMOUNT) & 0x03) ==
            TMessageType.EXCEPTION) {
      TCompactProtocol proto = TCompactProtocol(TMemoryInputTransport(buf));
      // ignore: unused_local_variable
      TMessage msg = proto.readMessageBegin();
      tae = TApplicationError.read(proto);
    } else if (len >= 4 &&
        ((buf[0] << 24) | (buf[1] << 16)) == TBinaryProtocol.VERSION_1 &&
        buf[3] == TMessageType.EXCEPTION) {
      TBinaryProtocol proto = TBinaryProtocol(TMemoryInputTransport(buf));
      // ignore: unused_local_variable
      TMessage msg = proto.readMessageBegin();
      tae = TApplicationError.read(proto);
    }
    if (tae != null) {
      if (!_writeHeaders.containsKey('uex')) {
        _writeHeaders['uex'] = "TApplicationError";
      }
      if (!_writeHeaders.containsKey('uexw')) {
        _writeHeaders['uexw'] = tae.message ?? '[null]';
      }
    }
  } catch (e) {
    // ignore
  }

  nio.ByteBuffer frame = nio.ByteBuffer.wrap(writeBuffer_.get());
  frame.limit = writeBuffer_.len();
  writeBuffer_.reset();

  if (clientType == ClientTypes.HEADERS) {
    frame = transform(frame);
  }

  if (frame.remaining() > MAX_FRAME_SIZE) {
    throw TTransportError(null, "Frame size exceeds max length");
  }

  if (_protoId == T_JSON_PROTOCOL && clientType != ClientTypes.HTTP) {
    throw TTransportError(null, "Trying to send JSON encoding over binary");
  }

  if (clientType == ClientTypes.HEADERS) {
    nio.ByteBuffer transformData =
        nio.ByteBuffer.allocate(_writeTransforms.length * 5);

    int numTransforms = _writeTransforms.length;
    for (Transforms trans in _writeTransforms) {
      writeVarint(transformData, trans.value);
    }
    transformData.limit = transformData.position;
    transformData.position = 0;

    if (_identity != null && _identity!.isNotEmpty) {
      _writeHeaders[ID_VERSION_HEADER] = ID_VERSION;
      _writeHeaders[IDENTITY_HEADER] = _identity!;
    }

    nio.ByteBuffer infoData1 =
        flushInfoHeaders(Infos.INFO_PKEYVALUE, _writePersistentHeaders);
    nio.ByteBuffer infoData2 =
        flushInfoHeaders(Infos.INFO_KEYVALUE, _writeHeaders);

    nio.ByteBuffer headerData = nio.ByteBuffer.allocate(10);
    writeVarint(headerData, _protoId);
    writeVarint(headerData, numTransforms);
    headerData.limit = headerData.position;
    headerData.position = 0;

    int headerSize = transformData.remaining() +
        infoData1.remaining() +
        infoData2.remaining() +
        headerData.remaining();

    int paddingSize = 4 - headerSize % 4;
    headerSize += paddingSize;

    nio.ByteBuffer out = nio.ByteBuffer.allocate(headerSize + 14);

    encodeInt(out, 10 + headerSize + frame.remaining());
    encodeShort(out, HEADER_MAGIC >> 16);
    encodeShort(out, _flags);
    encodeInt(out, _seqId);
    encodeShort(out, (headerSize / 4).round());

    out.putSrc(headerData);
    out.putSrc(transformData);
    out.putSrc(infoData1);
    out.putSrc(infoData2);

    for (int i = 0; i < paddingSize; i++) {
      out.put(b: 0x00);
    }

    out.position = 0;

    transport_.write(out.array(), out.position, out.remaining());
    transport_.write(frame.array(), frame.position, frame.remaining());
  } else if (clientType == ClientTypes.FRAMED_DEPRECATED) {
    nio.ByteBuffer out = nio.ByteBuffer.allocate(4);
    encodeInt(out, frame.remaining());
    out.position = 0;
    transport_.write(out.array(), out.position, out.remaining());
    transport_.write(frame.array(), frame.position, frame.remaining());
  } else if (clientType == ClientTypes.HTTP) {
    throw TTransportError(null, "HTTP is not supported for THeaderTransport");
  } else {
    throw TTransportError(null, "Unknown client type");
  }

  await transport_.flush(oneway);
}