connectEventSource method

Stream<PlexNotification> connectEventSource({
  String? filter,
})

GET /:/eventsource/notifications — open a Server-Sent Events stream of notifications. SSE is a one-way (server → client) transport; use this when WebSockets are blocked.

filter (optional) restricts to one notification type.

Implementation

/// Opens a Server-Sent Events stream of notifications.
///
/// Issues a streaming `GET /:/eventsource/notifications` with the current
/// token sent as the `X-Plex-Token` query parameter and, when [filter] is
/// non-null, a `filter` query parameter. The returned stream is a broadcast
/// stream; when its last listener cancels, [closeEventSource] is called.
///
/// Failures that occur while opening or reading the HTTP stream are delivered
/// as stream errors wrapped in a [PlexException] of type
/// [PlexErrorType.connection].
///
/// Throws a [PlexException] of type [PlexErrorType.state] synchronously if an
/// event source is already connected, or if no base URL or token is set.
Stream<PlexNotification> connectEventSource({String? filter}) {
  if (_sseController != null) {
    throw const PlexException(
      'EventSource already connected. Call closeEventSource() first.',
      type: PlexErrorType.state,
    );
  }
  final base = _http.baseUrl;
  final token = _http.token;
  if (base == null || token == null) {
    throw const PlexException(
      'No PMS connection or token. Call connect() / setToken() first.',
      type: PlexErrorType.state,
    );
  }
  final qp = <String, String>{'X-Plex-Token': token};
  if (filter != null) qp['filter'] = filter;

  final controller = StreamController<PlexNotification>.broadcast(
    onCancel: () {
      if (!(_sseController?.hasListener ?? false)) {
        closeEventSource();
      }
    },
  );
  _sseController = controller;

  // True once this connection has been closed or replaced from outside
  // (e.g. while the HTTP request was still in flight). A stale connection
  // must not emit events or tear down state that belongs to a newer one.
  bool isStale() =>
      controller.isClosed || !identical(_sseController, controller);

  // Terminates this connection. `close()` is a no-op on an already-closed
  // controller; shared state is only cleaned up if it is still ours.
  void finish() {
    controller.close();
    if (identical(_sseController, controller)) _cleanupSse();
  }

  // A blank line ends an SSE frame; lines may end in LF, CRLF or bare CR.
  final frameBoundary = RegExp(r'\r\n\r\n|\n\n|\r\r');

  // Use the connection's streaming GET to consume the SSE body.
  () async {
    try {
      final res = await _http.streamGet(
        '/:/eventsource/notifications',
        queryParameters: qp,
        extraHeaders: const {'Accept': 'text/event-stream'},
      );
      final body = res.data;
      if (body == null) {
        finish();
        return;
      }
      if (isStale()) {
        // The caller gave up while the request was opening: release the
        // response body instead of leaving an orphaned subscription.
        await body.stream.listen(null).cancel();
        return;
      }
      final buffer = StringBuffer();
      // Decode incrementally so a multi-byte UTF-8 sequence split across two
      // network chunks is reassembled instead of turning into U+FFFD.
      final decoderSink = const Utf8Decoder(allowMalformed: true)
          .startChunkedConversion(StringConversionSink.fromStringSink(buffer));
      _sseSub = body.stream.listen(
        (chunk) {
          decoderSink.add(chunk);
          while (true) {
            final raw = buffer.toString();
            final boundary = frameBoundary.firstMatch(raw);
            if (boundary == null) break;
            // Drop the frame from the buffer before processing it so a
            // failure below cannot cause the same frame to be re-read.
            buffer
              ..clear()
              ..write(raw.substring(boundary.end));
            // Normalise line endings so the frame parser only sees LF.
            final frame = raw
                .substring(0, boundary.start)
                .replaceAll('\r\n', '\n')
                .replaceAll('\r', '\n');
            final payload = _extractSseData(frame);
            if (payload == null) continue;
            final notif = _decode(payload);
            if (notif != null && !controller.isClosed) controller.add(notif);
          }
        },
        onError: (Object error, StackTrace stackTrace) {
          if (controller.isClosed) return;
          controller.addError(
            PlexException(
              'EventSource error: $error',
              type: PlexErrorType.connection,
            ),
            stackTrace,
          );
        },
        onDone: () {
          // An unterminated trailing frame is intentionally discarded.
          decoderSink.close();
          finish();
        },
        cancelOnError: false,
      );
    } catch (e, stackTrace) {
      // Adding to a closed controller throws, and nobody awaits this
      // closure, so only report the failure while the stream is still open.
      if (!controller.isClosed) {
        controller.addError(
          PlexException(
            'EventSource open failed: $e',
            type: PlexErrorType.connection,
          ),
          stackTrace,
        );
      }
      finish();
    }
  }();

  return controller.stream;
}