findPeers method
Future<Stream<AddrInfo>> findPeers(
  String ns, [
  List<DiscoveryOption> options = const [],
])
override
Discovers peers providing the service advertised under the namespace `ns`.
Implementation
/// Discovers peers advertising under the namespace [ns], consulting a
/// per-namespace backoff cache before hitting the underlying discovery.
///
/// Behaviour, in order:
///
/// * If the backoff window for [ns] has not expired and no discovery is in
///   flight, the returned stream replays the peers cached from the previous
///   discovery (at most `limit` of them when [options] set a limit) and
///   then closes.
/// * Otherwise, if no discovery is in flight, one is started on the
///   underlying discovery and handed to [findPeerDispatcher].
/// * In both remaining cases the caller is registered as a receiver of the
///   in-flight discovery and gets a stream fed by [findPeerReceiver], seeded
///   with the peers that discovery has already produced.
///
/// Any error thrown by the underlying discovery's `findPeers` is rethrown
/// after the in-flight marker has been rolled back.
@override
Future<Stream<AddrInfo>> findPeers(String ns, [List<DiscoveryOption> options = const []]) async {
  final opts = DiscoveryOptions().apply(options);

  // Fetch the cache for this namespace, creating it on first use. The backoff
  // strategy is only instantiated when a new cache entry is actually needed.
  final c = _peerCache.putIfAbsent(
    ns,
    () => BackoffCache(
      _stratFactory(),
      _clock,
    ),
  );

  final timeExpired = _clock.now().isAfter(c.nextDiscover);

  // Not yet time to search again and nothing in flight: serve from the cache.
  if (!timeExpired && !c.ongoing) {
    final limit = opts.limit;
    final controller = StreamController<AddrInfo>(sync: true);
    // Events added before anyone listens are buffered by the controller, so
    // the limit has to be enforced here with an explicit counter; nothing
    // about the (not yet attached) listener can be consulted at this point.
    var sent = 0;
    for (final ai in c.prevPeers.values) {
      if (limit != null && sent >= limit) {
        break;
      }
      controller.add(ai);
      sent++;
    }
    controller.close();
    return controller.stream;
  }

  // No request in progress: start one and set up a dispatcher for the
  // incoming peers.
  if (!c.ongoing) {
    // Mark the discovery as in flight BEFORE awaiting. Otherwise a second
    // call for the same namespace that arrives while the await is pending
    // would also see `ongoing == false` and start a duplicate discovery.
    c.ongoing = true;
    try {
      final peerStream = await _disc.findPeers(ns, options);
      findPeerDispatcher(c, peerStream);
    } catch (_) {
      // Roll back so a later call can retry, and release any receivers that
      // registered while the failed request was pending; no dispatcher will
      // ever feed or close them.
      c.ongoing = false;
      for (final waiting in c.sendingChs.keys.toList()) {
        waiting.close();
      }
      c.sendingChs.clear();
      rethrow;
    }
  }

  // Register a receiver for the in-flight request. `evtController` is the
  // channel registered in `sendingChs` (with this caller's limit);
  // `peerController` is what the caller actually listens to.
  final evtController = StreamController<AddrInfo>();
  final peerController = StreamController<AddrInfo>();
  // Snapshot of the peers found so far by the in-flight request.
  final rcvPeers = c.peers.values.toList();
  c.sendingChs[evtController] = opts.limit;
  findPeerReceiver(peerController, evtController.stream, rcvPeers);
  return peerController.stream;
}