findPeers method

  1. @override
Future<Stream<AddrInfo>> findPeers(
  1. String ns, [
  2. List<DiscoveryOption> options = const []
])
override

Discovers peers providing a service

Implementation

@override
Future<Stream<AddrInfo>> findPeers(String ns, [List<DiscoveryOption> options = const []]) async {
  // Get options
  final opts = DiscoveryOptions().apply(options);

  // Get cached peers
  var c = _peerCache[ns];

  /*
    Overall plan:
    If it's time to look for peers, look for peers, then return them
    If it's not time then return cache
    If it's time to look for peers, but we have already started looking. Get up to speed with ongoing request
  */

  // Setup cache if we don't have one yet
  if (c == null) {
    final pc = BackoffCache(
      _stratFactory(),
      _clock,
    );

    c = _peerCache.putIfAbsent(ns, () => pc);
  }

  final timeExpired = _clock.now().isAfter(c.nextDiscover);

  // If it's not yet time to search again and no searches are in progress then return cached peers
  if (!(timeExpired || c.ongoing)) {
    var chLen = opts.limit ?? c.prevPeers.length;

    if (chLen > c.prevPeers.length) {
      chLen = c.prevPeers.length;
    }

    final controller = StreamController<AddrInfo>(sync: true);

    for (final ai in c.prevPeers.values) {
      if (controller.isClosed || (opts.limit != null && controller.hasListener && controller.sink is StreamSink<AddrInfo> && (controller.sink as dynamic).count >= opts.limit!)) {
        break;
      }
      controller.add(ai);
    }

    controller.close();
    return controller.stream;
  }

  // If a request is not already in progress setup a dispatcher for dispatching incoming peers
  if (!c.ongoing) {
    final peerStream = await _disc.findPeers(ns, options);

    c.ongoing = true;
    findPeerDispatcher(c, peerStream);
  }

  // Setup receiver channel for receiving peers from ongoing requests
  final evtController = StreamController<AddrInfo>();
  final peerController = StreamController<AddrInfo>();
  final rcvPeers = c.peers.values.toList();
  c.sendingChs[evtController] = opts.limit;

  findPeerReceiver(peerController, evtController.stream, rcvPeers);

  return peerController.stream;
}