
EventFlux
A Dart package for Server-Sent Events done right — WHATWG spec-compliant parsing, auto-reconnect with exponential backoff, request/response interceptors, mid-flight abort, idle timeout detection, and web platform support out of the box.
Platform Support
| Android | iOS | Web | MacOS | Windows | Linux |
|---|---|---|---|---|---|
| ✅ | ✅ | ✅ | ✅ | ❓ | ❓ |
Windows and Linux should work but haven't been battle-tested yet — PRs welcome if you get there first.
Features 🌟
- 📜 WHATWG SSE spec-compliant event stream parsing with persistent
lastEventIdandretry:field support - 🔄 Auto-reconnect with linear or exponential backoff, random jitter, and configurable
maxBackoffcap - 🔗 Interceptor chain — hook into request, response, and error lifecycle stages
- 🛑 Mid-flight abort via a
Future<void>trigger - ⏱️ Idle timeout detection — drops the connection if no data arrives within a configured duration
- 🌐 Web platform support with CORS, credentials, and caching configuration via
WebConfig - 🔍 Event filtering by type using
response.where() - 📎 Multipart request support
- 🏗️ Singleton (
EventFlux.instance) and multiple independent connections (EventFlux.spawn()) - 🔌 Pluggable HTTP clients via
HttpClientAdapter - 🧠 Smart error classification — only 5xx, 408, and 429 trigger auto-reconnect
Migrating from v2? FYI. 🔄
Here
If you're upgrading from v2, here's what changed:
Breaking
onReconnectcallback signature changed from()to(int attempt, Duration delay)- Default request headers now include
Cache-Control: no-store - Minimum Dart SDK raised to
>=3.4.0, Flutter>=3.0.0 webConfigis now required when running on web
New in v3
EventFluxStatus.reconnectingstatus valueoriginalErrorandstackTracefields onEventFluxException- Request/response/error interceptor chain via
interceptorsparameter - Mid-flight abort support via
abortTriggerparameter - Idle timeout detection via
connectionTimeoutonReconnectConfig maxBackoffonReconnectConfigto cap exponential backoff- WHATWG-compliant SSE parser with persistent
lastEventId,retry:field support, and U+2028 sanitization
Installation 📦
dependencies:
eventflux: ^3.0.2-dev
Requires Dart SDK >=3.4.0 and Flutter >=3.0.0.
Usage 🔧
Basic Connection — Connect to an SSE endpoint in a few lines
import 'package:eventflux/eventflux.dart';
void main() {
EventFlux.instance.connect(
EventFluxConnectionType.get,
'https://example.com/events',
onSuccessCallback: (EventFluxResponse? response) {
response?.stream?.listen((EventFluxData data) {
print('Event: ${data.event}');
print('Data: ${data.data}');
});
},
onError: (EventFluxException error) {
print('Error: $error');
},
onConnectionClose: () {
print('Connection closed');
},
);
}
Auto-Reconnect — Exponential backoff with jitter and token refresh
import 'package:eventflux/eventflux.dart';
void main() {
EventFlux.instance.connect(
EventFluxConnectionType.get,
'https://example.com/events',
onSuccessCallback: (EventFluxResponse? response) {
response?.stream?.listen((data) {
print('Data: ${data.data}');
});
},
onError: (error) {
print('Error: $error');
},
autoReconnect: true,
reconnectConfig: ReconnectConfig(
mode: ReconnectMode.exponential,
interval: Duration(seconds: 2),
maxAttempts: 10,
maxBackoff: Duration(seconds: 30),
connectionTimeout: Duration(seconds: 60),
onReconnect: (int attempt, Duration delay) {
print('Reconnect attempt $attempt after $delay');
},
reconnectHeader: () async {
String newToken = await refreshAccessToken();
return {
'Authorization': 'Bearer $newToken',
'Accept': 'text/event-stream',
};
},
),
);
}
Interceptors — Inject auth headers, log responses, suppress errors
import 'package:eventflux/eventflux.dart';
import 'package:http/http.dart';
class AuthInterceptor extends EventFluxInterceptor {
@override
Future<BaseRequest> onRequest(BaseRequest request) async {
request.headers['Authorization'] = 'Bearer my-token';
return request;
}
@override
Future<StreamedResponse> onResponse(StreamedResponse response) async {
print('Response status: ${response.statusCode}');
return response;
}
@override
Future<EventFluxException?> onError(EventFluxException exception) async {
print('Intercepted error: $exception');
// Return null to suppress the error, or return exception to propagate it
return exception;
}
}
void main() {
EventFlux.instance.connect(
EventFluxConnectionType.get,
'https://example.com/events',
interceptors: [AuthInterceptor()],
onSuccessCallback: (EventFluxResponse? response) {
response?.stream?.listen((data) {
print('Data: ${data.data}');
});
},
onError: (error) {
print('Error: $error');
},
);
}
Abort a Connection — Cancel an in-flight request at any time
import 'dart:async';
import 'package:eventflux/eventflux.dart';
void main() {
final completer = Completer<void>();
EventFlux.instance.connect(
EventFluxConnectionType.get,
'https://example.com/events',
abortTrigger: completer.future,
onSuccessCallback: (EventFluxResponse? response) {
response?.stream?.listen((data) {
print('Data: ${data.data}');
});
},
onError: (error) {
print('Error: $error');
},
);
// Cancel the connection at any time
Future.delayed(Duration(seconds: 10), () {
completer.complete();
});
}
Web Platform — Configure CORS and credentials for browser SSE
import 'package:eventflux/eventflux.dart';
import 'package:flutter/foundation.dart' show kIsWeb;
void main() {
EventFlux.instance.connect(
EventFluxConnectionType.get,
'https://example.com/events',
webConfig: kIsWeb
? WebConfig(
mode: WebConfigRequestMode.cors,
credentials: WebConfigRequestCredentials.omit,
)
: null,
onSuccessCallback: (EventFluxResponse? response) {
response?.stream?.listen((data) {
print('Data: ${data.data}');
});
},
onError: (error) {
print('Error: $error');
},
);
}
Multiple Connections — Run independent SSE streams in parallel
import 'package:eventflux/eventflux.dart';
void main() {
EventFlux e1 = EventFlux.spawn();
EventFlux e2 = EventFlux.spawn();
e1.connect(
EventFluxConnectionType.get,
'https://example.com/stream-1',
tag: 'Stream 1',
onSuccessCallback: (EventFluxResponse? response) {
response?.stream?.listen((data) {
print('Stream 1: ${data.data}');
});
},
onError: (error) {
print('Stream 1 error: $error');
},
);
e2.connect(
EventFluxConnectionType.get,
'https://example.com/stream-2',
tag: 'Stream 2',
onSuccessCallback: (EventFluxResponse? response) {
response?.stream?.listen((data) {
print('Stream 2: ${data.data}');
});
},
onError: (error) {
print('Stream 2 error: $error');
},
);
// Disconnect both when done
// await e1.disconnect();
// await e2.disconnect();
}
Event Filtering — filter events by type using where():
response?.where('message').listen((data) {
print('Message event: ${data.data}');
});
API Reference 📚
Connect
Connects to a server-sent event stream.
| Parameter | Type | Description | Default |
|---|---|---|---|
type |
EventFluxConnectionType |
HTTP method (get or post) |
— |
url |
String |
SSE stream URL | — |
onSuccessCallback |
Function(EventFluxResponse?) |
Callback on successful connection (required) | — |
header |
Map<String, String> |
HTTP headers | {'Accept': 'text/event-stream', 'Cache-Control': 'no-store'} |
onConnectionClose |
Function()? |
Called when the connection closes | — |
autoReconnect |
bool |
Auto-reconnect on disconnection | false |
reconnectConfig |
ReconnectConfig? |
Reconnection settings (required if autoReconnect is true) |
— |
onError |
Function(EventFluxException)? |
Error callback | — |
body |
Map<String, dynamic>? |
Request body for POST | — |
files |
List<MultipartFile>? |
Files for multipart requests | — |
multipartRequest |
bool |
Send as multipart | false |
tag |
String? |
Debug tag (appears in logs) | — |
logReceivedData |
bool |
Log received SSE data | false |
httpClient |
HttpClientAdapter? |
Custom HTTP client | — |
webConfig |
WebConfig? |
Web platform configuration (required on web) | — |
interceptors |
List<EventFluxInterceptor>? |
Request/response/error interceptors | — |
abortTrigger |
Future<void>? |
Future that aborts the connection when completed | — |
ReconnectConfig
| Parameter | Type | Description | Default |
|---|---|---|---|
mode |
ReconnectMode |
linear or exponential (required) |
— |
interval |
Duration |
Base retry interval | Duration(seconds: 2) |
maxAttempts |
int |
Max reconnect attempts (-1 for unlimited) | 5 |
maxBackoff |
Duration |
Cap for exponential backoff | Duration(seconds: 30) |
connectionTimeout |
Duration? |
Idle timeout — drops connection if no data received | — |
onReconnect |
void Function(int attempt, Duration delay)? |
Called on each reconnect attempt | — |
reconnectHeader |
Future<Map<String, String>> Function()? |
Async header refresh for reconnect | — |
EventFluxInterceptor
Subclass EventFluxInterceptor and override any of the following methods:
| Method | Signature | Description |
|---|---|---|
onRequest |
Future<BaseRequest> onRequest(BaseRequest request) |
Modify the request before sending (e.g., inject auth headers). Throw EventFluxException to abort. |
onResponse |
Future<StreamedResponse> onResponse(StreamedResponse response) |
Inspect the response after receiving. Must not consume the stream body. |
onError |
Future<EventFluxException?> onError(EventFluxException exception) |
Handle or suppress errors. Return null to suppress the exception. |
EventFluxStatus
| Value | Description |
|---|---|
connectionInitiated |
Connection process has started |
connected |
Successfully connected to the event stream |
reconnecting |
Auto-reconnect is in progress |
disconnected |
Connection has been closed |
error |
An error occurred during connection or disconnection |
Disconnect
EventFluxStatus status = await EventFlux.instance.disconnect();
Returns a Future<EventFluxStatus> indicating the disconnection status.
Spawn
EventFlux instance = EventFlux.spawn();
Returns a new independent EventFlux instance for managing parallel SSE connections.
Contributors 💜
EventFlux wouldn't exist without these people who believed it could be better.
Contributing 🤝
Contributions are welcome — open an issue or submit a pull request. Every bit helps.
License
Licensed under MIT.
Libraries
- client
- Imports
- enum
- eventflux
- extensions/fetch_client_extension
- http_client_adapter
- models/base
- models/data
- models/exception
- models/interceptor
- models/reconnect
- models/response
- models/web_config/redirect_policy
- models/web_config/request_cache
- models/web_config/request_credentials
- models/web_config/request_mode
- models/web_config/request_referrer_policy
- models/web_config/web_config
- utils







