putStream method

Stream<UploadProgress> putStream({
  1. required Uri url,
  2. required Stream<List<int>> body,
  3. required Map<String, String> headers,
  4. int? contentLength,
})

Upload data using streaming to avoid loading entire file into memory Returns a stream of UploadProgress that emits progress updates and completes when upload is done Uses BehaviorSubject so new listeners get the latest progress immediately

Implementation

Stream<UploadProgress> putStream({
  required Uri url,
  required Stream<List<int>> body,
  required Map<String, String> headers,
  int? contentLength,
}) {
  final progressSubject = BehaviorSubject<UploadProgress>.seeded(
    UploadProgress(sentBytes: 0, totalBytes: contentLength ?? 0),
  );

  () async {
    try {
      final request = http.StreamedRequest('PUT', url);

      // Add headers
      request.headers.addAll(headers);

      // Set content length if provided (required by some servers)
      if (contentLength != null) {
        request.contentLength = contentLength;
      }

      // Track progress
      int bytesSent = 0;
      final totalBytes = contentLength ?? 0;

      final progressStream = body.map((chunk) {
        bytesSent += chunk.length;
        progressSubject.add(UploadProgress(
          sentBytes: bytesSent,
          totalBytes: totalBytes,
        ));
        return chunk;
      });

      // Pipe the stream to the request
      progressStream.listen(
        request.sink.add,
        onError: (error) {
          request.sink.addError(error);
          progressSubject.addError(error);
        },
        onDone: request.sink.close,
        cancelOnError: true,
      );

      // Send the request and get response
      final streamedResponse = await _client.send(request);
      final response = await http.Response.fromStream(streamedResponse);

      if (response.statusCode != 200) {
        final error = Exception(
            "error fetching STATUS: ${response.statusCode}, Link: $url");
        progressSubject.addError(error);
        await progressSubject.close();
        return;
      }

      // Upload complete
      progressSubject.add(UploadProgress(
        sentBytes: totalBytes,
        totalBytes: totalBytes,
        isComplete: true,
        response: response,
      ));
      await progressSubject.close();
    } catch (error) {
      progressSubject.addError(error);
      await progressSubject.close();
    }
  }();

  return progressSubject.stream;
}