request<T> method
Sends a request payload and returns the response message; throws a TimeoutException if no response arrives within the timeout.
Example:
try {
await client.request('service', Uint8List.fromList('request'.codeUnits),
timeout: Duration(seconds: 2));
} on TimeoutException {
timeout = true;
}
Implementation
/// Publishes [data] on [subj] with a unique reply inbox and returns the reply.
///
/// Requests are serialized through an internal mutex, so only one request is
/// in flight at a time. The shared inbox subscription is created on the first
/// call and reused afterwards.
///
/// When [T] is not `dynamic` and no [jsonDecoder] is supplied, the decoder
/// returned by `_getJsonDecoder()` is used.
///
/// [timeout] bounds the total time spent waiting for the matching reply;
/// messages that arrive for a different inbox subject are skipped and do not
/// extend the wait.
///
/// Throws a [NatsException] if the client is not connected, and a
/// [TimeoutException] if no matching reply arrives within [timeout].
Future<Message<T>> request<T>(
String subj,
Uint8List data, {
Duration timeout = const Duration(seconds: 2),
T Function(String)? jsonDecoder,
}) async {
if (!connected) {
throw NatsException("request error: client not connected");
}
// Ensure no other request is in flight.
await _mutex.acquire();
try {
// Everything after acquire() lives inside this try so that the mutex is
// released even if decoder lookup, subscribing or publishing throws.
if (T != dynamic && jsonDecoder == null) {
jsonDecoder = _getJsonDecoder();
}
var prefix = _inboxSubPrefix;
if (prefix == null) {
// The default prefix gets a unique suffix; a custom prefix is used as is.
prefix = inboxPrefix == '_INBOX'
? '$inboxPrefix.${Nuid().next()}'
: inboxPrefix;
_inboxSubPrefix = prefix;
_inboxSub = sub<T>('$prefix.>', jsonDecoder: jsonDecoder);
}
final inbox = '$prefix.${Nuid().next()}';
final stream = _inboxSub!.stream;
pub(subj, data, replyTo: inbox);

// Track elapsed time so that skipped messages cannot restart the timeout.
final waited = Stopwatch()..start();
Message resp;
try {
do {
// A zero or negative remaining budget makes timeout() fire immediately.
final remaining = timeout - waited.elapsed;
resp = await stream.take(1).single.timeout(remaining);
} while (resp.subject != inbox);
} on TimeoutException {
throw TimeoutException('request time > $timeout');
}
return Message<T>(resp.subject, resp.sid, resp.byte, this,
jsonDecoder: jsonDecoder);
} finally {
_mutex.release();
}
}