Reactive Dart

Reactive Extensions Library for Dart providing an API for asynchronous programming with observable streams.

Reactive is written in the style of ReactiveX and does not depend on Dart Streams. At this point the code is quite experimental, and it may be missing features or contain serious bugs.
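
To make the ReactiveX style concrete, here is a minimal sketch of an observer/observable pair written in plain Dart without `dart:async` Streams. The names `SketchObserver`, `SketchObservable` and `runSketch` are hypothetical and chosen for illustration only; they are not the classes of this package, whose actual implementation may differ considerably.

```dart
/// Hypothetical observer: three optional callbacks, as in ReactiveX.
class SketchObserver<T> {
  SketchObserver({this.next, this.error, this.complete});

  final void Function(T value)? next;
  final void Function(Object error)? error;
  final void Function()? complete;
}

/// Hypothetical cold observable: runs the producer once per subscription.
class SketchObservable<T> {
  SketchObservable(this.producer);

  final void Function(SketchObserver<T> observer) producer;

  void subscribe(SketchObserver<T> observer) => producer(observer);
}

/// Subscribes to a three-element source and records every notification.
List<String> runSketch() {
  final source = SketchObservable<int>((observer) {
    for (var i = 0; i < 3; i++) {
      observer.next?.call(i);
    }
    observer.complete?.call();
  });
  final log = <String>[];
  source.subscribe(SketchObserver<int>(
    next: (value) => log.add('next($value)'),
    complete: () => log.add('complete()'),
  ));
  return log;
}

void main() {
  print(runSketch()); // [next(0), next(1), next(2), complete()]
}
```

Because the sketch is synchronous and has no unsubscribe handling, it only shows the notification contract (zero or more `next` calls followed by at most one `error` or `complete`), not scheduling or cancellation.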

This library is open source and well tested. Development happens on GitHub. Feel free to report issues or create a pull request there. General questions are best asked on Stack Overflow.

The package is hosted on Dart packages. Up-to-date class documentation is created with every release.

Currently the Dart programming language (or my inability to use it correctly) is blocking a more pleasant API in a few places:

  • The lack of extension methods in Dart makes it awkward to access operator functions and constructor methods: dart-lang/language#40, dart-lang/language#41, dart-lang/language#42, dart-lang/language#177, dart-lang/language#309, dart-lang/language#8547.
  • For some reason type inference over chained operators does not work. For example, observable.pipe2(filter((value) => ...), map((value) => ...)) is unable to correctly infer the types in the second operator, while observable.pipe(filter((value) => ...)).pipe(map((value) => ...)) infers them correctly.
  • The lack of variable-length arguments and variable-length generics requires duplicated functions and typedefs with a number suffix (such as pipe2 above).
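
The number-suffix duplication described in the last bullet can be sketched in plain Dart. Everything below is a hypothetical stand-in built on `Iterable` (the `Operator` typedef and the `filter`, `map`, `pipe1`, `pipe2` and `pipe3` functions are illustrative names, not this package's API); it only shows why each arity needs its own function when the type parameters cannot be variadic.

```dart
/// Hypothetical operator: turns one Iterable into another.
typedef Operator<T, R> = Iterable<R> Function(Iterable<T> source);

Operator<T, T> filter<T>(bool Function(T value) predicate) =>
    (source) => source.where(predicate);

Operator<T, R> map<T, R>(R Function(T value) transform) =>
    (source) => source.map(transform);

// Each arity is a separate function, because neither the argument list
// nor the list of type parameters can have variable length.
Iterable<A> pipe1<T, A>(Iterable<T> source, Operator<T, A> op1) =>
    op1(source);

Iterable<B> pipe2<T, A, B>(
        Iterable<T> source, Operator<T, A> op1, Operator<A, B> op2) =>
    op2(op1(source));

Iterable<C> pipe3<T, A, B, C>(Iterable<T> source, Operator<T, A> op1,
        Operator<A, B> op2, Operator<B, C> op3) =>
    op3(op2(op1(source)));

void main() {
  // Explicit type arguments sidestep any reliance on inference.
  final result = pipe2<int, int, String>(
    [1, 2, 3, 4],
    filter((value) => value.isEven),
    map((value) => 'v$value'),
  );
  print(result.toList()); // [v2, v4]
}
```

The type arguments are spelled out on `pipe2` so that the sketch does not depend on how well inference flows from one operator to the next.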

License

The MIT License, see LICENSE.

0.0.1

  • Initial version.

0.0.2

  • Operator and composition basics.

0.0.3

  • Subject and multicast basics.


library rx.example.example;

import 'package:more/collection.dart';
import 'package:rx/operators.dart' as ops;
import 'package:rx/rx.dart' as rx;

rx.Observer<T> printObserver<T>(String name) => rx.Observer(
      next: (value) => print('$name.next($value)'),
      error: (error, [stackTrace]) => print('$name.error($error)'),
      complete: () => print('$name.complete()'),
    );

void main() {
  // create
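  final create = rx.create((subscriber) {
    for (var i = 0; i < 3; i++) {
      // Assumed loop body (missing in the source): emit each index.
      subscriber.next(i);
    }
    // Assumed: signal completion once the loop has finished.
    subscriber.complete();
  });
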
  // empty
  final empty = rx.empty();

  // future
  final fromFuture = rx.fromFuture(Future.value(42));

  final toFuture = rx.toFuture(rx.fromIterable([1, 2, 3]));
  toFuture.then((value) => print('toFuture.then($value)'));

  // just
  final just = rx.just(42);

  // never
  final never = rx.never();

  // stream
  final fromStream = rx.fromStream(Stream.fromIterable([1, 2, 3]));

  final toStream = rx.toStream(rx.fromIterable([1, 2, 3]));
  toStream.listen((value) => print('toStream.value($value)'));

  // throw
  final throwError = rx.throwError(Exception('Hello World'));

  // double subscription
  final transformed = rx
      .fromIterable(IntegerRange(0, 100))
      .pipe(ops.filter((value) => value.isEven))
      .pipe(ops.map((value) => '${value * value}'))
      .pipe(ops.filter((value) => value.length < 3));

  // subject subscription
  final subject = rx
      .fromIterable(IntegerRange(0, 100, 25));

  // timer
  final obs = rx.timer(
      delay: const Duration(seconds: 2),
      period: const Duration(milliseconds: 500));
  final subs1 = obs.subscribe(printObserver('first'));
  final subs2 = obs.subscribe(printObserver('second'));
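  // Cancel the first subscription after 3 seconds.
  rx
      .timer(delay: const Duration(seconds: 3))
      .subscribe(rx.Observer(complete: () => subs1.unsubscribe()));
  // Cancel the second subscription after 5 seconds.
  rx
      .timer(delay: const Duration(seconds: 5))
      .subscribe(rx.Observer(complete: () => subs2.unsubscribe()));
}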
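
To see which values the filter/map/filter pipeline in the example would let through, the same three steps can be reproduced with plain Dart iterables. This is only an illustrative equivalent: the function name `transform` is hypothetical, and `Iterable<int>.generate(100)` is used on the assumption that `IntegerRange(0, 100)` yields the integers 0 through 99.

```dart
/// Keeps even numbers, squares them into strings, and keeps only
/// the strings that are shorter than three characters.
Iterable<String> transform(Iterable<int> source) => source
    .where((value) => value.isEven)
    .map((value) => '${value * value}')
    .where((value) => value.length < 3);

void main() {
  // Stand-in for IntegerRange(0, 100): the integers 0 through 99.
  final values = Iterable<int>.generate(100);
  print(transform(values).toList()); // [0, 4, 16, 36, 64]
}
```

Only the squares of 0, 2, 4, 6 and 8 have fewer than three digits, so every even number from 10 upwards is dropped by the last filter.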

Use this package as a library

1. Depend on it

Add this to your package's pubspec.yaml file:

dependencies:
  rx: ^0.0.3

2. Install it

You can install packages from the command line:

with pub:

$ pub get

with Flutter:

$ flutter pub get

Alternatively, your editor might support pub get or flutter pub get. Check the docs for your editor to learn more.

3. Import it

Now in your Dart code, you can use:

import 'package:rx/rx.dart';
