"changes" is not a great name since, by contract, the stream includes the initial, not yet changed value. maybe rename it to "distinctValues()" ?
@override Stream<T> changes() => fv.changes();