funx 1.2.1
Composable function decorators for advanced execution control and reliability.

Funx #
Function execution control library for Dart and Flutter. Provides decorators for managing timing, concurrency, reliability, and performance of asynchronous and synchronous functions.
Purpose #
Funx addresses the complexity of implementing reliable, performant function execution patterns in Dart/Flutter applications. Instead of manually implementing retry logic, debouncing, rate limiting, or circuit breakers, developers wrap functions with composable decorators.
This package is useful when building applications that require:
- Controlled API request execution with retry and circuit breaker patterns
- User input handling with debouncing and throttling
- Concurrent operation management with locks and semaphores
- Fault-tolerant network operations with fallback strategies
- Performance optimization through caching, batching, and memoization
- Observable function execution with metrics and audit trails
Features #
Core Functionality #
- Function wrapper with composable decorators
- Support for async (Future<T>), sync (T), and parameterized functions
- Three wrapper types: Func<R>, Func1<T, R>, Func2<T1, T2, R>
- Zero external dependencies
Mechanism Categories #
- Timing (6): debounce, throttle, delay, timeout, defer, idle callback
- Scheduling (2): schedule (one-time, recurring, custom), backpressure control with 6 strategies
- Concurrency (8): lock, read-write lock, semaphore, queue, bulkhead, barrier, countdown latch, monitor
- Reliability (5): retry, backoff strategies, circuit breaker, fallback, recovery
- Performance (11): rate limiting, batching, memoization, cache-aside, compression, deduplication, sharing, once, warm-up, lazy loading, priority queue
- Error Handling (2): catch, default value
- Validation (2): guard, validate
- Transformation (3): proxy, transform, merge
- Control Flow (3): switch, conditional, repeat
- Orchestration (3): race, all, saga
- Observability (3): tap, monitor, audit
- State (1): snapshot
Total: 49 mechanisms across 12 categories
Basic Usage #
Debounce - Search Autocomplete #
import 'package:funx/funx.dart';
var callCount = 0;
final search = Func1<String, String>((query) async {
callCount++;
return 'Results for: $query';
}).debounce(Duration(milliseconds: 50));
search('a');
search('ab');
search('abc');
await Future.delayed(Duration(milliseconds: 100));
// callCount == 1 (only last call executed)
Throttle - Scroll Tracking #
var callCount = 0;
final trackScroll = Func1<double, void>((position) async {
callCount++;
}).throttle(Duration(milliseconds: 50));
await trackScroll(100);
try { await trackScroll(200); } catch (_) {} // Throttled: throws StateError
try { await trackScroll(300); } catch (_) {} // Throttled: throws StateError
// callCount == 1 (first call executed, others rejected)
Retry - Network Requests #
var attempts = 0;
final fetchData = Func<String>(() async {
attempts++;
if (attempts < 3) throw Exception('Network error');
return 'Success';
}).retry(maxAttempts: 3);
final result = await fetchData();
// result == 'Success', attempts == 3
Circuit Breaker - Failing Services #
final breaker = CircuitBreaker(
failureThreshold: 3,
timeout: Duration(seconds: 1),
);
var callCount = 0;
final riskyOperation = Func<String>(() async {
callCount++;
throw Exception('Service unavailable');
}).circuitBreaker(breaker);
// After 3 failures, circuit opens
for (var i = 0; i < 3; i++) {
try { await riskyOperation(); } catch (_) {}
}
// breaker.state == CircuitBreakerState.open
// Next calls fail immediately without executing function
Memoize - Caching Results #
var callCount = 0;
final square = Func1<int, int>((n) async {
callCount++;
return n * n;
}).memoize();
final result1 = await square(10);
final result2 = await square(10);
// callCount == 1 (second call uses cached result)
Core Concepts #
Func Wrapper Types #
Funx provides three wrapper types based on the number of parameters:
// No parameters
final greet = Func<String>(() async => 'Hello, World!');
final result = await greet();
// One parameter
final processAge = Func1<int, String>((age) async => 'Age: $age');
final output = await processAge(25);
// Two parameters
final calculate = Func2<int, int, int>((x, y) async => x + y);
final sum = await calculate(1, 2);
Chaining Decorators #
Decorators can be chained to combine multiple behaviors:
var callCount = 0;
final processPayment = Func1<double, String>((amount) async {
callCount++;
if (amount <= 0) throw ArgumentError('Invalid amount');
return 'Processed: \$$amount';
})
.retry(maxAttempts: 3)
.debounce(Duration(milliseconds: 50))
.memoize();
processPayment(100);
processPayment(100);
await Future.delayed(Duration(milliseconds: 100));
// callCount == 1 (debounced and memoized)
Execution Order #
Decorators wrap from the inside out: the last decorator applied becomes the outermost layer and executes first:
final fn = Func<String>(() async => await operation())
.retry() // 3. Executes third
.timeout() // 2. Executes second
.tap(onValue: (v) => print(v)); // 1. Executes first
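This ordering matters when composing decorators. For example, where you place timeout relative to retry changes its meaning. A minimal sketch (fetchRemote is a placeholder for your own async call; the comments describe the expected behavior under the ordering rule above):
// timeout applied last: it is the outermost layer and limits all retry attempts together
final overallBudget = Func<String>(() async => await fetchRemote())
.retry(maxAttempts: 3)
.timeout(Duration(seconds: 2));
// retry applied last: it is the outermost layer, so each attempt gets its own 2-second limit
final perAttemptBudget = Func<String>(() async => await fetchRemote())
.timeout(Duration(seconds: 2))
.retry(maxAttempts: 3);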
Mechanism Categories #
Timing #
Control when functions execute:
// Debounce - delay until calls stop
var executionCount = 0;
final search = Func1<String, String>((query) async {
executionCount++;
return 'Results for: $query';
}).debounce(Duration(milliseconds: 50));
search('a');
search('ab');
search('abc');
await Future.delayed(Duration(milliseconds: 100));
// executionCount == 1 (only last call executed)
// Throttle - limit execution frequency (trailing mode)
var execCount = 0;
final trackScroll = Func1<double, void>((position) async {
execCount++;
}).throttle(
Duration(milliseconds: 100),
mode: ThrottleMode.trailing,
);
for (var i = 0; i < 10; i++) {
trackScroll(i * 100.0);
await Future.delayed(Duration(milliseconds: 20));
}
// execCount < 5 (throttled to reduce frequency)
// Timeout - cancel after duration
final slowOperation = Func<String>(() async {
await Future.delayed(Duration(milliseconds: 200));
return 'Done';
}).timeout(Duration(milliseconds: 50));
// Throws TimeoutException
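When a timeout should degrade gracefully instead of surfacing the exception, it can be composed with fallback (a sketch; this assumes fallback, as the outermost decorator, handles any error raised inside the chain, including the TimeoutException from timeout):
final slowFetch = Func<String>(() async {
await Future.delayed(Duration(milliseconds: 200));
return 'Fresh data';
})
.timeout(Duration(milliseconds: 50))
.fallback(fallbackValue: 'Stale data');
final data = await slowFetch(); // 'Stale data'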
Scheduling #
Execute functions at specific times or recurring intervals:
// One-time execution at specific time
var executed = false;
final backup = Func(() async {
executed = true;
return 'Backup completed';
}).schedule(
at: DateTime.now().add(Duration(milliseconds: 100)),
);
final subscription = backup.start();
await Future.delayed(Duration(milliseconds: 150));
// executed == true
// Recurring execution every interval
var executionCount = 0;
final healthCheck = Func(() async {
executionCount++;
return 'OK';
}).scheduleRecurring(
interval: Duration(milliseconds: 50),
maxIterations: 3,
);
final subscription2 = healthCheck.start();
await Future.delayed(Duration(milliseconds: 200));
// executionCount == 3
subscription2.cancel();
// Custom scheduling logic
var customCount = 0;
final adaptive = Func(() async {
customCount++;
return customCount;
}).scheduleCustom(
scheduler: (lastExecution) {
// Grow the delay after each execution (delay scales with customCount)
final delay = Duration(milliseconds: 50 * customCount);
return DateTime.now().add(delay);
},
maxIterations: 2,
);
final subscription3 = adaptive.start();
await Future.delayed(Duration(milliseconds: 200));
// customCount == 2
Backpressure #
Control the execution rate when the consumer is slower than the producer:
// Drop strategy - reject new requests when busy
var processedCount = 0;
final processor = Func1<int, void>((value) async {
processedCount++;
await Future.delayed(Duration(milliseconds: 50));
}).backpressure(
strategy: BackpressureStrategy.drop,
maxConcurrent: 1,
);
processor(1); // Accepted
processor(2); // Dropped (busy)
processor(3); // Dropped (busy)
await Future.delayed(Duration(milliseconds: 100));
// processedCount == 1
// Buffer strategy - queue up to buffer size
var bufferProcessed = 0;
final buffered = Func1<int, void>((value) async {
bufferProcessed++;
await Future.delayed(Duration(milliseconds: 20));
}).backpressure(
strategy: BackpressureStrategy.buffer,
maxConcurrent: 1,
bufferSize: 3,
);
buffered(1); // Processing
buffered(2); // Buffered
buffered(3); // Buffered
buffered(4); // Buffered
await Future.delayed(Duration(milliseconds: 100));
// bufferProcessed == 4 (all processed from buffer)
// Sample strategy - probabilistic acceptance
var sampledCount = 0;
final sampled = Func1<int, void>((value) async {
sampledCount++;
}).backpressure(
strategy: BackpressureStrategy.sample,
sampleRate: 0.5, // Accept ~50% of requests
maxConcurrent: 10,
);
for (var i = 0; i < 100; i++) {
sampled(i);
}
await Future.delayed(Duration(milliseconds: 100));
// sampledCount ≈ 50 (probabilistic)
Concurrency #
Manage parallel execution:
// Lock - mutual exclusion (ensures sequential execution)
var counter = 0;
final incrementCounter = Func<void>(() async {
await Future.delayed(Duration(milliseconds: 10));
counter++;
}).lock();
await Future.wait([
incrementCounter(),
incrementCounter(),
incrementCounter(),
]);
// counter == 3 (all executed sequentially)
// Semaphore - limit concurrent executions (max() below is from dart:math)
var concurrentCount = 0;
var maxConcurrent = 0;
final task = Func<void>(() async {
concurrentCount++;
maxConcurrent = max(concurrentCount, maxConcurrent);
await Future.delayed(Duration(milliseconds: 50));
concurrentCount--;
}).semaphore(maxConcurrent: 2);
await Future.wait([
task(), task(), task(), task(),
]);
// maxConcurrent == 2 (never more than 2 concurrent)
Reliability #
Build resilient operations:
// Retry with exponential backoff
var attemptCount = 0;
final unreliableOp = Func<String>(() async {
attemptCount++;
if (attemptCount < 3) throw Exception('Fail');
return 'success';
}).retry(
maxAttempts: 5,
backoff: ExponentialBackoff(
initialDelay: Duration(milliseconds: 10),
maxDelay: Duration(milliseconds: 100),
),
);
final result = await unreliableOp(); // 'success' after 3 attempts
// Circuit Breaker - prevent cascading failures
final apiCall = Func<String>(() async {
throw Exception('Service down');
}).circuitBreaker(
failureThreshold: 3,
successThreshold: 1,
timeout: Duration(milliseconds: 100),
);
// First 3 calls fail and open the circuit
for (var i = 0; i < 3; i++) {
try { await apiCall(); } catch (_) {}
}
// Circuit is now OPEN - fails fast
try {
await apiCall();
} catch (e) {
// Throws immediately without calling function
}
// Fallback - provide alternative value
final fetchConfig = Func<Map<String, dynamic>>(() async {
throw Exception('Config service unavailable');
}).fallback(
fallbackValue: {'mode': 'default'},
);
final config = await fetchConfig(); // {'mode': 'default'}
Performance #
Optimize execution:
// Memoize - cache results
var callCount = 0;
final expensiveOp = Func1<int, int>((n) async {
callCount++;
await Future.delayed(Duration(milliseconds: 10));
return n * 2;
}).memoize();
await expensiveOp(5); // callCount: 1
await expensiveOp(5); // callCount: 1 (cached)
await expensiveOp(10); // callCount: 2 (different arg)
// Batch - group operations
final results = <int>[];
final batchOp = Func1<int, void>((value) async {
// Per-item handler; with batch(), the collected values are
// processed by the executor below instead
await Future.delayed(Duration(milliseconds: 5));
}).batch(
executor: (values) async {
await Future.delayed(Duration(milliseconds: 10));
results.addAll(values);
},
);
batchOp(1);
batchOp(2);
await Future.delayed(Duration(milliseconds: 50));
// results contains [1, 2] from batched execution
// Rate limit - control throughput
var executionCount = 0;
final rateLimitedOp = Func<void>(() async {
executionCount++;
}).rateLimit(
maxCalls: 2,
window: Duration(milliseconds: 100),
);
rateLimitedOp();
rateLimitedOp();
rateLimitedOp(); // This one waits or throws depending on strategy
// Deduplicate - prevent duplicate sequential calls
var duplicateCallCount = 0;
final deduplicatedOp = Func1<String, String>((input) async {
duplicateCallCount++;
return input.toUpperCase();
}).deduplicate(window: Duration(milliseconds: 100));
deduplicatedOp('test');
await Future.delayed(Duration(milliseconds: 10));
deduplicatedOp('test'); // Ignored (duplicate within window)
await Future.delayed(Duration(milliseconds: 50));
// duplicateCallCount == 1
// Share - share single execution among concurrent callers
var sharedCallCount = 0;
final sharedOp = Func<String>(() async {
sharedCallCount++;
await Future.delayed(Duration(milliseconds: 50));
return 'result';
}).share();
await Future.wait([
sharedOp(),
sharedOp(),
sharedOp(),
]);
// sharedCallCount == 1 (all three calls shared one execution)
// Priority Queue - execute tasks in priority order
var executionOrder = <String>[];
final processTask = Func1<String, String>((task) async {
executionOrder.add(task);
await Future.delayed(Duration(milliseconds: 10));
return 'Completed: $task';
}).priorityQueue(
priorityFn: (task) => task == 'critical' ? 10 : 1,
maxQueueSize: 100,
maxConcurrent: 1,
);
// Submit tasks with different priorities
processTask('normal-1');
processTask('normal-2');
processTask('critical');
await Future.delayed(Duration(milliseconds: 100));
// executionOrder == ['normal-1', 'critical', 'normal-2']
// 'critical' executed before 'normal-2' despite being submitted later
Priority Queue #
Manage task execution with priority-based ordering:
// Basic priority queue - higher priority executes first
var executionOrder = <int>[];
final processor = Func1<int, void>((taskId) async {
executionOrder.add(taskId);
await Future.delayed(Duration(milliseconds: 20));
}).priorityQueue(
priorityFn: (id) => id, // Use task ID as priority
maxQueueSize: 100,
maxConcurrent: 1,
);
// Submit tasks (lower ID = lower priority)
processor(1);
processor(5);
processor(3);
await Future.delayed(Duration(milliseconds: 100));
// executionOrder: [1, 5, 3] - task 1 starts immediately, then 5 (higher priority) runs before 3
// Queue overflow policies
final dropLowest = Func1<int, String>((priority) async {
return 'Task: $priority';
}).priorityQueue(
priorityFn: (p) => p,
maxQueueSize: 2,
maxConcurrent: 1,
onQueueFull: QueueFullPolicy.dropLowestPriority,
onItemDropped: (item) => print('Dropped: $item'),
);
// Starvation prevention - boost waiting tasks
final withStarvation = Func1<String, String>((task) async {
await Future.delayed(Duration(milliseconds: 50));
return 'Done: $task';
}).priorityQueue(
priorityFn: (t) => t == 'urgent' ? 10 : 1,
starvationPrevention: true, // Auto-boost long-waiting items
onStarvationPrevention: (task) => print('Boosted: $task'),
);
// Monitor queue state
final monitored = Func1<int, int>((x) async => x * 2).priorityQueue(
priorityFn: (x) => x,
) as PriorityQueueExtension<int, int>;
print('Queue length: ${monitored.queueLength}');
print('Active tasks: ${monitored.activeCount}');
Error Handling #
Transform and handle errors:
// Catch specific exceptions
final riskyOp = Func<String>(() async {
throw ArgumentError('Invalid input');
}).catchError(
handlers: {
ArgumentError: (e, stack) => 'handled: ${e.message}',
},
);
final result = await riskyOp(); // 'handled: Invalid input'
// Catch any exception with default handler
final anyErrorOp = Func<int>(() async {
throw Exception('Something went wrong');
}).catchError(
handlers: {},
defaultHandler: (e, stack) => 42,
);
final value = await anyErrorOp(); // 42
Validation #
Validate conditions before execution:
// Guard - validate preconditions
var guardCallCount = 0;
final guardedOp = Func1<int, String>((value) async {
guardCallCount++;
return 'Processed: $value';
}).guard(
preCondition: (value) => value > 0,
);
try {
await guardedOp(-5); // Throws GuardException
} catch (e) {
// guardCallCount == 0 (function not called due to failed guard)
}
final result = await guardedOp(10); // 'Processed: 10'
// guardCallCount == 1
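Rejected input surfaces as a GuardException, so callers can translate it into a default value with ordinary error handling (a sketch reusing guardedOp from above):
Future<String> processOrDefault(int value) async {
try {
return await guardedOp(value);
} catch (e) {
// GuardException: input rejected by the precondition
return 'rejected';
}
}
final ok = await processOrDefault(10); // 'Processed: 10'
final rejected = await processOrDefault(-1); // 'rejected'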
Observability #
Monitor and inspect execution:
// Tap - observe values without changing them
var tapValue = '';
var tapError = '';
final tappedOp = Func<String>(() async {
return 'success';
}).tap(
onValue: (result) => tapValue = result,
onError: (error, stack) => tapError = error.toString(),
);
final result = await tappedOp();
// result == 'success'
// tapValue == 'success'
// Tap with errors
final errorOp = Func<String>(() async {
throw Exception('fail');
}).tap(
onValue: (result) => tapValue = result,
onError: (error, stack) => tapError = error.toString(),
);
try {
await errorOp();
} catch (e) {
// tapError == 'Exception: fail'
}
Common Patterns #
API Client with Resilience #
final breaker = CircuitBreaker(
failureThreshold: 3,
timeout: Duration(seconds: 1),
);
var callCount = 0;
final apiCall = Func1<String, String>((endpoint) async {
callCount++;
if (callCount < 2) throw Exception('Network error');
return 'Response from $endpoint';
})
.retry(maxAttempts: 3)
.circuitBreaker(breaker)
.timeout(Duration(seconds: 5))
.memoize();
final result = await apiCall('/users');
// result: 'Response from /users'
// callCount: 2 (first failed, retry succeeded)
final result2 = await apiCall('/users');
// result2: 'Response from /users' (from cache)
// callCount: still 2
Search with Debounce and Cache #
var searchCount = 0;
final search = Func1<String, String>((query) async {
searchCount++;
return 'Results for: $query';
})
.debounce(Duration(milliseconds: 50))
.memoize();
search('test');
search('test');
search('test');
await Future.delayed(Duration(milliseconds: 100));
// searchCount: 1 (debounced to single call)
final result = await search('test');
// result: 'Results for: test'
// searchCount: still 1 (cached)
Rate-Limited Concurrent Operations #
// max() below is from dart:math
var concurrentCount = 0;
var maxConcurrent = 0;
final processTask = Func1<int, String>((id) async {
concurrentCount++;
maxConcurrent = max(concurrentCount, maxConcurrent);
await Future.delayed(Duration(milliseconds: 50));
concurrentCount--;
return 'Task $id completed';
})
.semaphore(maxConcurrent: 2)
.rateLimit(maxCalls: 5, window: Duration(seconds: 1));
final futures = List.generate(4, processTask.call);
await Future.wait(futures);
// maxConcurrent: 2 (semaphore limited concurrency)
Resilient Data Fetcher #
final breaker = CircuitBreaker(
failureThreshold: 2,
timeout: Duration(milliseconds: 100),
);
var attempts = 0;
final fetchData = Func1<String, String>((id) async {
attempts++;
if (attempts == 1) throw Exception('Network error');
return 'Data for $id';
})
.retry(maxAttempts: 2)
.circuitBreaker(breaker)
.fallback(fallbackValue: 'Cached data')
.timeout(Duration(seconds: 5));
final result = await fetchData('123');
// result: 'Data for 123'
// attempts: 2 (retry worked)
Priority-Based Task Processing #
// Process tasks by priority with queue overflow handling
var processed = <String>[];
final taskProcessor = Func1<(String, int), String>(
(task) async {
final (name, priority) = task;
processed.add(name);
await Future.delayed(Duration(milliseconds: 30));
return 'Completed: $name';
},
).priorityQueue(
priorityFn: (task) => task.$2, // Use second element as priority
maxQueueSize: 5,
maxConcurrent: 2,
onQueueFull: QueueFullPolicy.dropLowestPriority,
starvationPrevention: true,
);
// Submit mixed priority tasks
taskProcessor(('background-1', 1));
taskProcessor(('critical', 10));
taskProcessor(('normal', 5));
taskProcessor(('background-2', 1));
await Future.delayed(Duration(milliseconds: 150));
// Critical task executed before normal priority tasks
// Starvation prevention ensures background tasks eventually execute
Custom Extensions #
Create custom decorators with extension methods on Func:
extension RequestIdDecorator<R> on Func<R> {
Func<R> withRequestId() {
return Func<R>(() async {
final requestId = _generateId();
print('[Request $requestId] Starting');
try {
final result = await call();
print('[Request $requestId] Success');
return result;
} catch (e) {
print('[Request $requestId] Error: $e');
rethrow;
}
});
}
String _generateId() {
return DateTime.now().millisecondsSinceEpoch.toString();
}
}
// Usage
final apiCall = Func<String>(() async => await api.fetch())
.withRequestId()
.retry(maxAttempts: 3);
Pattern for Custom Decorators #
extension CustomDecorator<R> on Func<R> {
Func<R> customBehavior() {
return Func<R>(() async {
// 1. Pre-processing
print('Pre-processing...');
// 2. Execute original function
try {
final result = await call();
// 3. Post-processing
print('Post-processing: $result');
return result;
} catch (e) {
// 4. Error handling
print('Error handling: $e');
rethrow;
} finally {
// 5. Cleanup
print('Cleanup');
}
});
}
}
// Usage
final operation = Func<String>(() async => 'done')
.customBehavior();
Synchronous Decorators #
extension CustomSyncDecorator<R> on FuncSync<R> {
FuncSync<R> withLogging() {
return FuncSync<R>(() {
print('Executing function');
final result = call();
print('Result: $result');
return result;
});
}
}
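Usage mirrors the async wrappers (a sketch; this assumes a FuncSync is constructed from a synchronous closure and invoked like a function, as in the extension body above):
final compute = FuncSync<int>(() => 6 * 7).withLogging();
final answer = compute(); // Prints the log lines, then returns 42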
Testing #
Basic Testing #
import 'dart:async';
import 'package:funx/funx.dart';
import 'package:test/test.dart';
test('debounce delays execution', () async {
var count = 0;
final fn = Func<int>(
() async => ++count,
).debounce(Duration(milliseconds: 100));
unawaited(fn());
unawaited(fn());
final future = fn();
await Future.delayed(Duration(milliseconds: 150));
final result = await future;
expect(result, 1); // only last call executed
expect(count, 1);
});
Testing Retry Logic #
test('retry with backoff', () async {
var attempts = 0;
final fn = Func<String>(() async {
attempts++;
if (attempts < 3) throw Exception('Fail');
return 'Success';
}).retry(
maxAttempts: 3,
backoff: ConstantBackoff(Duration(milliseconds: 50)),
);
final result = await fn();
expect(result, 'Success');
expect(attempts, 3);
});
Testing Composition #
test('composed mechanisms', () async {
var executions = 0;
final fn = Func<String>(() async {
executions++;
return 'result';
})
.memoize()
.retry(maxAttempts: 2);
await fn();
await fn();
expect(executions, 1); // memoized, executed once
});
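Testing Fallback #
A sketch along the same lines, assuming fallback behaves as in the Reliability examples:
test('fallback supplies a default on failure', () async {
final fn = Func<String>(() async {
throw Exception('Service down');
}).fallback(fallbackValue: 'default');
expect(await fn(), 'default');
});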
License #
MIT License - see LICENSE file for details.