connectEventSource method
GET /:/eventsource/notifications — open a Server-Sent Events
stream of notifications. SSE is a one-way (server → client)
transport; use this when WebSockets are blocked.
filter (optional) restricts the stream to a single notification type.
Implementation
/// Opens a Server-Sent Events stream of PMS notifications.
///
/// Issues a streaming `GET /:/eventsource/notifications` carrying the current
/// token as `X-Plex-Token` and, when [filter] is non-null, a `filter` query
/// parameter. The response body is decoded as UTF-8, split into SSE frames on
/// blank lines, and every frame whose data payload decodes successfully is
/// added to the returned broadcast stream.
///
/// Failures while opening or reading the body are delivered as stream errors
/// wrapped in a [PlexException] of type [PlexErrorType.connection]. The
/// stream is closed when the request cannot be opened, when the response has
/// no body, or when the body ends.
///
/// Throws a [PlexException] of type [PlexErrorType.state] if an event source
/// is already registered, or if the base URL or token is missing.
Stream<PlexNotification> connectEventSource({String? filter}) {
  if (_sseController != null) {
    throw const PlexException(
      'EventSource already connected. Call closeEventSource() first.',
      type: PlexErrorType.state,
    );
  }
  final base = _http.baseUrl;
  final token = _http.token;
  if (base == null || token == null) {
    throw const PlexException(
      'No PMS connection or token. Call connect() / setToken() first.',
      type: PlexErrorType.state,
    );
  }
  final qp = <String, String>{
    'X-Plex-Token': token,
    if (filter != null) 'filter': filter,
  };

  // Declared `late` so the onCancel closure can refer to this exact
  // controller instead of whatever `_sseController` points at later.
  late final StreamController<PlexNotification> controller;
  controller = StreamController<PlexNotification>.broadcast(
    onCancel: () {
      final registered = _sseController;
      // A different controller is registered: a newer connection owns the
      // shared state, so a late cancel from this one must not close it.
      if (registered != null && !identical(registered, controller)) return;
      if (!controller.hasListener) closeEventSource();
    },
  );
  _sseController = controller;

  // Whether this call's controller is still open and still the registered
  // one.
  bool isActive() =>
      !controller.isClosed && identical(_sseController, controller);

  // Closes this connection's stream and clears the shared SSE state, unless
  // that state now belongs to a different controller.
  void finish() {
    if (!controller.isClosed) controller.close();
    final registered = _sseController;
    if (registered == null || identical(registered, controller)) {
      _cleanupSse();
    }
  }

  // Opens the request and pumps the SSE body into `controller`.
  Future<void> pump() async {
    try {
      final res = await _http.streamGet(
        '/:/eventsource/notifications',
        queryParameters: qp,
        extraHeaders: const {'Accept': 'text/event-stream'},
      );
      final body = res.data;
      if (!isActive()) {
        // The connection was torn down while the request was in flight.
        // Release the response instead of subscribing and overwriting
        // `_sseSub`, and leave the shared state to whoever owns it now.
        if (body != null) await body.stream.listen(null).cancel();
        if (!controller.isClosed) controller.close();
        return;
      }
      if (body == null) {
        finish();
        return;
      }

      final pending = StringBuffer();
      // A chunked decoder keeps a multi-byte character that straddles two
      // network chunks intact; decoding each chunk on its own would turn
      // both halves into U+FFFD. Decoded text is written straight into
      // `pending`.
      final decoder = const Utf8Decoder(allowMalformed: true)
          .startChunkedConversion(
        StringConversionSink.fromStringSink(pending),
      );

      _sseSub = body.stream.listen(
        (chunk) {
          decoder.add(chunk);
          // The SSE wire format also permits CRLF line endings; normalise
          // them so a blank line is always '\n\n'. A lone trailing '\r' is
          // kept and pairs up with the '\n' of the next chunk.
          final text = pending.toString().replaceAll('\r\n', '\n');
          var start = 0;
          try {
            while (true) {
              final end = text.indexOf('\n\n', start);
              if (end < 0) break;
              final frame = text.substring(start, end);
              // Advance before dispatching so a throwing handler cannot
              // cause the same frame to be processed again.
              start = end + 2;
              final payload = _extractSseData(frame);
              if (payload == null) continue;
              final notif = _decode(payload);
              if (notif != null && !controller.isClosed) {
                controller.add(notif);
              }
            }
          } finally {
            // Keep only the unconsumed tail (an incomplete frame, if any).
            pending
              ..clear()
              ..write(text.substring(start));
          }
        },
        onError: (Object error, StackTrace stackTrace) {
          if (controller.isClosed) return;
          controller.addError(
            PlexException(
              'EventSource error: $error',
              type: PlexErrorType.connection,
            ),
            stackTrace,
          );
        },
        onDone: finish,
        cancelOnError: false,
      );
    } catch (e, stackTrace) {
      // Adding to a closed controller throws, so only report the failure
      // while the stream can still carry it.
      if (!controller.isClosed) {
        controller.addError(
          PlexException(
            'EventSource open failed: $e',
            type: PlexErrorType.connection,
          ),
          stackTrace,
        );
      }
      finish();
    }
  }

  // Started without awaiting: the stream is returned immediately and every
  // failure inside `pump` is reported through the controller.
  pump();
  return controller.stream;
}