Mpsc class
MPSC (Multiple-Producer Single-Consumer) channels - The workhorse of concurrent systems.
The most versatile channel type for fan-in patterns where multiple producers send data to a single consumer. Combines thread-safety for concurrent producers with single-consumer optimizations for maximum throughput.
Core Strengths
- Thread-safe producers: Multiple threads can send concurrently
- Single-consumer optimization: No consumer-side coordination overhead
- Rich buffering strategies: Unbounded, bounded, rendezvous, latest-only
- Advanced drop policies: Handle backpressure with oldest/newest dropping
- Producer cloning: Create multiple sender handles from one channel
- Flexible capacity: From 0 (rendezvous) to unlimited (unbounded)
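A minimal sketch of the fan-in pattern over a bounded channel, using only calls that appear on this page (`Mpsc.bounded`, `clone`, `send`, `close`, `stream`). The function name `fanInSum` and the producer helpers are hypothetical. It assumes that awaiting `send` waits for buffer space when the channel is full, and that the stream completes once every sender handle is closed, as the example further down suggests.

```dart
import 'package:cross_channel/mpsc.dart';

/// Fans two producers into one consumer over a bounded channel and
/// returns the sum of everything the consumer received.
/// (Hypothetical helper, not part of the package.)
Future<int> fanInSum() async {
  // Capacity 2: the buffer holds at most two pending items.
  final (tx, rx) = Mpsc.bounded<int>(2);
  final tx2 = tx.clone(); // second producer handle

  Future<void> produceLow() async {
    for (var i = 0; i < 3; i++) {
      await tx.send(i); // assumed to wait while the buffer is full
    }
    tx.close();
  }

  Future<void> produceHigh() async {
    for (var i = 100; i < 103; i++) {
      await tx2.send(i);
    }
    tx2.close();
  }

  // Start both producers without awaiting them, so the consumer below
  // can drain the buffer while they are still sending.
  final producers = Future.wait([produceLow(), produceHigh()]);

  var sum = 0;
  await for (final value in rx.stream()) {
    sum += value;
  }
  await producers;
  return sum;
}

Future<void> main() async {
  print('sum of received values: ${await fanInSum()}');
}
```

Starting the producers before the `await for` loop matters here: if the sends were awaited first, a full buffer with no active consumer could stall the program.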
When to Use MPSC
- Event aggregation: Multiple event sources → single event loop
- Task queues: Multiple workers → single dispatcher
- Logging systems: Multiple threads → single log writer
- UI updates: Multiple components → single UI thread
- Data collection: Multiple sensors → single processor
- Request handling: Multiple clients → single server
Performance Characteristics
- Good throughput: ~535-950ns per operation
- Latest-only mode: Extremely fast ~7ns per operation for coalescing buffers
- Memory efficient: Chunked buffers available for unbounded channels
- Scalable: Designed to handle multiple concurrent producers
- Efficient design: Optimized for producer-consumer scenarios
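The latest-only mode can be sketched as a progress reporter whose consumer only cares about the most recent value. This uses `Mpsc.latest` from the Static Methods list; the helper name `collectProgress` is hypothetical. How many intermediate values the consumer actually observes depends on scheduling and on the package's coalescing behavior, so the sketch makes no claim about the exact output.

```dart
import 'package:cross_channel/mpsc.dart';

/// Sends a series of progress values through a latest-only channel and
/// returns whatever the consumer observed.
/// (Hypothetical helper, not part of the package.)
Future<List<int>> collectProgress() async {
  final (tx, rx) = Mpsc.latest<int>();

  Future<void> report() async {
    for (var percent = 0; percent <= 100; percent += 25) {
      // Assumption: in latest-only mode a new value replaces any value
      // that has not been consumed yet.
      await tx.send(percent);
    }
    tx.close();
  }

  // Run the producer concurrently with the consumer loop.
  final producer = report();

  final seen = <int>[];
  await for (final percent in rx.stream()) {
    seen.add(percent);
  }
  await producer;
  return seen;
}

Future<void> main() async {
  print('observed progress values: ${await collectProgress()}');
}
```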
Example
import 'package:cross_channel/mpsc.dart';
Future<void> main() async {
  // 1. Create an MPSC channel for N:1 communication
  final (eventTx, eventRx) = Mpsc.unbounded<String>();

  // 2. Multiple Producers (we clone the sender)
  final userTx = eventTx.clone();
  final networkTx = eventTx.clone();

  // Send from different sources concurrently
  await userTx.send('User clicked button');
  await networkTx.send('Data downloaded');

  // Close when done
  userTx.close();
  networkTx.close();
  eventTx.close(); // Close base sender

  // 3. Single Event Loop (Consumer)
  await for (final event in eventRx.stream()) {
    print('Processed Event: $event');
  }
}
Constructors
- Mpsc()
Properties
- hashCode → int
  The hash code for this object.
  no setter, inherited
- runtimeType → Type
  A representation of the runtime type of the object.
  no setter, inherited
Methods
- noSuchMethod(Invocation invocation) → dynamic
  Invoked when a nonexistent method or property is accessed.
  inherited
- toString() → String
  A string representation of this object.
  inherited
Operators
- operator ==(Object other) → bool
  The equality operator.
  inherited
Static Methods
- bounded<T>(int capacity, {String? metricsId}) → MpscChannel<T>
- channel<T>({int? capacity, DropPolicy policy = DropPolicy.block, OnDrop<T>? onDrop, bool chunked = true, String? metricsId}) → MpscChannel<T>
- latest<T>({String? metricsId}) → MpscChannel<T>
- unbounded<T>({bool chunked = true, String? metricsId}) → MpscChannel<T>
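The general `channel` factory combines capacity, drop policy and drop callback. The sketch below is written under assumptions: only `DropPolicy.block` is confirmed by the signature above, so `DropPolicy.oldest` is inferred from the "oldest/newest dropping" wording, and `OnDrop<T>` is assumed to be a callback that receives the dropped item. The helper name `keepNewest` is hypothetical.

```dart
import 'package:cross_channel/mpsc.dart';

/// Pushes four log lines through a capacity-2 channel that is assumed to
/// evict the oldest buffered item when full, and returns the lines that
/// reached the consumer.
/// (Hypothetical helper, not part of the package.)
Future<List<String>> keepNewest() async {
  final dropped = <String>[];

  final (tx, rx) = Mpsc.channel<String>(
    capacity: 2,
    policy: DropPolicy.oldest, // assumed enum value, see lead-in
    onDrop: (line) => dropped.add(line), // assumed shape: void Function(T)
  );

  Future<void> writeLogs() async {
    for (var i = 1; i <= 4; i++) {
      await tx.send('log line $i');
    }
    tx.close();
  }

  // Run the producer concurrently so the sketch cannot stall even if
  // send waits for space.
  final producer = writeLogs();

  final kept = <String>[];
  await for (final line in rx.stream()) {
    kept.add(line);
  }
  await producer;

  print('dropped: $dropped');
  return kept;
}

Future<void> main() async {
  print('kept: ${await keepNewest()}');
}
```

With the default `DropPolicy.block`, the same code would be expected to deliver every line and apply backpressure to the producer instead of dropping.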