computed_collections 0.1.1
Performant and flexible functional reactive programming for collections.
Tip
computed_collections builds on computed. Make sure you are familiar with computed fundamentals.
computed_collections allows you to declaratively define reactive collections. It:
- uses computed as the base for strong and consistent reactivity
- uses fast_immutable_collections for performant and powerful immutable collections
- constructs, transforms and propagates internal change streams to stay efficient
- allows local key queries on all reactive collections
- is designed to be asymptotically optimal for all operations
Table of contents #
- Table of contents
- An example
- Ingesting external change streams
- Constant reactive maps
- Mapping with inter-key dependencies
- An index of operators
- An index of attributes
- An index of factories
An example #
Imagine you want to display in your UI a list of objects. You get the objects from a database or server. However, you only want to display the objects which have recently been modified.
Assuming you have the following:
Stream<Map<ObjectId, MyObject>> objectsUnfiltered = ...;
void displayInUI(List<MyObject> objects) {
...
}
bool isRecent(MyObject object) {
...
}
Here is how you might approach this problem imperatively:
objectsUnfiltered
.listen((o) => displayInUI(o.values.where((oo) => isRecent(oo)).toList()));
That was easy enough. But do you see the problem? The listener filters all the objects for every single change. This might not be a problem if you have a small number of objects, or the reactive logic you wish to run is computationally cheap, but such an approach clearly will not scale for a large and complex application.
To alleviate the problem, you can first identify how the unfiltered set of objects was changed and run the reactive logic only on the relevant part of the map:
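// groupBy comes from package:collection (import 'package:collection/collection.dart';).
({
  Map<ObjectId, MyObject> unfiltered,
  Map<ObjectId, MyObject> filtered
})? lastObjects;
objectsUnfiltered.listen((objs) {
  switch (lastObjects) {
    case var last?:
      // Entries that were added or modified since the last snapshot.
      final changed =
          objs.entries.where((e) => e.value != last.unfiltered[e.key]);
      final addRemove = groupBy(changed, (e) => isRecent(e.value));
      last.filtered.addEntries(addRemove[true] ?? []);
      for (var e in (addRemove[false] ?? [])) {
        last.filtered.remove(e.key);
      }
      // Entries that were deleted upstream.
      for (var k in last.unfiltered.keys) {
        if (!objs.containsKey(k)) last.filtered.remove(k);
      }
      lastObjects = (filtered: last.filtered, unfiltered: objs);
      displayInUI(last.filtered.values.toList());
    case null:
      // First snapshot: there is nothing to diff against, so filter everything.
      final filtered =
          Map.fromEntries(objs.entries.where((e) => isRecent(e.value)));
      lastObjects = (filtered: filtered, unfiltered: objs);
      displayInUI(filtered.values.toList());
  }
});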
Phew. That was a mouthful. We had to maintain a pair of maps as our state: one representing the last set of unfiltered objects and another for the filtered set. For each change in the stream, we compute an upstream delta if a previous state existed, and update the state and UI accordingly.
You can see how such an architecture can quickly become difficult to maintain. All this was just for an asymptotically optimal, reactive .where. Imagine if we needed to use the same upstream for multiple downstream operations, like one .where and one groupBy. A reactive groupBy is already significantly more complex than the case above, but what if we also wanted to "share" the delta we compute across these downstream operations for efficiency's sake?
Let's leave these questions aside and see how such a reactive collection can be implemented using computed_collections:
final unfiltered =
ComputedMap.fromSnapshotStream($(() => objectsUnfiltered.use.lock));
final filtered = unfiltered.removeWhere((k, v) => !isRecent(v));
filtered.snapshot.listen((s) => displayInUI(s.values.toList()));
That's all. fromSnapshotStream internally handles the manual diffing we did in the imperative example and constructs a change stream. .removeWhere subscribes to this change stream to incrementally maintain the filtered map. Finally, we use filtered.snapshot, which returns a computed computation representing the snapshot of the filtered map, and attach a listener to it to update the UI. In this case, we had to use .lock on the map we got from the stream to turn it into a fast immutable map, as computed_collections operates exclusively on them, but this can reasonably be assumed not to harm asymptotic complexity.
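To make the diffing step concrete, here is a minimal plain-Dart sketch of turning two consecutive snapshots into a per-key change set. It does not show how computed_collections implements this internally; diffSnapshots, Change, Upsert and Delete are hypothetical names introduced only for this illustration.

```dart
// A plain-Dart sketch of diffing two snapshots into a per-key change set.
// All names here are hypothetical; none of them belong to computed_collections.

sealed class Change<V> {
  const Change();
}

/// The key is now mapped to [value] (it was inserted or updated).
final class Upsert<V> extends Change<V> {
  final V value;
  const Upsert(this.value);
  @override
  String toString() => 'Upsert($value)';
}

/// The key was removed.
final class Delete<V> extends Change<V> {
  const Delete();
  @override
  String toString() => 'Delete()';
}

Map<K, Change<V>> diffSnapshots<K, V>(Map<K, V> prev, Map<K, V> next) {
  final changes = <K, Change<V>>{};
  // Keys that are new, or whose value differs from the previous snapshot.
  next.forEach((k, v) {
    if (!prev.containsKey(k) || prev[k] != v) changes[k] = Upsert(v);
  });
  // Keys that disappeared since the previous snapshot.
  for (final k in prev.keys) {
    if (!next.containsKey(k)) changes[k] = Delete<V>();
  }
  return changes;
}

void main() {
  final changes =
      diffSnapshots({1: 'a', 2: 'b', 3: 'c'}, {1: 'a', 2: 'B', 4: 'd'});
  print(changes); // {2: Upsert(B), 4: Upsert(d), 3: Delete()}
}
```

The diff visits every key of both snapshots, so it costs time linear in the snapshot size on every update. Downstream operators that consume the resulting change set only need to touch the changed keys.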
Of course, it would be even better to ingest a change stream instead of a snapshot stream, so that we could avoid diffing consecutive snapshots in the first place. Which brings us to...
Ingesting external change streams #
computed_collections is built on change streams. A reactive map, represented by the interface ComputedMap, internally has a snapshot and a change stream. Operators such as .map, .join and .removeWhere use and transform these streams. If you have a change stream computation, you can use the factory function ComputedMap.fromChangeStream:
Computed<ChangeEvent<ObjectId, MyObject>> changes = ...;
final cmap = ComputedMap.fromChangeStream(changes); // has type `ComputedMap<ObjectId, MyObject>`
The ChangeEvent class represents a set of changes made on a map. Note that you might need to write some "glue" code to convert a change stream in an external format into the format used by computed_collections. This should not be too difficult with computed.
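As a rough illustration of such glue code, the plain-Dart sketch below collapses a batch of events in a made-up external format into one change per key. ExternalEvent, KeyChange and toKeyChanges are hypothetical stand-ins introduced here, not types or functions of computed_collections; in real code the result would be expressed with the library's own ChangeEvent classes.

```dart
// Hypothetical external format: one event per mutated key.
typedef ExternalEvent = ({String op, int key, String? value});

// Hypothetical stand-in for a per-key change record.
typedef KeyChange = ({bool deleted, String? value});

/// Collapses a batch of external events into one change per key.
/// Later events for the same key overwrite earlier ones.
Map<int, KeyChange> toKeyChanges(Iterable<ExternalEvent> events) {
  final changes = <int, KeyChange>{};
  for (final e in events) {
    switch (e.op) {
      case 'put':
        changes[e.key] = (deleted: false, value: e.value);
      case 'delete':
        changes[e.key] = (deleted: true, value: null);
      default:
        throw ArgumentError('Unknown op: ${e.op}');
    }
  }
  return changes;
}

void main() {
  final changes = toKeyChanges([
    (op: 'put', key: 1, value: 'a'),
    (op: 'delete', key: 1, value: null),
    (op: 'put', key: 2, value: 'b'),
  ]);
  changes.forEach((k, c) => print('$k: deleted=${c.deleted}, value=${c.value}'));
}
```

Collapsing the batch first means that each key is reported at most once per update, regardless of how many raw events touched it.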
Stateful change streams #
You might need the conversion from the external change stream format to be stateful. You can do this using internal sub-computations captured in the change stream computation's closure. But what if the state you need is the snapshot of the collection you are building? A simple use case would be to listen on a stream and increment the value of each published key by one. You can do this using ComputedMap.fromChangeStreamWithPrev. Analogous to computed's Computed.withPrev, it passes the collection's previous snapshot to the change stream computation:
final s = StreamController<int>(sync: true);
final stream = s.stream;
final m = ComputedMap.fromChangeStreamWithPrev((prev){
final c = KeyChanges(<int, ChangeRecord<int>>{}.lock);
stream.react((idx) => c[idx] = ChangeRecordValue((prev[idx] ?? 0) + 1));
return c;
});
m.snapshot.listen(print);
s.add(0); // prints {0:1}
s.add(0); // prints {0:2}
s.add(1); // prints {0:2, 1:1}
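The underlying idea, deriving each new value from the previous snapshot, can be sketched in plain Dart without the library. applyIncrements is a hypothetical helper written for this illustration only; it rebuilds the whole snapshot rather than emitting a change event, so it shows the semantics, not the efficiency.

```dart
// Hypothetical helper, not part of computed_collections: folds a batch of
// published keys into the next snapshot, using the previous snapshot as state.
Map<int, int> applyIncrements(Map<int, int> prev, Iterable<int> keys) {
  final next = Map<int, int>.of(prev);
  for (final k in keys) {
    // The parentheses matter: `??` binds more loosely than `+`,
    // so `next[k] ?? 0 + 1` would parse as `next[k] ?? (0 + 1)`.
    next[k] = (next[k] ?? 0) + 1;
  }
  return next;
}

void main() {
  var snapshot = <int, int>{};
  for (final k in [0, 0, 1]) {
    snapshot = applyIncrements(snapshot, [k]);
    print(snapshot);
  }
  // Prints {0: 1}, then {0: 2}, then {0: 2, 1: 1}.
}
```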
Constant reactive maps #
You can use ComputedMap.fromIMap to define a constant reactive map. Such a map never changes, so it is reactive in name only.
final m1 = ComputedMap.fromIMap({1:2, 2:3}.lock);
final m2 = m1.mapValues((k, v) => v+1); // Is {1:3, 2:4}
m2.snapshot.listen(print); // Prints {1:3, 2:4}
Mapping with inter-key dependencies #
You can use the operators with *Computed in their name to define transformations with arbitrary reactive dependencies. In particular, the transformation of one key can depend on the result of the transformation of another key, as in the following program, which declaratively computes how many steps each integer in the range $[1,16]$ takes to be reduced to 1 under the Collatz conjecture:
final m1 = ComputedMap.fromIMap(
IMap.fromEntries(List.generate(200, (i) => MapEntry(i, 0))));
late final ComputedMap<int, int> m2;
m2 = m1.mapValuesComputed((k, v) => k <= 1
? $(() => 0)
: $(() => m2[((k % 2) == 0 ? k ~/ 2 : (k * 3 + 1))].use! + 1));
final m3 = m1
.removeWhere((k, v) => k == 0 || k > 16)
.mapValuesComputed((k, v) => m2[k]);
m3.snapshot.listen(print);
This might not be the most performant implementation, but note that it is asymptotically optimal in the sense that it uses memoization. It also showcases the key-local query capabilities, as computing the Collatz sequences of all the integers in the key range of m1 ( $[1, 199]$ ) would require accessing integers larger than 199, whereas the sequences starting in $[1, 16]$ never exceed 160.
In example/collatz.dart, you can find a more advanced implementation which is also asymptotically optimal in its memory complexity.
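For comparison, here is a non-reactive sketch of the same memoized computation in plain Dart. It is not taken from example/collatz.dart; collatzSteps and the _memo cache are names assumed for this illustration.

```dart
// Memoized Collatz step counts in plain Dart (no reactivity involved).
final _memo = <int, int>{1: 0};

/// Number of Collatz steps needed to reduce [n] to 1.
int collatzSteps(int n) {
  if (n < 1) throw ArgumentError.value(n, 'n', 'must be positive');
  final cached = _memo[n];
  if (cached != null) return cached;
  final next = n.isEven ? n ~/ 2 : 3 * n + 1;
  // Each integer is computed once and then served from the cache.
  return _memo[n] = collatzSteps(next) + 1;
}

void main() {
  final steps = {for (var k = 1; k <= 16; k++) k: collatzSteps(k)};
  print(steps);
  // {1: 0, 2: 1, 3: 7, 4: 2, 5: 5, 6: 8, 7: 16, 8: 3, 9: 19, 10: 6,
  //  11: 14, 12: 9, 13: 9, 14: 17, 15: 17, 16: 4}

  // Largest intermediate value visited while computing the range above.
  print(_memo.keys.reduce((a, b) => a > b ? a : b)); // 160
}
```

As in the reactive version, only the integers actually reached from the requested keys end up in the cache.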
An index of operators #
Below is a list of reactive operators on reactive maps along with a high-level description of them.
Operator | Description |
---|---|
.add | Reactively adds a given key-value pair to a reactive map. |
.addAll | Reactively adds a given IMap to a reactive map. |
.addAllComputed | Reactively adds a given reactive map to a reactive map. |
.cast | Reactively casts the entries of a reactive map. |
.containsKey | Returns a computation representing if a reactive map contains a given key. |
.containsValue | Returns a computation representing if a reactive map contains a given value. |
.map | Reactively maps each entry of a reactive map by a given synchronous function. |
.mapComputed | Reactively maps each entry of a reactive map by a given reactive conversion computation. |
.mapValues | Reactively maps all values of a reactive map by a given synchronous function. |
.mapValuesComputed | Reactively maps all values of a reactive map by a given reactive conversion computation. |
.putIfAbsent | Reactively adds a given key to a reactive map, if it does not already exist. |
.remove | Reactively removes a given key from a reactive map. |
.removeWhere | Reactively removes all entries satisfying a given synchronous condition from a reactive map. |
.removeWhereComputed | Reactively removes all entries satisfying a given reactive condition computation from a reactive map. |
.update | Reactively transforms a given key of a reactive map by a given synchronous function. |
.updateAll | A special case of .mapValues where the input and output types are the same. |
.updateAllComputed | A special case of .mapValuesComputed where the input and output types are the same. |
.groupBy | Reactively groups a reactive map by a given synchronous function. |
.groupByComputed | Reactively groups a reactive map by a given reactive group computation. |
.join | Computes a reactive inner join of a pair of reactive maps. |
.lookup | Computes a reactive left join of a pair of reactive maps. |
.cartesianProduct | Computes a reactive cartesian product of a pair of reactive maps. |
.flat | Reactively flattens a reactive map of reactive maps. |
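The table does not spell out the difference between an inner join and a left join. The plain-Dart sketch below assumes the usual relational meaning of those terms, applied to maps joined on their keys; innerJoin and leftJoin are hypothetical helpers operating on ordinary maps, not the library's operators, and the exact result types of .join and .lookup may differ.

```dart
/// Keeps only the keys present in both maps.
Map<K, (A, B)> innerJoin<K, A, B>(Map<K, A> left, Map<K, B> right) => {
      for (final e in left.entries)
        if (right.containsKey(e.key)) e.key: (e.value, right[e.key] as B),
    };

/// Keeps every key of [left]; the right-hand side is null when missing.
Map<K, (A, B?)> leftJoin<K, A, B>(Map<K, A> left, Map<K, B> right) => {
      for (final e in left.entries) e.key: (e.value, right[e.key]),
    };

void main() {
  final names = {1: 'ada', 2: 'bob', 3: 'cy'};
  final ages = {1: 36, 3: 41, 4: 50};

  final inner = innerJoin(names, ages);
  final left = leftJoin(names, ages);

  print(inner.keys.toList()); // [1, 3]
  print(left.keys.toList()); // [1, 2, 3]
  print(left[2]!.$2); // null
}
```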
An index of attributes #
Below is a list of attributes on reactive maps along with a high-level description of them.
Attribute | Description |
---|---|
.changes | The change stream computation for this map. |
.snapshot | The snapshot computation for this map. |
.operator[] | A computation representing the given key of this map. |
.isEmpty | A computation representing the emptiness of this map. |
.isNotEmpty | Opposite of .isEmpty. |
.length | A computation representing the length of this map. |
An index of factories #
Below is a list of ways to create ComputedMaps.
Factory | Description |
---|---|
.fromChangeStream | Tracks a given change stream. Initialized to the empty map. |
.fromSnapshotStream | Tracks a given snapshot stream. Internally creates a change stream by diffing consecutive snapshots. |
.fromIMap | A constant map equal to the given IMap. |
.fromChangeStreamWithPrev | Like .fromChangeStream, but lets the change stream computation depend on the snapshot of the map. |
.fromPiecewise | A generalization of .fromIMap that creates a computed map from a given function over a given domain of keys based on reactive dependencies. |
.fromPiecewiseComputed | Like .fromPiecewise, but the values are defined by reactive computations. |