streamSSE function

Future<Response> streamSSE(
  1. Context c,
  2. Future<void> callback(
    1. DartoSSEWriter writer
    ), {
  3. Future<void> onError(
    1. Object error,
    2. DartoSSEWriter writer
    )?,
})

Streams Server-Sent Events (SSE) to the client.

Sets Content-Type: text/event-stream, Cache-Control: no-cache, Connection: keep-alive, and X-Accel-Buffering: no (disables Nginx proxy buffering) automatically. The callback receives a DartoSSEWriter.

app.get('/events', (c) => streamSSE(c, (sse) async {
  sse.onAbort(() => print('client disconnected'));
  var i = 0;
  while (!sse.isAborted) {
    await sse.writeSSE(SseEvent(event: 'tick', data: '${i++}'));
    await sse.sleep(Duration(seconds: 1));
  }
}));

Implementation

Future<Response> streamSSE(
  Context c,
  Future<void> Function(DartoSSEWriter writer) callback, {
  Future<void> Function(Object error, DartoSSEWriter writer)? onError,
}) async {
  final httpRes = c.res.raw;
  httpRes.statusCode = 200;
  httpRes.headers.contentType = ContentType('text', 'event-stream');
  httpRes.headers.set('Cache-Control', 'no-cache');
  httpRes.headers.set('Connection', 'keep-alive');
  httpRes.headers.set('X-Accel-Buffering', 'no');

  final writer = DartoSSEWriter._(httpRes);
  unawaited(httpRes.done.then((_) {}, onError: (_) {
    writer._aborted = true;
    writer._onAbortCallback?.call();
  }));

  try {
    await callback(writer);
  } catch (e) {
    if (onError != null) await onError(e, writer);
  } finally {
    try {
      await httpRes.close();
    } catch (_) {}
  }

  return const Response.sent();
}