flush method
Flush any pending data out of a transport buffer. Throws TTransportError if there was an error writing out data.
Implementation
/// Flushes any pending data out of the write buffer to the underlying
/// transport, framing it according to [clientType].
///
/// If the buffered payload is a compact- or binary-protocol EXCEPTION
/// message, the `uex` / `uexw` write headers are filled in (when not already
/// present) before the frame is emitted. That inspection is best-effort: any
/// error raised while peeking at the payload is ignored and the flush
/// proceeds.
///
/// [oneway] is forwarded unchanged to the underlying transport's `flush`.
///
/// Throws [TTransportError] when the (possibly transformed) frame is larger
/// than `MAX_FRAME_SIZE`, when the JSON protocol is selected for a non-HTTP
/// client, or when the client type is HTTP or otherwise unsupported. The
/// write buffer has already been reset by the time any of these is thrown.
@override
Future flush([bool oneway = false]) async {
  try {
    TApplicationError? tae;
    Int8List buf = writeBuffer_.get();
    int len = writeBuffer_.len();

    // Int8List elements are signed (-128..127), so a leading byte such as
    // 0x82 or 0x80 reads back negative. Mask every byte to its unsigned
    // value, and normalise the constants the same way, so the comparisons
    // hold regardless of how the constants are declared.
    final bool isCompactException = len >= 2 &&
        (buf[0] & 0xff) == (TCompactProtocol.PROTOCOL_ID & 0xff) &&
        ((buf[1] >> TCompactProtocol.TYPE_SHIFT_AMOUNT) & 0x03) ==
            TMessageType.EXCEPTION;
    final bool isBinaryException = len >= 4 &&
        (((buf[0] & 0xff) << 24) | ((buf[1] & 0xff) << 16)) ==
            (TBinaryProtocol.VERSION_1 & 0xffffffff) &&
        buf[3] == TMessageType.EXCEPTION;

    if (isCompactException) {
      TCompactProtocol proto = TCompactProtocol(TMemoryInputTransport(buf));
      // The message header must be consumed before the error body is read.
      proto.readMessageBegin();
      tae = TApplicationError.read(proto);
    } else if (isBinaryException) {
      TBinaryProtocol proto = TBinaryProtocol(TMemoryInputTransport(buf));
      // The message header must be consumed before the error body is read.
      proto.readMessageBegin();
      tae = TApplicationError.read(proto);
    }

    if (tae != null) {
      // Never overwrite values the caller has already set.
      if (!_writeHeaders.containsKey('uex')) {
        _writeHeaders['uex'] = "TApplicationError";
      }
      if (!_writeHeaders.containsKey('uexw')) {
        _writeHeaders['uexw'] = tae.message ?? '[null]';
      }
    }
  } catch (e) {
    // Deliberately ignored: header enrichment is best-effort and must not
    // prevent the payload itself from being flushed.
  }

  // Snapshot the pending payload, then reset the buffer for the next message.
  nio.ByteBuffer frame = nio.ByteBuffer.wrap(writeBuffer_.get());
  frame.limit = writeBuffer_.len();
  writeBuffer_.reset();

  if (clientType == ClientTypes.HEADERS) {
    frame = transform(frame);
  }
  if (frame.remaining() > MAX_FRAME_SIZE) {
    throw TTransportError(null, "Frame size exceeds max length");
  }
  if (_protoId == T_JSON_PROTOCOL && clientType != ClientTypes.HTTP) {
    throw TTransportError(null, "Trying to send JSON encoding over binary");
  }

  if (clientType == ClientTypes.HEADERS) {
    // One varint per transform id; 5 bytes each is the reserved upper bound.
    nio.ByteBuffer transformData =
        nio.ByteBuffer.allocate(_writeTransforms.length * 5);
    int numTransforms = _writeTransforms.length;
    for (Transforms trans in _writeTransforms) {
      writeVarint(transformData, trans.value);
    }
    // Flip the buffer from writing to reading.
    transformData.limit = transformData.position;
    transformData.position = 0;

    if (_identity != null && _identity!.isNotEmpty) {
      _writeHeaders[ID_VERSION_HEADER] = ID_VERSION;
      _writeHeaders[IDENTITY_HEADER] = _identity!;
    }

    nio.ByteBuffer infoData1 =
        flushInfoHeaders(Infos.INFO_PKEYVALUE, _writePersistentHeaders);
    nio.ByteBuffer infoData2 =
        flushInfoHeaders(Infos.INFO_KEYVALUE, _writeHeaders);

    nio.ByteBuffer headerData = nio.ByteBuffer.allocate(10);
    writeVarint(headerData, _protoId);
    writeVarint(headerData, numTransforms);
    // Flip the buffer from writing to reading.
    headerData.limit = headerData.position;
    headerData.position = 0;

    int headerSize = transformData.remaining() +
        infoData1.remaining() +
        infoData2.remaining() +
        headerData.remaining();
    // Pads to a multiple of 4. Note this always adds 1..4 bytes: a header
    // that is already aligned still receives a full 4 bytes of padding.
    int paddingSize = 4 - headerSize % 4;
    headerSize += paddingSize;

    // 14 fixed bytes precede the variable header (presumably 4 length +
    // 2 magic + 2 flags + 4 seqId + 2 header size). The length field written
    // first excludes its own 4 bytes, hence the 10.
    nio.ByteBuffer out = nio.ByteBuffer.allocate(headerSize + 14);
    encodeInt(out, 10 + headerSize + frame.remaining());
    encodeShort(out, HEADER_MAGIC >> 16);
    encodeShort(out, _flags);
    encodeInt(out, _seqId);
    // Header size is written in 4-byte words; headerSize is a multiple of 4
    // here, so integer division is exact.
    encodeShort(out, headerSize ~/ 4);
    out.putSrc(headerData);
    out.putSrc(transformData);
    out.putSrc(infoData1);
    out.putSrc(infoData2);
    for (int i = 0; i < paddingSize; i++) {
      out.put(b: 0x00);
    }
    out.position = 0;

    transport_.write(out.array(), out.position, out.remaining());
    transport_.write(frame.array(), frame.position, frame.remaining());
  } else if (clientType == ClientTypes.FRAMED_DEPRECATED) {
    // Legacy framing: a length prefix followed by the payload.
    nio.ByteBuffer out = nio.ByteBuffer.allocate(4);
    encodeInt(out, frame.remaining());
    out.position = 0;
    transport_.write(out.array(), out.position, out.remaining());
    transport_.write(frame.array(), frame.position, frame.remaining());
  } else if (clientType == ClientTypes.HTTP) {
    throw TTransportError(null, "HTTP is not supported for THeaderTransport");
  } else {
    throw TTransportError(null, "Unknown client type");
  }

  await transport_.flush(oneway);
}