transform<S, T> method

  1. @override
Stream<T> transform<S, T>(
  1. Stream<S> data,
  2. Stream<T> mapper(
    1. Stream<S> e
    ), {
  3. String workerName = '',
})
override

异步执行Stream转换, mapper 在异步执行的方法,最好是顶层函数,否则可能因传入了无法序列化的对象导致失败,好在这种情况不论该对象是否有被使用都会崩溃, workerName 要支持web端真异步必须指定一个worker独立js文件在其中调用了传入的mapper, 参考/worker/worker.dart,js文件名'worker.js'对应参数workerName就是'worker',

Implementation

@override
Stream<T> transform<S, T>(
    Stream<S> data, Stream<T> Function(Stream<S> e) mapper,
    {String workerName = ''}) async* {
  if (workerName.isEmpty || !Worker.supported) {
    yield* mapper(data);
    return;
  }
  final worker = Worker('$workerName.js');
  _cache.add(worker);
  data.listen(
    (event) {
      worker.postMessage(event);
    },
    onDone: () {
      worker.postMessage(const IsolateStreamDone().toJson());
    },
  );
  await for (var event in worker.onMessage) {
    final data = event.data;
    // 这里是异步线程内抛出的异常,
    if (data is Map) {
      // 魔法代码,不知道为什么是o,
      final o = data['o'];
      if (o is Map && o['type'] == 'IsolateException') {
        final exception =
            IsolateException.fromJson(Map<String, dynamic>.from(o));
        _kill(worker);
        yield* Stream.error(exception.error, exception.stackTrace);
        return;
      }
      if (o is Map && o['type'] == 'IsolateStreamDone') {
        _kill(worker);
        return;
      }
      if (o != null) {
        yield o as T;
        continue;
      }
    }
    if (data is T) {
      yield data;
    }
  }
}