sse_request 0.1.2

A Dart package for consuming SSE (Server-Sent Events) using a configurable and extensible controller for connection lifecycle and event handling.

SSE_REQUEST #

Simple SSE (Server-Sent Events) consumer. Built around the Request class from package:http.

Features #

  • Consumes SSE and converts each event into a Map<String, dynamic>.
  • Configurable and extensible.
  • Can be configured to auto-reconnect or perform any action you want on caught exceptions.
  • Can be configured to parse each event with ease.
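To make the first feature concrete, here is a minimal sketch of how one SSE event block could be turned into a Map<String, dynamic>. It is not the package's implementation: `parseSseBlock` is a hypothetical helper, the key layout of the resulting map is an assumption, and the input is assumed to use plain `\n` line endings.

```dart
/// Parses one SSE event block (the lines between two blank lines) into a map.
/// Hypothetical helper; the map layout is an assumption for illustration.
Map<String, dynamic> parseSseBlock(String block) {
  final result = <String, dynamic>{};
  final dataLines = <String>[];
  for (final line in block.split('\n')) {
    // Empty lines carry no field; lines starting with ':' are comments.
    if (line.isEmpty || line.startsWith(':')) continue;
    final colon = line.indexOf(':');
    final field = colon == -1 ? line : line.substring(0, colon);
    var value = colon == -1 ? '' : line.substring(colon + 1);
    // A single leading space after the colon is not part of the value.
    if (value.startsWith(' ')) value = value.substring(1);
    if (field == 'data') {
      dataLines.add(value);
    } else {
      result[field] = value;
    }
  }
  // Several data lines in one block are joined with a newline.
  if (dataLines.isNotEmpty) result['data'] = dataLines.join('\n');
  return result;
}

void main() {
  final event = parseSseBlock('event: greeting\ndata: hello\ndata: world');
  print(event['event']);
  print(event['data']);
}
```

The actual keys produced by the package may differ, so inspect a real event before relying on a particular field name.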

Getting started #

  • Add package to pubspec.yaml:
dependencies:
  sse_request: ^0.1.2
  • Import package:
import 'package:sse_request/sse_request.dart';
  • Create an SseRequest object, for example with SseRequest.get(), and call getStream().
  • Add a listener to the Stream.

Usage #

import 'dart:developer' as dev;
import 'package:sse_request/sse_request.dart';

void main() async {
  /// Create an [SseRequest] for a GET request.
  final getRequest = SseRequest.get(
    uri: Uri.parse('your_uri'),
    headers: {'hello': 'world'},
  );

  /// Create an [SseRequest] for a POST request.
  final postRequest = SseRequest.post(
    uri: Uri.parse('your_uri'),
    headers: {'hello': 'world'},
    body: {'hello': 'world'},
  );


  /// Obtains a [Stream] of events.
  ///
  /// First parameter is the subscription name, which is used
  /// to distinguish connection events for multiple streams.
  ///
  /// Nothing is sent until the first listener is attached.
  final stream = getRequest.getStream('name:1');

  /// Listens to the parsed SSE event stream.
  final subscription = stream.listen(
    (event) {
      dev.log(event.toString());
    },
    onError: (e) {
      dev.log('Invalid SSE message: $e');
    },
  );

  // Demonstration delay.
  await Future.delayed(Duration(seconds: 10));

  /// Don't forget to cancel the [StreamSubscription] to
  /// avoid memory leaks.
  await subscription.cancel();
}
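
The note in the example above that nothing is sent until the first listener is attached describes a lazy connection. The sketch below shows one way such behaviour can be built with a plain StreamController from dart:async. It is an illustration under assumptions, not the package's code: `lazyStream` and `connect` are hypothetical names.

```dart
import 'dart:async';

/// Returns a stream that calls [connect] only when the first listener
/// subscribes, and cancels the source subscription when the listener cancels.
Stream<T> lazyStream<T>(Stream<T> Function() connect) {
  StreamSubscription<T>? source;
  late final StreamController<T> controller;
  controller = StreamController<T>(
    onListen: () {
      // The source is opened here, not when lazyStream() is called.
      source = connect().listen(
        controller.add,
        onError: controller.addError,
        onDone: controller.close,
      );
    },
    onCancel: () => source?.cancel(),
  );
  return controller.stream;
}

Future<void> main() async {
  var connected = false;
  final stream = lazyStream(() {
    connected = true;
    return Stream.fromIterable(['a', 'b']);
  });
  print(connected); // false: no listener yet
  final events = await stream.toList();
  print(connected); // true
  print(events); // [a, b]
}
```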

For advanced functionality, prefer SseSourceController or SseParsedSourceController<T>, or implement SseSourceControllerBase yourself.


Going further #

Usage of SseSourceController

import 'dart:async';
import 'dart:developer' as dev;
import 'package:sse_request/sse_request.dart';

Future<void> main() async {
  // Create an SSE GET request.
  final request = SseRequest.get(
    uri: Uri.parse('your_api_uri'),
  );

  final controller = SseSourceController(
    // This name is used to distinguish connection events for multiple streams.
    name: 'Name:1',
    // Specify the builder function for obtaining the event stream.
    sseStreamBuilder: request.sendStreamed,
    // Invoked when a new SSE connection is about to be created.
    onNewConnection: (name) => dev.log('Creating new SSE connection to "$name"'),
    // Invoked when the SSE connection is established.
    onConnected: (name) => dev.log('Established SSE connection to "$name"'),
    // Invoked when the SSE connection is closed.
    onCloseConnection: (name, wasConnected) {
      if (wasConnected) {
        dev.log('Closed SSE subscription $name');
      } else {
        dev.log('Closed SSE subscription $name without being opened');
      }
    },
    // Invoked when the stream is cancelled.
    onCancel: (name, wasConnected) {
      if (wasConnected) {
        dev.log('Canceled SSE subscription $name');
      } else {
        dev.log('Canceled SSE subscription $name without being opened');
      }
    },
  );

  // Establish an SSE connection.
  // Nothing is sent until this happens.
  final subscription = controller.stream.listen((event) {
    dev.log(event.toString());
  }, onError: (e) {
    dev.log(e.toString());
  });

  // Demonstration delay.
  await Future.delayed(Duration(seconds: 10));

  // Don't forget to cancel the StreamSubscription
  // to avoid memory leaks.
  await subscription.cancel();

  // The `dispose()` or `clear()` methods can be used to force-close
  // the connection through the controller; `dispose()` also ensures
  // the controller cannot be used again.
  await controller.dispose();
}

Add reconnection

SseSourceController's stream is separate from the SSE stream, so if the connection to the server is lost, a new SSE stream can be attached using connectEventListener(). This can be done inside the actionOnErrorEvent callback:

import 'dart:async';
import 'dart:developer' as dev;
import 'package:http/http.dart';
import 'package:sse_request/sse_request.dart';

Future<void> main() async {
  // Create a new SSE stream request for each connection attempt,
  // as a single request instance cannot be reused.
  sseStreamBuilder(Client client) {
    return SseRequest.get(
      uri: Uri.parse('your_api_uri'),
    ).sendStreamed(client);
  }

  final controller = SseSourceController(
    name: 'Name:1',
    sseStreamBuilder: sseStreamBuilder,
    // `actionOnErrorEvent` is invoked on every error event with the
    // controller instance, the error, and the stack trace.
    actionOnErrorEvent: (controller, error, st) async {
      // Implementation of reconnection logic.
      //
      // The `controller` argument lets you decide how the
      // controller reacts to certain errors.
      try {
        // Cancel the current event listener and close the HTTP client.
        await controller.clear();

        // Connect a new event listener.
        await controller.connectEventListener(sseStreamBuilder);
      } catch (e) {
        // On failed connection retry, you can dispose the controller
        // or retry again after a delay.
        await controller.dispose();
        rethrow;
      }
    },
  );
}
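
The catch block above mentions retrying after a delay. A minimal sketch of that idea, using exponential backoff and only dart:async, might look like the following. `retryWithBackoff` is a hypothetical helper and not part of sse_request; the attempt count and delays are arbitrary defaults chosen for illustration.

```dart
import 'dart:async';

/// Runs [action] until it succeeds or [maxAttempts] is reached, doubling
/// the delay between attempts. Rethrows the last error when attempts run out.
Future<T> retryWithBackoff<T>(
  Future<T> Function() action, {
  int maxAttempts = 5,
  Duration initialDelay = const Duration(seconds: 1),
}) async {
  var delay = initialDelay;
  for (var attempt = 1; ; attempt++) {
    try {
      return await action();
    } catch (_) {
      if (attempt >= maxAttempts) rethrow;
      await Future<void>.delayed(delay);
      delay *= 2;
    }
  }
}

Future<void> main() async {
  var calls = 0;
  final result = await retryWithBackoff(
    () async {
      calls++;
      // Simulate two failed connection attempts before success.
      if (calls < 3) throw StateError('connection failed');
      return 'connected';
    },
    initialDelay: const Duration(milliseconds: 10),
  );
  print('$result after $calls attempts'); // connected after 3 attempts
}
```

Inside actionOnErrorEvent, the action passed to such a helper could be the clear() and connectEventListener() pair shown above, with dispose() called only once all attempts have failed.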

Parse each event

Use SseParsedSourceController<T> instead of SseSourceController to set a custom type for the stream, and define an eventParser:

SseParsedSourceController<String>(
  name: 'Name:1',
  sseStreamBuilder: request.sendStreamed,
  // Invoked on every new event. Must return a value of the specified type
  // or throw an error, which triggers [onErrorEvent], so you don't need to
  // duplicate error handling logic.
  eventParser: (Map<String, dynamic> event) {
    // Implement your parser here.
    try {
      return event.values.first;
    } catch (e) {
      // An unhandled exception triggers [onErrorEvent].
      rethrow;
    }
  },
);
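
As a slightly fuller illustration, a parser with the same shape as eventParser could decode a JSON payload into a typed object. This is a sketch under assumptions: `ChatMessage` and `parseChatMessage` are hypothetical, and the event map is assumed to carry the payload as a JSON string under a 'data' key, which the package may or may not do.

```dart
import 'dart:convert';

/// Hypothetical typed model for an event payload.
class ChatMessage {
  final String author;
  final String text;
  const ChatMessage(this.author, this.text);
}

/// Has the same signature shape as `eventParser`. Assumes the payload is a
/// JSON object stored as a string under the 'data' key.
ChatMessage parseChatMessage(Map<String, dynamic> event) {
  final data = event['data'];
  if (data is! String) {
    throw FormatException('Missing "data" field in event: $event');
  }
  final json = jsonDecode(data);
  if (json is! Map<String, dynamic>) {
    throw FormatException('Expected a JSON object', data);
  }
  return ChatMessage(json['author'] as String, json['text'] as String);
}

void main() {
  final message = parseChatMessage({
    'event': 'message',
    'data': '{"author": "alice", "text": "hi"}',
  });
  print('${message.author}: ${message.text}'); // alice: hi
}
```

Throwing from the parser, rather than returning a fallback value, keeps all failure handling in the controller's error callback.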

Some explanation

The package consists of two parts:

  • Data stream converters.
  • Data stream controller SseSourceController. Manages connection lifecycle and event handling.

To connect to a data stream, the package uses BaseRequest's send() from package:http, which returns a StreamedResponse. Instead of waiting for the entire response, you can access streamedResponse.stream to receive data as it arrives. This stream provides raw bytes (a ByteStream, that is, a Stream<List<int>>), which must be decoded and parsed into usable events.

Example of creating a custom StreamedResponse converter:

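import 'dart:convert';

import 'package:http/http.dart' as http;
import 'package:sse_request/sse_transformers.dart';

Future<Stream<Map<String, dynamic>>> getStream(http.BaseRequest request) async {
  // Send the request; the response body arrives incrementally.
  final http.StreamedResponse response = await request.send();

  final http.ByteStream byteStream = response.stream;

  // Decode the bytes as UTF-8 text, then apply the package's
  // splitter and parser transformers.
  final Stream<Map<String, dynamic>> convertedSseStream = byteStream
      .transform(utf8.decoder)
      .transform(sseStreamSplitter)
      .transform(sseStreamParser);

  return convertedSseStream;
}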
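
To show what a splitting stage in such a pipeline has to deal with, the sketch below buffers decoded text and emits one string per event block, even when network chunks do not line up with event boundaries. `blockSplitter` is a hypothetical stand-in written for this example, not the package's sseStreamSplitter, and it assumes events are separated by `\n\n` only.

```dart
import 'dart:async';
import 'dart:convert';

/// Hypothetical splitter stage: buffers decoded text and emits one string
/// per event block (blocks are separated by a blank line).
StreamTransformer<String, String> blockSplitter() {
  var buffer = '';
  return StreamTransformer<String, String>.fromHandlers(
    handleData: (chunk, sink) {
      buffer += chunk;
      var index = buffer.indexOf('\n\n');
      while (index != -1) {
        final block = buffer.substring(0, index);
        if (block.isNotEmpty) sink.add(block);
        buffer = buffer.substring(index + 2);
        index = buffer.indexOf('\n\n');
      }
    },
    handleDone: (sink) {
      // An unterminated trailing block is incomplete and is dropped.
      sink.close();
    },
  );
}

Future<void> main() async {
  // Chunk boundaries deliberately cut through the first event.
  final bytes = Stream<List<int>>.fromIterable([
    utf8.encode('data: fir'),
    utf8.encode('st\n\ndata: second\n\n'),
  ]);
  final blocks = await bytes
      .transform(utf8.decoder)
      .transform(blockSplitter())
      .toList();
  print(blocks); // [data: first, data: second]
}
```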

SseRequest is just a wrapper around http.Request, so you can use any method to obtain a data stream and pass it to SseSourceController. This is exactly what SseRequest does in its getStream() method.

Additional info #

  • This package does not implement a bidirectional protocol. If you need one, consider using the official sse package instead.

Known issues:

  • Too much data in a single stream response, or within a short amount of time, may be cut off.

License

BSD-3-Clause (license)

Dependencies

http, meta
