distinct_value_connectable_stream 1.0.3+1

distinct_value_connectable_stream #

  • A distinct, connectable ValueStream for rxdart
  • Useful for the Flutter BLoC pattern: it exposes a broadcast state stream to the UI, gives synchronous access to the last emitted item, and emits only when the value changes (distinct until changed)

Author: Petrus Nguyễn Thái Học

Implement BLoC #

Without using package #

Using package #

Usage #

Import distinct_value_connectable_stream:

import 'package:distinct_value_connectable_stream/distinct_value_connectable_stream.dart';

1. Constructor based #

Wrap your Stream in a DistinctValueConnectableStream using one of its constructors:

final Stream<State> state$;
final distinct$ = DistinctValueConnectableStream(state$);
final distinctSeeded$ = DistinctValueConnectableStream.seeded(
  state$,
  seedValue: State.initial(),
);

You can pass an equals parameter of type bool Function(T, T) to the constructor. It is used to determine whether two values are equal (the default is operator ==):

final Stream<State> state$;
final bool Function(State, State) isEquals;

final distinct$ = DistinctValueConnectableStream.seeded(
  state$,
  seedValue: State.initial(),
  equals: isEquals,
);
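To make the role of an equals function concrete, here is a minimal, dependency-free sketch. It does not call this package: it uses Stream.distinct from dart:async, which accepts the same kind of bool Function(T, T) comparator, as a stand-in. The State class, sameCount and distinctCounts are hypothetical names introduced only for this illustration.

```dart
import 'dart:async';

/// Hypothetical state type, used only for this illustration.
class State {
  final int count;
  final bool loading;
  const State(this.count, this.loading);
}

/// A comparator with the shape bool Function(State, State).
/// Two states are treated as equal when their counts match,
/// even if `loading` differs.
bool sameCount(State a, State b) => a.count == b.count;

/// Drops consecutive states that `sameCount` considers equal and
/// returns the surviving counts.
Future<List<int>> distinctCounts(Stream<State> source) =>
    source.distinct(sameCount).map((s) => s.count).toList();

Future<void> main() async {
  final source = Stream.fromIterable([
    State(1, false),
    State(1, true), // same count as previous: dropped
    State(2, false),
    State(2, false), // dropped
    State(1, false), // differs from the previous item: kept
  ]);
  print(await distinctCounts(source)); // [1, 2, 1]
}
```

Note that only consecutive items are compared, so a value may reappear later in the stream once something different has been emitted in between.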

2. Extension method based #

final source$ = Stream.fromIterable([1, 2, 2, 3, 3, 3]);

// publish
final connectable$       = source$.publishValueDistinct();
final connectableSeeded$ = source$.publishValueSeededDistinct(seedValue: 0);

// share
final shared$            = source$.shareValueDistinct();
final sharedSeeded$      = source$.shareValueSeededDistinct(seedValue: 0);

All extension methods accept an optional equals parameter of type bool Function(T, T), just like the constructors.

final source$ = Stream.fromIterable([1, 2, 2, 3, 3, 3]);
final connectable$ = source$.publishValueDistinct();

// Does not print anything at first
connectable$.listen(print);

// Start listening to the source Stream. Will cause the previous
// line to start printing 1, 2, 3
final subscription = connectable$.connect();

// Late subscribers will receive the last emitted value
connectable$.listen(print); // Prints 3

// Can access the latest emitted value synchronously. Prints 3
print(connectable$.value);

// Stop emitting items from the source stream and close the underlying
// BehaviorSubject
await subscription.cancel();
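The connect-then-emit behaviour above can be approximated with a rough sketch built only on dart:async. TinyDistinctValue and collectChanges are hypothetical names, and this is not how the package is implemented: in particular, the sketch always requires a seed and, unlike the snippet above, does not replay the latest value to late subscribers. It only illustrates three ideas: nothing flows before connect(), values equal to the current one are dropped, and the latest value can be read synchronously.

```dart
import 'dart:async';

/// Hypothetical, simplified stand-in used only for illustration.
/// It is NOT the package's implementation.
class TinyDistinctValue<T> {
  TinyDistinctValue(this._source, this._value);

  final Stream<T> _source;
  final StreamController<T> _controller = StreamController<T>.broadcast();
  T _value;

  /// Latest value (initially the seed), readable synchronously.
  T get value => _value;

  /// Broadcast stream of changes; silent until [connect] is called.
  Stream<T> get stream => _controller.stream;

  /// Starts consuming the source and forwards only changed values.
  StreamSubscription<T> connect() {
    return _source.listen((next) {
      if (next == _value) return; // equal to the current value: drop it
      _value = next;
      _controller.add(next);
    }, onDone: () {
      _controller.close();
    });
  }
}

/// Collects every change emitted for [items], starting from [seed].
Future<List<int>> collectChanges(List<int> items, int seed) {
  final tiny = TinyDistinctValue<int>(Stream.fromIterable(items), seed);
  final changes = tiny.stream.toList(); // subscribed, but nothing flows yet
  tiny.connect(); // only now is the source consumed
  return changes;
}

Future<void> main() async {
  print(await collectChanges([1, 2, 2, 3, 3, 3], 0)); // [1, 2, 3]
}
```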

Features and bugs #

Please file feature requests and bugs at the issue tracker.

License #

MIT License

Copyright (c) 2019 Petrus Nguyễn Thái Học

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

1.0.3+1 - Jan 14, 2020 #

  • Fix analysis

1.0.3 - Dec 15, 2019 #

  • Fix README.md

1.0.2 - Dec 15, 2019 #

  • Fix README.md

1.0.1 - Dec 15, 2019 #

  • Fix README.md

1.0.0 - Dec 15, 2019 #

  • Publish

example/distinct_value_connectable_stream_example.dart

import 'package:distinct_value_connectable_stream/distinct_value_connectable_stream.dart';
import 'package:meta/meta.dart';
import 'package:rxdart/rxdart.dart';

class CounterBloc {
  /// Inputs
  final void Function(int) increment;
  final void Function(int) decrement;

  /// Outputs
  final ValueStream<int> state;

  /// Clean up
  final Future<void> Function() dispose;

  CounterBloc._({
    @required this.increment,
    @required this.decrement,
    @required this.state,
    @required this.dispose,
  });

  factory CounterBloc() {
    final incrementController = PublishSubject<int>();
    final decrementController = PublishSubject<int>();

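    // Decrements are negated so that both inputs can be summed as deltas.
    final streams = [
      incrementController,
      decrementController.map((i) => -i),
    ];
    // Fold every delta into a running total, then expose the total as a
    // seeded, distinct-until-changed ValueStream.
    final state$ = Rx.merge(streams)
        .scan<int>((acc, e, _) => acc + e, 0)
        .publishValueSeededDistinct(seedValue: 0);

    // Start forwarding events from the merged inputs right away.
    final subscription = state$.connect();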

    return CounterBloc._(
      increment: incrementController.add,
      decrement: decrementController.add,
      state: state$,
      dispose: () async {
        await subscription.cancel();
        await Future.wait(
            [incrementController, decrementController].map((c) => c.close()));
      },
    );
  }
}

void main() async {
  final counterBloc = CounterBloc();

  final listen = counterBloc.state.listen((i) => print('[LOGGER] state=$i'));
  counterBloc
    ..increment(0)
    ..increment(2)
    ..decrement(2)
    ..decrement(2)
    ..decrement(2)
    ..increment(2)
    ..increment(2)
    ..increment(0)
    ..increment(0)
    ..increment(0)
    ..increment(0)
    ..increment(0);

  await Future.delayed(Duration(seconds: 1));
  print(counterBloc.state.value);

  await listen.cancel();
  await counterBloc.dispose();
}
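The arithmetic behind the example can be traced with a small, dependency-free sketch. It assumes the seed counts as the first state and that a new state is recorded only when the running total changes; it does not model rxdart's scheduling, so it shows which states are distinct rather than guaranteeing the exact log lines. distinctRunningTotals is a hypothetical helper, not part of the package.

```dart
/// Hypothetical helper: folds [deltas] into a running total and records
/// a state only when the total differs from the last recorded state.
List<int> distinctRunningTotals(List<int> deltas, {int seed = 0}) {
  final states = <int>[seed];
  var total = seed;
  for (final delta in deltas) {
    total += delta;
    if (total != states.last) states.add(total);
  }
  return states;
}

void main() {
  // The calls from main() above, in order: increments are positive,
  // decrements are negative.
  const deltas = [0, 2, -2, -2, -2, 2, 2, 0, 0, 0, 0, 0];
  print(distinctRunningTotals(deltas)); // [0, 2, 0, -2, -4, -2, 0]
}
```

The leading increment(0) and the five trailing increment(0) calls leave the total unchanged, so under these assumptions they produce no new state, and the last value is 0.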

Use this package as a library

1. Depend on it

Add this to your package's pubspec.yaml file:


dependencies:
  distinct_value_connectable_stream: ^1.0.3+1

2. Install it

You can install packages from the command line:

with pub:


$ pub get

with Flutter:


$ flutter pub get

Alternatively, your editor might support pub get or flutter pub get. Check the docs for your editor to learn more.

3. Import it

Now in your Dart code, you can use:


import 'package:distinct_value_connectable_stream/distinct_value_connectable_stream.dart';
  
Popularity: 81 (how popular the package is relative to other packages)
Health: 100 (code health derived from static analysis)
Maintenance: 100 (how tidy and up-to-date the package is)
Overall: 90 (weighted score of the above)

We analyzed this package on Feb 12, 2020, and provided a score, details, and suggestions below. Analysis was completed with status completed using:

  • Dart: 2.7.1
  • pana: 0.13.5

Dependencies

Package     Constraint       Resolved  Available
Direct dependencies
Dart SDK    >=2.6.0 <3.0.0
meta        ^1.1.6           1.1.8
rxdart      ^0.23.1          0.23.1
Dev dependencies
mockito     ^3.0.0
pedantic    ^1.9.0
test        ^1.0.0