send method
Sends an HTTP request and asynchronously returns the response.
Implementation
/// Sends [request] through the underlying `dart:io` client and returns a
/// streamed response.
///
/// The request body is piped into the underlying request. The response body is
/// exposed lazily: the underlying response is only listened to once the caller
/// listens to the returned response's stream.
///
/// If [request] is [Abortable] with a non-null `abortTrigger`, completing that
/// trigger aborts the request. How the abort surfaces depends on the stage the
/// request has reached; see the strategy notes inside the method body.
///
/// Throws a [ClientException] if `_inner` is `null` (the client has already
/// been closed) or if an [HttpException] is raised while sending. A
/// [SocketException] raised while sending is rethrown as a
/// `_ClientSocketException` that carries the request URL.
@override
Future<IOStreamedResponse> send(BaseRequest request) async {
// A null `_inner` is treated as "client closed"; fail before finalizing the
// request.
if (_inner == null) {
throw ClientException(
'HTTP request failed. Client is already closed.', request.url);
}
// Finalize the request to obtain the body stream that is piped into the
// underlying request below.
var stream = request.finalize();
try {
// Open the underlying request and mirror the caller's settings onto it.
// An unknown content length (`null`) is passed on as -1.
var ioRequest = (await _inner!.openUrl(request.method, request.url))
..followRedirects = request.followRedirects
..maxRedirects = request.maxRedirects
..contentLength = (request.contentLength ?? -1)
..persistentConnection = request.persistentConnection;
// Copy every request header onto the underlying request.
request.headers.forEach((name, value) {
ioRequest.headers.set(name, value);
});
// SDK request aborting is only effective up until the request is closed,
// at which point the full response always becomes available.
// This occurs at `pipe`, which automatically closes the request once the
// request stream has been pumped in.
//
// Therefore, we have multiple strategies:
// * If the user aborts before we have a response, we can use SDK abort,
// which causes the `pipe` (and therefore this method) to throw the
// aborted error
// * If the user aborts after we have a response but before they listen
// to it, we immediately emit the aborted error then close the response
// as soon as they listen to it
// * If the user aborts whilst streaming the response, we inject the
// aborted error, then close the response
//
// `isAborted` records that the trigger has completed; `hasResponse` records
// that `pipe` has already produced a response, after which SDK abort is no
// longer used.
var isAborted = false;
var hasResponse = false;
if (request case Abortable(:final abortTrigger?)) {
// Deliberately not awaited: the trigger is only observed, it must not
// block sending the request.
unawaited(
abortTrigger.whenComplete(() {
isAborted = true;
if (!hasResponse) {
ioRequest.abort(RequestAbortedException(request.url));
}
}),
);
}
// Pump the request body into `ioRequest`; the value `pipe` completes with
// is cast to the underlying response.
final response = await stream.pipe(ioRequest) as HttpClientResponse;
hasResponse = true;
// The response body is re-exposed through `responseController` so that an
// abort error can be injected into it. The subscription to the underlying
// response is created lazily in `onListen` and is `null` both before the
// caller listens and after the underlying response is done.
StreamSubscription<List<int>>? ioResponseSubscription;
// Declared separately from its initializer because the callbacks below
// refer back to the controller itself.
late final StreamController<List<int>> responseController;
responseController = StreamController(
onListen: () {
if (isAborted) {
// Aborted after the response arrived but before the caller listened:
// emit only the abort error and finish without subscribing to the
// underlying response.
// NOTE(review): the underlying `response` is never listened to on this
// path — confirm whether it needs to be drained or detached to release
// the connection.
responseController
..addError(RequestAbortedException(request.url))
..close();
return;
} else if (request case Abortable(:final abortTrigger?)) {
// Abort while streaming: inject the abort error (unless the stream has
// already been closed) and stop reading from the underlying response.
abortTrigger.whenComplete(() {
if (!responseController.isClosed) {
responseController
..addError(RequestAbortedException(request.url))
..close();
}
ioResponseSubscription?.cancel();
});
}
// Forward body chunks from the underlying response to the caller.
ioResponseSubscription = response.listen(
responseController.add,
onDone: () {
// `responseController.close` will trigger the `onCancel` callback.
// Assign `ioResponseSubscription` to `null` to avoid calling its
// `cancel` method.
ioResponseSubscription = null;
unawaited(responseController.close());
},
onError: (Object err, StackTrace stackTrace) {
// Translate `dart:io` HTTP errors into this package's exception type,
// keeping the original stack trace; pass any other error through
// unchanged.
if (err is HttpException) {
responseController.addError(
ClientException(err.message, err.uri),
stackTrace,
);
} else {
responseController.addError(err, stackTrace);
}
},
);
},
// Propagate the caller's pause/resume/cancel to the underlying
// subscription, if one currently exists.
onPause: () => ioResponseSubscription?.pause(),
onResume: () => ioResponseSubscription?.resume(),
onCancel: () => ioResponseSubscription?.cancel(),
sync: true,
);
// Flatten each response header's list of values into one comma-separated
// string.
var headers = <String, String>{};
response.headers.forEach((key, values) {
// TODO: Remove trimRight() when
// https://github.com/dart-lang/sdk/issues/53005 is resolved and the
// package:http SDK constraint requires that version or later.
headers[key] = values.map((value) => value.trimRight()).join(',');
});
// Build the package-level response. A content length of -1 from the
// underlying response is reported as `null`, and the reported URL is the
// location of the last redirect when any redirect was followed, otherwise
// the request URL.
return _IOStreamedResponseV2(
responseController.stream,
response.statusCode,
contentLength:
response.contentLength == -1 ? null : response.contentLength,
request: request,
headers: headers,
isRedirect: response.isRedirect,
url: response.redirects.isNotEmpty
? response.redirects.last.location
: request.url,
persistentConnection: response.persistentConnection,
reasonPhrase: response.reasonPhrase,
inner: response,
);
} on SocketException catch (error) {
// Wrap socket failures so that the request URL travels with the error.
throw _ClientSocketException(error, request.url);
} on HttpException catch (error) {
// Surface `dart:io` HTTP failures as this package's exception type.
throw ClientException(error.message, error.uri);
}
}