FlowDux
A predictable state management library for Dart with execution strategies.
Features
- Redux-style unidirectional data flow
- Middleware support for side effects
- Execution strategies (takeLatest, takeLeading, sequential, debounce, throttle, retry)
- Strategy chaining and groups
- Error handling with ErrorProcessor
- FlowHolderAction for wrapping existing Streams
- distinct() for filtering consecutive identical states
Installation
dependencies:
  flowdux: ^0.2.4
Quick Start
Define State and Actions
// State
class CounterState {
  final int count;
  CounterState(this.count);
  CounterState copyWith({int? count}) => CounterState(count ?? this.count);
}

// Actions
class IncrementAction implements Action {}
class DecrementAction implements Action {}
class AddAction implements Action {
  final int value;
  AddAction(this.value);
}
Create a Reducer
final counterReducer = ReducerBuilder<CounterState, Action>()
  ..on<IncrementAction>((state, _) => state.copyWith(count: state.count + 1))
  ..on<DecrementAction>((state, _) => state.copyWith(count: state.count - 1))
  ..on<AddAction>((state, action) => state.copyWith(count: state.count + action.value));
Create and Use the Store
final store = createStore<CounterState, Action>(
  initialState: CounterState(0),
  reducer: counterReducer.build(),
);

// Dispatch actions
store.dispatch(IncrementAction());

// Listen to state changes
store.state.listen((state) => print('Count: ${state.count}'));
Middleware with Execution Strategies
Middleware allows you to handle side effects like API calls. Use execution strategies to control how concurrent actions are processed.
class SearchMiddleware extends Middleware<AppState, Action> {
  SearchMiddleware() {
    // takeLatest cancels previous search when new one arrives
    apply(takeLatest()).on<SearchAction>((state, action) async* {
      final results = await api.search(action.query);
      yield SearchResultsAction(results);
    });
  }
}

final store = createStore<AppState, Action>(
  initialState: AppState(),
  reducer: appReducer,
  middlewares: [SearchMiddleware()],
);
Execution Strategies
FlowDux provides execution strategies to control how concurrent actions are processed in middleware.
| Category | Strategies | Purpose |
|---|---|---|
| Concurrency | takeLatest(), takeLeading(), sequential() | How to handle concurrent executions |
| Timing | debounce(duration), throttle(duration) | When to execute |
| Resilience | retry(n), retryWithBackoff(...) | How to handle failures |
Concurrency Strategies
takeLatest()
Cancels previous processing when a new action arrives. Only the latest action's result is emitted.
apply(takeLatest()).on<SearchAction>((state, action) async* {
  final results = await api.search(action.query);
  yield SearchResultsAction(results);
});
Use cases: Search, API refresh, pagination with pull-to-refresh
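The cancel-and-replace behaviour described above can be sketched in plain Dart without FlowDux. This is an illustration of the idea only, not the library's implementation; TakeLatestRunner, SearchRequest, and fakeSearch are hypothetical names introduced for the example.

```dart
import 'dart:async';

/// Hypothetical helper (not a FlowDux class): forwards results only from the
/// most recently started handler stream.
class TakeLatestRunner<A, R> {
  TakeLatestRunner(this.handler, this.onResult);

  final Stream<R> Function(A action) handler;
  final void Function(R result) onResult;
  StreamSubscription<R>? _current;

  void add(A action) {
    // After cancel(), the previous run delivers no further results.
    _current?.cancel();
    _current = handler(action).listen(onResult);
  }
}

/// Hypothetical request type carrying a simulated network latency.
class SearchRequest {
  const SearchRequest(this.query, this.latency);
  final String query;
  final Duration latency;
}

/// Stand-in for an API call.
Stream<String> fakeSearch(SearchRequest request) async* {
  await Future<void>.delayed(request.latency);
  yield 'results for ${request.query}';
}

Future<List<String>> demo() async {
  final results = <String>[];
  final runner =
      TakeLatestRunner<SearchRequest, String>(fakeSearch, results.add);
  // The first search is slow and is superseded before it can emit.
  runner.add(const SearchRequest('fl', Duration(milliseconds: 50)));
  runner.add(const SearchRequest('flow', Duration(milliseconds: 10)));
  await Future<void>.delayed(const Duration(milliseconds: 100));
  return results;
}

Future<void> main() async {
  print(await demo());
}
```

Only the result of the second request reaches the list, because the first subscription is cancelled before its handler yields.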
takeLeading()
Ignores new actions while one is still processing. Only the first action in a series executes.
apply(takeLeading()).on<SubmitAction>((state, action) async* {
  // Prevents duplicate submissions
  final result = await api.submit(action.data);
  yield SubmitSuccessAction(result);
});
Use cases: Form submission, payment processing, preventing double-clicks
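A minimal plain-Dart sketch of the same "ignore while busy" idea follows. It is not FlowDux's implementation; TakeLeadingRunner and fakeSubmit are hypothetical names, and errors are simply swallowed to keep the example short.

```dart
import 'dart:async';

/// Hypothetical helper (not a FlowDux class): drops actions that arrive while
/// a previous handler run is still in flight.
class TakeLeadingRunner<A, R> {
  TakeLeadingRunner(this.handler, this.onResult);

  final Stream<R> Function(A action) handler;
  final void Function(R result) onResult;
  bool _busy = false;

  /// Returns false when the action was ignored.
  bool add(A action) {
    if (_busy) return false;
    _busy = true;
    handler(action).listen(
      onResult,
      onError: (Object error, StackTrace stack) {}, // ignored in this sketch
      onDone: () => _busy = false,
    );
    return true;
  }
}

/// Stand-in for an API call.
Stream<String> fakeSubmit(String data) async* {
  await Future<void>.delayed(const Duration(milliseconds: 20));
  yield 'submitted $data';
}

Future<List<String>> demo() async {
  final results = <String>[];
  final runner = TakeLeadingRunner<String, String>(fakeSubmit, results.add);
  runner.add('order-1'); // accepted
  runner.add('order-1 (double tap)'); // ignored: first run still in flight
  await Future<void>.delayed(const Duration(milliseconds: 60));
  runner.add('order-2'); // accepted: the previous run has finished
  await Future<void>.delayed(const Duration(milliseconds: 60));
  return results;
}

Future<void> main() async {
  print(await demo());
}
```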
sequential()
Queues actions and processes them one at a time, preserving order.
apply(sequential()).on<SaveAction>((state, action) async* {
  // All save requests are processed in order
  await api.save(action.data);
  yield SaveCompleteAction(action.id);
});
Use cases: Sequential API calls, ordered form saves, FIFO task processing
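FIFO processing of this kind can be illustrated in plain Dart by chaining each task onto the previous one's Future. The sketch below is independent of FlowDux; SequentialQueue is a hypothetical name.

```dart
import 'dart:async';

/// Hypothetical helper (not a FlowDux class): runs tasks one at a time in the
/// order they were added.
class SequentialQueue {
  Future<void> _tail = Future<void>.value();

  Future<void> add(Future<void> Function() task) {
    final run = _tail.then((_) => task());
    // A failed task must not block the tasks queued behind it.
    _tail = run.catchError((Object _) {});
    return run;
  }
}

Future<List<String>> demo() async {
  final log = <String>[];
  final queue = SequentialQueue();

  Future<void> save(String id, int millis) async {
    await Future<void>.delayed(Duration(milliseconds: millis));
    log.add(id);
  }

  // Slowest task first: run concurrently, 'c' would finish before 'a'.
  await Future.wait([
    queue.add(() => save('a', 30)),
    queue.add(() => save('b', 20)),
    queue.add(() => save('c', 10)),
  ]);
  return log;
}

Future<void> main() async {
  print(await demo());
}
```

The log keeps submission order even though later tasks are faster, which is the property ordered saves rely on.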
Timing Strategies
debounce(duration)
Delays execution. If another action arrives before the delay completes, the timer restarts.
apply(debounce(Duration(milliseconds: 500))).on<TextChangedAction>((state, action) async* {
  // Only saves after user stops typing for 500ms
  await api.save(action.text);
  yield SaveCompleteAction();
});
Use cases: Search autocomplete, autosave, input validation
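The restart-the-timer behaviour can be reproduced with a Timer from dart:async. This is a standalone sketch of the concept, not how FlowDux implements debounce; Debouncer is a hypothetical name.

```dart
import 'dart:async';

/// Hypothetical helper (not a FlowDux class): runs only the last callback
/// once no new call has arrived for [delay].
class Debouncer {
  Debouncer(this.delay);

  final Duration delay;
  Timer? _timer;

  void run(void Function() action) {
    _timer?.cancel(); // a new call restarts the wait
    _timer = Timer(delay, action);
  }
}

Future<List<String>> demo() async {
  final saved = <String>[];
  final debouncer = Debouncer(const Duration(milliseconds: 50));
  // Three keystrokes 10 ms apart, well inside the 50 ms window.
  for (final text in ['a', 'ab', 'abc']) {
    debouncer.run(() => saved.add(text));
    await Future<void>.delayed(const Duration(milliseconds: 10));
  }
  await Future<void>.delayed(const Duration(milliseconds: 100));
  return saved;
}

Future<void> main() async {
  print(await demo());
}
```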
throttle(duration)
Limits execution rate. Executes the first action immediately, then ignores subsequent actions until the time window passes.
apply(throttle(Duration(seconds: 1))).on<ScrollAction>((state, action) async* {
  // Logs scroll position at most once per second
  analytics.logScroll(action.position);
  yield action;
});
Use cases: Analytics events, scroll handling, rate limiting
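A small sketch of the leading-edge throttle described above, written in plain Dart. It assumes the window is measured from the last action that actually ran, which the text does not spell out; Throttler is a hypothetical name, and the clock is passed in so the behaviour is easy to follow.

```dart
/// Hypothetical helper (not a FlowDux class): accepts the first call, then
/// rejects calls until [window] has elapsed since the last accepted one.
class Throttler {
  Throttler(this.window);

  final Duration window;
  DateTime? _lastRun;

  bool allow(DateTime now) {
    final last = _lastRun;
    if (last != null && now.difference(last) < window) return false;
    _lastRun = now;
    return true;
  }
}

void main() {
  final throttler = Throttler(const Duration(seconds: 1));
  final start = DateTime.utc(2024, 1, 1);
  for (final ms in [0, 400, 900, 1000, 1500, 2100]) {
    final accepted = throttler.allow(start.add(Duration(milliseconds: ms)));
    print('$ms ms: ${accepted ? 'run' : 'ignored'}');
  }
}
```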
Resilience Strategies
retry(maxAttempts)
Retries the processor execution on failure.
apply(retry(3)).on<FetchDataAction>((state, action) async* {
  // Retries up to 3 times on failure
  final data = await api.fetchData(action.id);
  yield FetchSuccessAction(data);
});

// With custom retry condition (SocketException comes from dart:io)
apply(retry(3, shouldRetry: (e) => e is SocketException)).on<FetchDataAction>((state, action) async* {
  // Only retries on SocketException
});
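The retry loop itself is easy to sketch in plain Dart. The helper below is not FlowDux's retry; retryTask is a hypothetical function, and it assumes maxAttempts counts the total number of attempts, including the first one.

```dart
import 'dart:async';

/// Hypothetical helper (not the FlowDux API): runs [task] until it succeeds,
/// the attempt budget is used up, or [shouldRetry] rejects the error.
/// Assumption: [maxAttempts] counts total attempts, including the first.
Future<T> retryTask<T>(
  int maxAttempts,
  Future<T> Function() task, {
  bool Function(Object error)? shouldRetry,
}) async {
  var attempt = 0;
  while (true) {
    attempt++;
    try {
      return await task();
    } catch (error) {
      final retryable = shouldRetry?.call(error) ?? true;
      if (attempt >= maxAttempts || !retryable) rethrow;
    }
  }
}

Future<void> main() async {
  var calls = 0;
  final value = await retryTask(3, () async {
    calls++;
    if (calls < 3) throw StateError('transient failure $calls');
    return 'ok';
  });
  print('$value after $calls calls');
}
```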
retryWithBackoff(...)
Retries with exponential backoff delay between attempts.
apply(retryWithBackoff(
  maxAttempts: 5,
  initialDelay: Duration(milliseconds: 100),
  maxDelay: Duration(seconds: 10),
  factor: 2.0, // Exponential multiplier
  jitter: 0.1, // Random jitter to prevent thundering herd
)).on<FetchDataAction>((state, action) async* {
  final data = await api.fetchData(action.id);
  yield FetchSuccessAction(data);
});
Use cases: Network error recovery, transient server errors, rate limiting
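To make the parameters concrete, here is one common way to compute an exponential backoff delay. The formula is an assumption, not taken from FlowDux: the delay before retry n is initialDelay × factor^n, capped at maxDelay, then scaled by a random fraction within ±jitter. backoffDelay and retryIndex are hypothetical names.

```dart
import 'dart:math';

/// Hypothetical helper (not the FlowDux API): delay before the retry with the
/// given zero-based [retryIndex].
/// Assumption: exponential growth, capped, then jittered by up to +/- jitter.
Duration backoffDelay(
  int retryIndex, {
  Duration initialDelay = const Duration(milliseconds: 100),
  Duration maxDelay = const Duration(seconds: 10),
  double factor = 2.0,
  double jitter = 0.0,
  Random? random,
}) {
  final grown =
      initialDelay.inMicroseconds * pow(factor, retryIndex).toDouble();
  final capped = min(grown, maxDelay.inMicroseconds.toDouble());
  // Random fraction in [-jitter, +jitter]; zero when jitter is disabled.
  final spread = jitter == 0
      ? 0.0
      : ((random ?? Random()).nextDouble() * 2 - 1) * jitter;
  return Duration(microseconds: (capped * (1 + spread)).round());
}

void main() {
  // With jitter disabled the sequence doubles until it reaches the cap.
  for (var i = 0; i < 8; i++) {
    print('retry $i: ${backoffDelay(i).inMilliseconds} ms');
  }
}
```

With these defaults the delays double from 100 ms and stop growing once they reach the 10 s cap; a non-zero jitter spreads clients out so they do not all retry at the same instant.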
Strategy Chaining
Combine strategies from different categories using the then method:
// Debounce input, then cancel previous search, then retry on failure
apply(debounce(Duration(milliseconds: 300)).then(takeLatest()).then(retry(3)))
    .on<SearchAction>((state, action) async* {
  final results = await api.search(action.query);
  yield SearchResultsAction(results);
});
Rules:
- Strategies from different categories can be chained
- Strategies from the same category cannot be chained (throws an exception)
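The one-strategy-per-category rule can be pictured as a simple validation step. The sketch below is purely illustrative and does not mirror FlowDux's internal types; Category, Strategy, and chain are hypothetical names, and the exception type is an assumption.

```dart
/// Hypothetical categories matching the table of execution strategies.
enum Category { concurrency, timing, resilience }

/// Hypothetical stand-in for a strategy: just a name and its category.
class Strategy {
  const Strategy(this.name, this.category);
  final String name;
  final Category category;
}

/// Validates a chain: at most one strategy per category.
/// Assumption: the real library's exception type may differ.
List<Strategy> chain(List<Strategy> strategies) {
  final seen = <Category>{};
  for (final strategy in strategies) {
    if (!seen.add(strategy.category)) {
      throw ArgumentError(
        'Only one ${strategy.category.name} strategy per chain: '
        '${strategy.name}',
      );
    }
  }
  return List.unmodifiable(strategies);
}

void main() {
  // One strategy from each category: accepted.
  chain(const [
    Strategy('debounce', Category.timing),
    Strategy('takeLatest', Category.concurrency),
    Strategy('retry', Category.resilience),
  ]);

  // Two concurrency strategies: rejected.
  try {
    chain(const [
      Strategy('takeLatest', Category.concurrency),
      Strategy('sequential', Category.concurrency),
    ]);
  } on ArgumentError catch (error) {
    print('rejected: ${error.message}');
  }
}
```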
FlowHolderAction
Use FlowHolderAction to wrap existing Streams (Repository, Socket) and convert them to Actions. The Action has no side effects of its own; it only holds and transforms the Stream:
// FlowHolderAction wraps an existing Stream and converts to Stream<Action>
class ObserveUserAction with FlowHolderAction {
  final Stream<User> userStream;
  ObserveUserAction(this.userStream);

  @override
  Stream<Action> toStreamAction() {
    return userStream.map((user) => SetUserAction(user));
  }

  // Default: TakeLatest strategy (auto-cancels previous)
  // Override for concurrent execution:
  // @override
  // ExecutionStrategy get strategy => concurrent();
}

// Usage: pass the Stream from Repository/Socket
final repositoryStream = userRepository.getUser(123); // Stream creation (cold)
store.dispatch(ObserveUserAction(repositoryStream)); // Store collects it
Error Handling
Use ErrorProcessor to catch errors in middleware and convert them to Actions:
class AppErrorProcessor implements ErrorProcessor<Action> {
  @override
  Stream<Action> process(Object error, StackTrace stackTrace) async* {
    if (error is NetworkException) {
      yield NetworkErrorAction(error.message);
    } else {
      yield UnknownErrorAction(error.toString());
    }
  }
}

final store = createStore<AppState, Action>(
  initialState: AppState(),
  reducer: appReducer,
  middlewares: [AppMiddleware()],
  errorProcessor: AppErrorProcessor(),
);
Filtering Consecutive Identical States
Use distinct() to filter out consecutive identical states:
// Only emits when state actually changes
store.state.distinct().listen((state) {
  print('State changed: $state');
});
This is useful when:
- Multiple actions produce the same state
- You want to avoid unnecessary UI rebuilds
- You need to deduplicate state emissions
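One detail worth keeping in mind, assuming store.state behaves like a standard Dart Stream: Stream.distinct() compares consecutive events with ==, so a state class needs value equality for two equal-looking states to count as identical. The standalone sketch below uses a plain Stream and a local CounterState with == and hashCode overridden; distinctCounts is a hypothetical helper.

```dart
import 'dart:async';

/// Local stand-in for a state class, with value equality.
class CounterState {
  const CounterState(this.count);
  final int count;

  @override
  bool operator ==(Object other) =>
      other is CounterState && other.count == count;

  @override
  int get hashCode => count.hashCode;
}

/// Emits the given states through distinct() and collects the counts.
Future<List<int>> distinctCounts(List<CounterState> states) {
  return Stream.fromIterable(states)
      .distinct()
      .map((state) => state.count)
      .toList();
}

Future<void> main() async {
  // The two CounterState(1) values are separate instances but compare equal,
  // so distinct() drops the second one.
  final counts = await distinctCounts([
    CounterState(0),
    CounterState(1),
    CounterState(1),
    CounterState(2),
  ]);
  print(counts); // [0, 1, 2]
}
```

Without the == override, the two CounterState(1) instances would compare by identity and both would be emitted.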
Architecture
dispatch(action) → Middleware Chain → Reducer → StateFlow
                         ↓
               [Strategy Processing]
               [Error Handling]
               [FlowHolderAction]
| Component | Role |
|---|---|
| Middleware | Side effects (API calls, logging), action transformation |
| ExecutionStrategy | Control concurrent action processing |
| FlowHolderAction | Convert existing Stream to Action stream |
| ErrorProcessor | Catch errors and convert to Actions |
| Reducer | Pure function: (State, Action) → NewState |
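The Reducer row can be made concrete with a plain function and a fold. This sketch does not use FlowDux types: the action classes are local stand-ins, the state is a bare int, and replay is a hypothetical helper that applies a list of actions in order.

```dart
/// Local stand-ins for the action types used in the Quick Start.
abstract class Action {}

class IncrementAction implements Action {}

class DecrementAction implements Action {}

class AddAction implements Action {
  AddAction(this.value);
  final int value;
}

/// A reducer is a pure function from (state, action) to the next state.
typedef Reducer<S, A> = S Function(S state, A action);

int counterReducer(int count, Action action) {
  if (action is IncrementAction) return count + 1;
  if (action is DecrementAction) return count - 1;
  if (action is AddAction) return count + action.value;
  return count; // unknown actions leave the state unchanged
}

/// Applies [actions] in order, starting from [initial].
S replay<S, A>(Reducer<S, A> reducer, S initial, Iterable<A> actions) =>
    actions.fold(initial, reducer);

void main() {
  final result = replay<int, Action>(counterReducer, 0, [
    IncrementAction(),
    IncrementAction(),
    AddAction(3),
    DecrementAction(),
  ]);
  print(result); // 4
}
```

Because the reducer has no side effects, replaying the same actions from the same initial state always yields the same result, which is what makes the state predictable.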
Flutter Integration
For Flutter apps, use the flowdux_flutter package, which provides widgets such as StoreProvider, StoreBuilder, StoreSelector, StoreConsumer, and StoreListener.
dependencies:
  flowdux: ^0.2.4
  flowdux_flutter: ^0.2.3
Documentation
For full documentation and examples, see the FlowDux repository.
License
Apache License 2.0