send method

  1. @override
Future<IOStreamedResponse> send(
  1. BaseRequest request
)
override

Sends an HTTP request and asynchronously returns the response.

Implementation

@override
Future<IOStreamedResponse> send(BaseRequest request) async {
  if (_inner == null) {
    throw ClientException(
        'HTTP request failed. Client is already closed.', request.url);
  }

  var stream = request.finalize();

  try {
    var ioRequest = (await _inner!.openUrl(request.method, request.url))
      ..followRedirects = request.followRedirects
      ..maxRedirects = request.maxRedirects
      ..contentLength = (request.contentLength ?? -1)
      ..persistentConnection = request.persistentConnection;
    request.headers.forEach((name, value) {
      ioRequest.headers.set(name, value);
    });

    // SDK request aborting is only effective up until the request is closed,
    // at which point the full response always becomes available.
    // This occurs at `pipe`, which automatically closes the request once the
    // request stream has been pumped in.
    //
    // Therefore, we have multiple strategies:
    //  * If the user aborts before we have a response, we can use SDK abort,
    //    which causes the `pipe` (and therefore this method) to throw the
    //    aborted error
    //  * If the user aborts after we have a response but before they listen
    //    to it, we immediately emit the aborted error then close the response
    //    as soon as they listen to it
    //  * If the user aborts whilst streaming the response, we inject the
    //    aborted error, then close the response

    var isAborted = false;
    var hasResponse = false;

    if (request case Abortable(:final abortTrigger?)) {
      unawaited(
        abortTrigger.whenComplete(() {
          isAborted = true;
          if (!hasResponse) {
            ioRequest.abort(RequestAbortedException(request.url));
          }
        }),
      );
    }

    final response = await stream.pipe(ioRequest) as HttpClientResponse;
    hasResponse = true;

    StreamSubscription<List<int>>? ioResponseSubscription;

    late final StreamController<List<int>> responseController;
    responseController = StreamController(
      onListen: () {
        if (isAborted) {
          responseController
            ..addError(RequestAbortedException(request.url))
            ..close();
          return;
        } else if (request case Abortable(:final abortTrigger?)) {
          abortTrigger.whenComplete(() {
            if (!responseController.isClosed) {
              responseController
                ..addError(RequestAbortedException(request.url))
                ..close();
            }
            ioResponseSubscription?.cancel();
          });
        }

        ioResponseSubscription = response.listen(
          responseController.add,
          onDone: () {
            // `reponseController.close` will trigger the `onCancel` callback.
            // Assign `ioResponseSubscription` to `null` to avoid calling its
            // `cancel` method.
            ioResponseSubscription = null;
            unawaited(responseController.close());
          },
          onError: (Object err, StackTrace stackTrace) {
            if (err is HttpException) {
              responseController.addError(
                ClientException(err.message, err.uri),
                stackTrace,
              );
            } else {
              responseController.addError(err, stackTrace);
            }
          },
        );
      },
      onPause: () => ioResponseSubscription?.pause(),
      onResume: () => ioResponseSubscription?.resume(),
      onCancel: () => ioResponseSubscription?.cancel(),
      sync: true,
    );

    var headers = <String, String>{};
    response.headers.forEach((key, values) {
      // TODO: Remove trimRight() when
      // https://github.com/dart-lang/sdk/issues/53005 is resolved and the
      // package:http SDK constraint requires that version or later.
      headers[key] = values.map((value) => value.trimRight()).join(',');
    });

    return _IOStreamedResponseV2(
      responseController.stream,
      response.statusCode,
      contentLength:
          response.contentLength == -1 ? null : response.contentLength,
      request: request,
      headers: headers,
      isRedirect: response.isRedirect,
      url: response.redirects.isNotEmpty
          ? response.redirects.last.location
          : request.url,
      persistentConnection: response.persistentConnection,
      reasonPhrase: response.reasonPhrase,
      inner: response,
    );
  } on SocketException catch (error) {
    throw _ClientSocketException(error, request.url);
  } on HttpException catch (error) {
    throw ClientException(error.message, error.uri);
  }
}