stream_subscriber
At its core stream_subscriber is a mixin that adds methods for easily managing stream subscriptions (listeners & notifiers) to any class it's mixed into.
The library also includes several observable classes that utilize StreamSubscriber: StreamValue, StreamList, StreamMap, and StreamSet. StreamValue contains a single observable value, while StreamList, StreamMap, and StreamSet implement Dart's List, Map, and Set classes respectively.
Usage
Classes that mixin StreamSubscriber contain a private broadcast (multi-stream) StreamController that's created on construction. To get started, just add some listeners, notify them of any events, and dispose of the class when it's no longer needed.
Implementation
The StreamSubscriber mixin exposes 4 methods for managing StreamSubscriptions
of a specified subtype: addListener
, removeListener
, notifyListeners
, and
dispose
, as well as the hasListener
and numberOfListeners
getters.
/// A class that contains an [int] and notifies its listeners
/// of the new value each time it's modified.
class ObservableClass with StreamSubscriber<int> {
ObservableClass(int value) : _value = value;
int _value;
int get value => _value;
set value(int value) {
_value = value;
notifyListeners(_value);
}
}
Note: The notifyListeners
method can be called anywhere and doesn't
necessarily have to provide a specific value to the listeners. It's accepted
value does however have to be of the same type as StreamSubscriber's subtype,
if no subtype is provided, then any object can be provided, regardless of type.
Subscribing Listeners
ObservableClass
's value
can now be listened to for any changes.
/// Instance of a new [ObservableClass].
final observable = ObservableClass(0);
// Add a listener
observable.addListener((value) {
print('Value was changed to $value.');
});
observable.value = 3;
// The listener will asynchronously print: Value was changed to 3.
observable.notifyListeners(5);
// The listener will asynchronously print: Value was changed to 5.
// However, because [notifyListeners] was called directly, the value
// wasn't actually updated to 5.
print(observable); // 3
// Remove the listener
observable.removeListener();
observable.value = 5;
// Because the listener was removed, nothing will be printed this time.
// [dispose] cancels and removes any active listeners and closes the notifier.
observable.dispose();
// After calling [dispose], [observable] will no longer be able to create
// or notify any listeners and its used resources will be freed up.
Observable Values
The StreamValue class is implemented in the same way ObservableClass
is
in the example above, but can be provided a subtype which is passed on to
StreamSubscriber.
final observable = StreamValue<int>(0);
observable.addListener((value) {
print(value);
});
observable.value++;
// The listener will asynchronously print: 1
observable.value = 5;
// The listener will asynchronously print: 5
observable.value--;
// The listener will asynchronously print: 4
observable.dispose();
StreamValue also has an additional optional parameter, onUpdate
, which is
called every time the value
is changed, before the listeners are notified.
Setting the Value Without Notifying the Listeners
To set the value without notifying the listeners, the setValue
method
can be used.
// [value] will be set to `0`, but listener won't be notified.
observable.setValue(0);
// [value] will be set to `0` and the listener will be notified.
observable.value = 0;
The onUpdate
Notifier
onUpdate
is a synchronously executed event, as such it can be used to further
modify the value before it is sent to the listeners. However, updating the value
directly from within onUpdate
will cause onUpdate
to be called in an infinite
loop, triggering a stack overflow, instead the setValue
method must be used to
update the value
without notifying the listeners or triggering onUpdate
.
Note: In the case of the StreamCollections described below, any of their
methods with a notifyListeners
parameter can be set to false
, and can be used
to update the collection without notifying their listeners.
var observable = StreamValue<int>(0);
observable.onUpdate = (value) {
// The [setValue] method should be used to update the value without notifying the
// listeners. Setting the value directly here would cause a stack overflow.
observable.setValue(value + 1);
print(observable.value);
};
observable.value++;
// The listener will synchronously print: 2
observable.value = 5;
// The listener will synchronously print: 6
observable.value -= 2;
// The listener will synchronously print: 5
observable.dispose();
If you don't need to reference observable
within onUpdate
, you can also
provide onUpdate
as an optional parameter when constructing the StreamValue.
var observable = StreamValue<int>(0, (value) {
print(value);
});
observable.value++;
// The listener will synchronously print: 1
observable.value = 5;
// The listener will synchronously print: 5
observable.value -= 2;
// The listener will synchronously print: 3
Observable Collections
StreamList, StreamMap, and StreamSet are implementations of the List, Map, and Set classes respectively. They extends a shared base class, StreamCollection which extends the StreamValue class.
Each of these classes have 3 types of listeners, update listeners, event listeners, and change listeners.
Update Listeners
The update listener, which is inherited from StreamValue, returns the entire collection anytime the collection is modified or overwritten.
final list = StreamList<int>.of([0, 1, 2, 3, 4, 5]);
list.addListener((list) {
print(list);
});
list.add(6);
// The listener will asynchronously print: [0, 1, 2, 3, 4, 5, 6]
list.removeAll();
// The listener will asynchronously print: []
list.addAll(<int>[2, 4, 6, 8]);
// The listener will asynchronously print: [2, 4, 6, 8]
list.dispose();
Event Listeners
Event listeners, which are inherited from StreamCollection's parent class, StreamEventProvider, returns an event containing the type of modification made, and a map of the keys/indexes and the values of the element(s) modified.
final list = StreamList<int>.of([0, 1, 2, 3, 4, 5]);
list.addEventListener(event) {
print('${event.type} ${event.elements}');
});
list.add(10);
// The listener will asynchronously print: CollectionEventType.addition {4: 10}
list.addAll(<int>[13, 16, 17]);
// The listener will asynchronously print: CollectionEventType.addition {5: 13, 6: 16, 7: 17}
list.removeWhere((value) => value.isOdd);
// The listener will asynchronously print: CollectionEventType.removal {5: 13, 7: 17}
list[0] = 5;
// The listener will asynchronously print: CollectionEventType.update {0: 5}
Note: event.values
could be called instead of event.elements
to get just
the values of the affected elements without the keys/indexes.
Note: In order to notify the listeners of the changes made to a collection,
the top-level methods within StreamList, StreamMap, and StreamSet must be
used. Any changes made to the underlying collection, referenced by the value
getter, will not notify the listeners.
// The value of the element at index 0 will be set to 5 and the listeners
// will be notified.
list[0] = 5;
// The value of the element at index 0 will be set to 2, but the listeners
// won't be notified.
list.value[0] = 2;
Change Listeners
Change listeners, which are inherited from StreamCollection, are notified
individually for every element changed within the underlying collection. They
receive an event containing the type of modification made, the key
/index and
the value
of the affected element.
final list = StreamList<int>[2, 4, 6, 8];
list.addChangeListener((event) {
print('${event.type} [${event.key}, ${event.value}]');
});
list.add(10);
// The listener will asynchronously print: CollectionEventType.addition [4: 10]
list.addAll(<int>[13, 16, 17]);
// The listener will asynchronously print: CollectionEventType.addition [5: 13]
// The listener will asynchronously print: CollectionEventType.addition [6: 16]
// The listener will asynchronously print: CollectionEventType.addition [7: 17]
list.removeWhere((value) => value.isOdd);
// The listener will asynchronously print: CollectionEventType.removal [5: 13]
// The listener will asynchronously print: CollectionEventType.removal [7: 17]
list[0] = 5;
// The listener will asynchronously print: CollectionEventType.update [0: 5]
Note: As all notifications are triggered synchronously but broadcast asynchronously, update and event notifications will be broadcast after the first change notification is broadcast, even if more change notifications are queued. As such, it's better to handle individual changes from the event notification, rather than using both an event listener and a change listener.
The onEvent
and onChange
Notifiers
The onEvent
and onChange
notifiers, like the onUpdate
notifier, are
synchronously executed events triggered just before the event and change
listeners are notified.
final list = StreamList<int>.of([0, 1, 2, 3, 4, 5]);
list.onEvent = (event) {
print('${event.type} ${event.values}');
};
list.addAll([6, 7, 8, 9]);
// [onEvent] will synchronously print: CollectionEventType.addition [6, 7, 8, 9]
list.onChange = (event) {
print('${event.type} [${event.value}]');
};
list.removeAll([6, 7, 8, 9]);
// [onChange] will synchronously print: CollectionEventType.removal [6]
// [onChange] will synchronously print: CollectionEventType.removal [7]
// [onChange] will synchronously print: CollectionEventType.removal [8]
// [onChange] will synchronously print: CollectionEventType.removal [9]
// [onEvent] will synchronously print: CollectionEventType.removal [6, 7, 8, 9]
Note: As described in onUpdate
's section above, modifying the collection
from within onEvent
or onChange
will cause them be called in an infinite
loop, triggering a stack overflow. To prevent this, the collections must be
updated using any of the collection's methods that have a notifyListeners
parameter, and setting it to false
.
Libraries
- stream_subscriber
- Observable objects for asynchronously listening for changes to values and collections, with a mixin to easily manage streams on any class.