/// Returns a stream backed by a newly created [BehaviorSubject].
///
/// Every call builds a separate subject, appends it to `_streams`, and
/// hands back that subject's `stream`.
/// NOTE(review): presumably the owning class uses `_streams` to push values
/// to, and eventually close, every subject created here — confirm against
/// the rest of the class.
@override
Stream<T> asStream() {
  final StreamController<T> subject = BehaviorSubject<T>();
  // Register the subject before exposing its stream to the caller.
  _streams.add(subject);
  return subject.stream;
}