stream method

Allows to stream SSE events from horizon. Certain endpoints in Horizon can be called in streaming mode using Server-Sent Events. This mode will keep the connection to horizon open and horizon will continue to return responses as ledgers close. See: Streaming

Implementation

Stream<OperationResponse> stream() {
  StreamController<OperationResponse> listener = StreamController.broadcast();

  bool cancelled = false;
  EventSource? source;

  Future<void> createNewEventSource() async {
    if (cancelled) {
      return;
    }
    source?.close();
    source = await EventSource.connect(this.buildUri());
    source!.listen((Event event) async {
      if (cancelled) {
        return null;
      }
      if (event.event == "open") {
        return null;
      }
      if (event.event == "close") {
        // Reconnect on close to stream infinitely
        createNewEventSource();
        return null;
      }
      try {
        OperationResponse operationResponse = OperationResponse.fromJson(
          json.decode(event.data!),
        );
        listener.add(operationResponse);
      } catch (e, stackTrace) {
        listener.addError(e, stackTrace);
        createNewEventSource();
      }
    });
  }

  listener.onListen = () {
    cancelled = false;
    createNewEventSource();
  };
  listener.onCancel = () {
    if (!listener.hasListener) {
      cancelled = true;
      source?.close();
    }
  };

  return listener.stream;
}