callStream method

Future<Stream<Map<String, dynamic>>?> callStream(
  String verb,
  String path, {
  Map<String, dynamic>? params,
  Object? data,
})
inherited

callStream acts exactly like call, but it responds with a stream of JSON objects.

Implementation

/// Sends a streaming HTTP request and exposes the response body as a stream
/// of decoded JSON objects.
///
/// The request URI is built by `_makeUri` from [path] and [params]. The HTTP
/// method is taken from [verb] (upper-cased); if [verb] is blank it falls back
/// to `GET` when [data] is null and `POST` otherwise. A [String] [data] is sent
/// verbatim as the body; any other non-null [data] is JSON-encoded.
///
/// On a 2xx response the body is read incrementally and split on
/// `\r\n\r\n`; every complete chunk that decodes to a JSON object is emitted
/// on the returned stream. Chunks that are not valid JSON objects are skipped.
/// Cancelling the returned stream's subscription cancels the body listener
/// and closes the underlying HTTP client.
///
/// The return type stays nullable for backward compatibility; this
/// implementation itself returns a stream or throws.
///
/// Throws [http.ClientException] when the response status is not 2xx. The
/// message is the response body (JSON-decoded when the body is a JSON string),
/// or `stream timeout` if the error body did not arrive within two seconds.
Future<Stream<Map<String, dynamic>>?> callStream(String verb, String path,
    {Map<String, dynamic>? params, Object? data}) async {
  final uri = _makeUri(path, params: params);

  // Honour the caller's verb; only infer one from the payload when the caller
  // passed a blank verb.
  final requestedVerb = verb.trim().toUpperCase();
  final method =
      requestedVerb.isNotEmpty ? requestedVerb : (data == null ? 'GET' : 'POST');

  // setup a manual request to manage streaming
  final client = http.Client();
  final request = http.Request(method, uri);
  request.headers['content-type'] = 'application/json';
  if (data is String) {
    request.body = data;
  } else if (data != null) {
    request.body = json.encode(data);
  }

  http.StreamedResponse res;
  try {
    res = await client.send(request);
  } catch (_) {
    // The request never produced a response; do not leak the client.
    client.close();
    rethrow;
  }

  // if successful, create a stream of Json Objects
  if (res.statusCode >= 200 && res.statusCode < 300) {
    final controller = StreamController<Map<String, dynamic>>();
    var pending = '';

    // Releases the connection and ends the outgoing stream exactly once.
    void finish() {
      client.close();
      if (!controller.isClosed) {
        controller.close();
      }
    }

    // Decode through the chunked UTF-8 decoder so that a multi-byte character
    // split across two network packets is reassembled instead of throwing.
    final subscription = utf8.decoder.bind(res.stream).listen(
      (text) {
        pending += text;
        final chunks = pending.split('\r\n\r\n');
        // if the received data ended with \r\n\r\n, the last chunk is empty;
        // otherwise it is an incomplete message that stays in the accumulator
        pending = chunks.removeLast();
        for (final chunk in chunks) {
          try {
            final decoded = json.decode(chunk);
            if (decoded is Map) {
              controller.add(Map<String, dynamic>.from(decoded));
            }
          } catch (_) {
            // Best effort: a malformed chunk is skipped, the stream goes on.
          }
        }
      },
      onError: (Object error, StackTrace stackTrace) {
        // propresenter might close this connection prematurely; that shows up
        // here as an asynchronous stream error, not as a thrown exception.
        if (error is http.ClientException) {
          debug(error);
        } else if (!controller.isClosed) {
          controller.addError(error, stackTrace);
        }
        finish();
      },
      // cleanup when the server has stopped sending data
      onDone: finish,
      cancelOnError: true,
    );

    // close http connection when the listener to the stream cancels
    controller.onCancel = () {
      final cancelled = subscription.cancel();
      client.close();
      return cancelled;
    };
    return controller.stream;
  }

  // we had an error of some kind, but we used a streaming request
  // so we wait until all the response data has arrived before throwing
  // the error.
  try {
    var timedOut = false;
    final String body = await _awaitBody(res.stream)
        .timeout(Duration(seconds: 2), onTimeout: () {
      timedOut = true;
      return 'stream timeout';
    });
    if (timedOut) {
      throw http.ClientException(body);
    }

    var message = body;
    // The header may carry parameters such as "; charset=utf-8".
    final contentType = (res.headers['content-type'] ?? '').toLowerCase();
    if (contentType.startsWith('application/json')) {
      try {
        final decoded = json.decode(body);
        // Only unwrap JSON strings; objects/arrays keep their raw JSON text
        // because ClientException requires a String message.
        if (decoded is String) {
          message = decoded;
        }
      } on FormatException {
        // Body was labelled JSON but is not; report it verbatim.
      }
    }
    throw http.ClientException(message);
  } finally {
    client.close();
  }
}