request<T> method

Future<Message<T>> request<T>(
  1. String subj,
  2. Uint8List data, {
  3. Duration timeout = const Duration(seconds: 2),
  4. T jsonDecoder(
    1. String
    )?,
})

Request will send a request payload and deliver the response message, TimeoutException on timeout.

Example:

try {
  await client.request('service', Uint8List.fromList('request'.codeUnits),
      timeout: Duration(seconds: 2));
} on TimeoutException {
  timeout = true;
}

Implementation

Future<Message<T>> request<T>(
  String subj,
  Uint8List data, {
  Duration timeout = const Duration(seconds: 2),
  T Function(String)? jsonDecoder,
}) async {
  if (!connected) {
    throw NatsException("request error: client not connected");
  }
  Message resp;
  //ensure no other request
  await _mutex.acquire();
  //get registered json decoder
  if (T != dynamic && jsonDecoder == null) {
    jsonDecoder = _getJsonDecoder();
  }

  if (_inboxSubPrefix == null) {
    if (inboxPrefix == '_INBOX') {
      _inboxSubPrefix = inboxPrefix + '.' + Nuid().next();
    } else {
      _inboxSubPrefix = inboxPrefix;
    }
    _inboxSub = sub<T>(_inboxSubPrefix! + '.>', jsonDecoder: jsonDecoder);
  }
  var inbox = _inboxSubPrefix! + '.' + Nuid().next();
  var stream = _inboxSub!.stream;

  pub(subj, data, replyTo: inbox);

  try {
    do {
      resp = await stream.take(1).single.timeout(timeout);
    } while (resp.subject != inbox);
  } on TimeoutException {
    throw TimeoutException('request time > $timeout');
  } finally {
    _mutex.release();
  }
  var msg = Message<T>(resp.subject, resp.sid, resp.byte, this,
      jsonDecoder: jsonDecoder);
  return msg;
}