transform<S, T> method

  1. @override
Stream<T> transform<S, T>(
  1. Stream<S> data,
  2. Stream<T> mapper(
    1. Stream<S> e
    ), {
  3. String workerName = '',
})
override

异步执行Stream转换, mapper 在异步执行的方法,最好是顶层函数,否则可能因传入了无法序列化的对象导致失败,好在这种情况不论该对象是否有被使用都会崩溃, workerName 要支持web端真异步必须指定一个worker独立js文件在其中调用了传入的mapper, 参考/worker/worker.dart,js文件名'worker.js'对应参数workerName就是'worker',

Implementation

@override
Stream<T> transform<S, T>(
    Stream<S> data, Stream<T> Function(Stream<S> e) mapper,
    {String workerName = ''}) async* {
  if (const bool.fromEnvironment('dart.library.js_util')) {
    yield* mapper(data);
    return;
  }
  final mainReceive = ReceivePort();
  final isolate = await Isolate.spawn(_entry(mapper), mainReceive.sendPort,
      // 这里注册监听异步线程内抛出的异常,
      onError: mainReceive.sendPort);

  _cache.add(isolate);
  await for (var message in mainReceive) {
    if (message is SendPort) {
      data.listen((event) {
        // 这里无阻塞,所以数据会渊源不断的读取,不管能不能处理完,
        message.send(event);
      }, onDone: () {
        // 这里不能直接把mainReceive给close掉,以防万一异步数据还没处理完,输入的数据就已经非阻塞读取完了,
        message.send(const IsolateStreamDone());
      });
      continue;
    }
    if (message is IsolateStreamDone) {
      mainReceive.close();
      _kill(isolate);
      return;
    }
    // 这里是异步线程内抛出的异常,
    if (message is IsolateException) {
      mainReceive.close();
      _kill(isolate);
      yield* Stream.error(message.error, message.stackTrace);
      return;
    }
    yield message as T;
  }
  // unreachable,
  _kill(isolate);
}