transform<S, T> method
异步执行Stream转换,
mapper
在异步执行的方法,最好是顶层函数,否则可能因传入了无法序列化的对象导致失败,好在这种情况不论该对象是否有被使用都会崩溃,
workerName
要支持web端真异步必须指定一个worker独立js文件在其中调用了传入的mapper, 参考/worker/worker.dart,js文件名'worker.js'对应参数workerName就是'worker',
Implementation
@override
Stream<T> transform<S, T>(
Stream<S> data, Stream<T> Function(Stream<S> e) mapper,
{String workerName = ''}) async* {
if (const bool.fromEnvironment('dart.library.js_util')) {
yield* mapper(data);
return;
}
final mainReceive = ReceivePort();
final isolate = await Isolate.spawn(_entry(mapper), mainReceive.sendPort,
// 这里注册监听异步线程内抛出的异常,
onError: mainReceive.sendPort);
_cache.add(isolate);
await for (var message in mainReceive) {
if (message is SendPort) {
data.listen((event) {
// 这里无阻塞,所以数据会渊源不断的读取,不管能不能处理完,
message.send(event);
}, onDone: () {
// 这里不能直接把mainReceive给close掉,以防万一异步数据还没处理完,输入的数据就已经非阻塞读取完了,
message.send(const IsolateStreamDone());
});
continue;
}
if (message is IsolateStreamDone) {
mainReceive.close();
_kill(isolate);
return;
}
// 这里是异步线程内抛出的异常,
if (message is IsolateException) {
mainReceive.close();
_kill(isolate);
yield* Stream.error(message.error, message.stackTrace);
return;
}
yield message as T;
}
// unreachable,
_kill(isolate);
}