bind method
Transforms the byte array input stream into a new stream that contains a single HashDigest.
The expected behavior of this method is described below:
- When the returned stream has a subscriber (calling Stream.listen), the message-digest generation begins from the input stream.
- If the returned stream is paused, the processing of the input stream is also paused, and on resume, it continues processing from where it left off.
- If the returned stream is cancelled, the subscription to the input stream is also cancelled.
- When the input stream is closed, the returned stream also closes after emitting a HashDigest data event. The returned stream produces at most one such data event in its lifetime.
- On an error while reading the input stream or while processing the message digest, the subscription to the input stream is cancelled immediately and the returned stream closes with an error event.
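The contract above can be illustrated with a minimal, self-contained sketch. It does not use the library's own classes: `ByteSum` is a hypothetical stand-in transformer whose "digest" is just the sum of all bytes modulo 256, chosen only so the example runs with `dart:async` alone. It assumes the same lifecycle as described above: work starts on listen, pause, resume and cancel are forwarded, and exactly one data event is emitted when the input closes.

```dart
import 'dart:async';

/// Hypothetical stand-in for a hash algorithm (not part of the library):
/// the "digest" is the sum of all input bytes modulo 256.
class ByteSum extends StreamTransformerBase<List<int>, int> {
  const ByteSum();

  @override
  Stream<int> bind(Stream<List<int>> stream) {
    late StreamSubscription<List<int>> subscription;
    late StreamController<int> controller;
    var sum = 0;
    controller = StreamController<int>(
      // Reading the input starts only once the returned stream is listened to.
      onListen: () {
        subscription = stream.listen(
          (chunk) {
            for (final b in chunk) {
              sum = (sum + b) & 0xFF;
            }
          },
          onError: (Object err, StackTrace stack) {
            // Forward the error and close; no digest is emitted.
            controller.addError(err, stack);
            controller.close();
          },
          onDone: () {
            // The single data event is emitted when the input closes.
            controller.add(sum);
            controller.close();
          },
          cancelOnError: true,
        );
      },
      // Pause, resume and cancel are forwarded to the input subscription.
      onPause: () => subscription.pause(),
      onResume: () => subscription.resume(),
      onCancel: () => subscription.cancel(),
    );
    return controller.stream;
  }
}

Future<void> main() async {
  final chunks = Stream<List<int>>.fromIterable([[1, 2, 3], [4, 5]]);
  final events = await const ByteSum().bind(chunks).toList();
  print(events); // prints [15]
}
```

Collecting the output with `toList()` makes the "one data event" property visible: the list has a single element no matter how many chunks the input delivers.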
Implementation
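@override
Stream<HashDigest> bind(Stream<List<int>> stream) {
  bool paused = false;
  bool cancelled = false;
  StreamSubscription<List<int>>? subscription;
  final controller = StreamController<HashDigest>(sync: false);

  // Cancelling the returned stream also cancels the input subscription.
  controller.onCancel = () async {
    cancelled = true;
    await subscription?.cancel();
  };
  // Pause and resume are forwarded to the input subscription.
  controller.onPause = () {
    paused = true;
    subscription?.pause();
  };
  controller.onResume = () {
    paused = false;
    subscription?.resume();
  };
  // Digest generation starts only when the returned stream gets a listener.
  controller.onListen = () {
    if (cancelled) return;
    bool hasError = false;
    final sink = createSink();
    subscription = stream.listen(
      (List<int> event) {
        try {
          sink.add(event);
        } catch (err, stack) {
          // Processing failed: stop reading and close with an error event.
          hasError = true;
          subscription?.cancel();
          controller.addError(err, stack);
          controller.close();
        }
      },
      cancelOnError: true,
      onError: (Object err, [StackTrace? stack]) {
        // With cancelOnError, onDone is not called after an error,
        // so the controller is closed here.
        hasError = true;
        controller.addError(err, stack);
        controller.close();
      },
      onDone: () {
        try {
          if (!hasError) {
            // Emit the single digest event once the input is exhausted.
            controller.add(sink.digest());
          }
        } catch (err, stack) {
          controller.addError(err, stack);
        } finally {
          controller.close();
        }
      },
    );
    // Honour a pause requested before the subscription existed.
    if (paused) {
      subscription?.pause();
    }
  };
  return controller.stream;
}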