channel<T> static method

MpscChannel<T> channel<T>({
  1. int? capacity,
  2. DropPolicy policy = DropPolicy.block,
  3. OnDrop<T>? onDrop,
  4. bool chunked = true,
  5. String? metricsId,
})

Implementation

static MpscChannel<T> channel<T>({
  int? capacity,
  DropPolicy policy = DropPolicy.block,
  OnDrop<T>? onDrop,
  bool chunked = true,
  String? metricsId,
}) {
  final inner = capacity == null
      ? chunked
          ? ChunkedBuffer<T>()
          : UnboundedBuffer<T>()
      : (capacity == 0)
          ? RendezvousBuffer<T>()
          : BoundedBuffer<T>(capacity: capacity);
  final bool usePolicy =
      capacity != null && capacity > 0 && policy != DropPolicy.block;
  final ChannelBuffer<T> buf = usePolicy
      ? PolicyBufferWrapper<T>(inner, policy: policy, onDrop: onDrop)
      : inner;
  final core = StandardChannelCore<T>(
    buf,
    allowMultiSenders: true,
    allowMultiReceivers: false,
    metricsId: metricsId,
  );
  final tx = core.attachSender((c) =>
      MpscSender<T>._(c.id, c.createRemotePort(), metricsId: c.metricsId));
  final rx = core.attachReceiver((c) =>
      MpscReceiver<T>._(c.id, c.createRemotePort(), metricsId: c.metricsId));
  return (tx, rx);
}