findPeers method
Future<Stream<AddrInfo> >
findPeers(
- String ns, [
- List<
DiscoveryOption> options = const []
override
Discovers peers providing a service
Implementation
@override
Future<Stream<AddrInfo>> findPeers(String ns, [List<DiscoveryOption> options = const []]) async {
final opts = DiscoveryOptions().apply(options);
final limit = opts.limit ?? 100; // default limit if not specified in options
final cid = await nsToCid(ns);
// Create a stream controller to handle the timeout
final controller = StreamController<AddrInfo>();
// Use a timeout for the initial provider lookup
Future<void> findProvidersWithTimeout() async {
try {
// Start the provider lookup
final providerStream = _router.findProvidersAsync(cid, limit);
// Set up a timeout for the initial lookup
final timeout = const Duration(seconds: 60);
final timer = Timer(timeout, () {
// If we reach the timeout, we'll close the controller
// but we won't consider it an error - we just stop looking for more providers
if (!controller.isClosed) {
developer.log(
'Timeout while finding providers in RoutingDiscovery.findPeers',
name: 'dart_libp2p.discovery.routing',
time: DateTime.now(),
);
controller.close();
}
});
// Pipe the provider stream to our controller
await for (final peer in providerStream) {
if (controller.isClosed) break;
controller.add(peer);
}
// Cancel the timer if we're done before the timeout
timer.cancel();
// Close the controller if it's not already closed
if (!controller.isClosed) {
await controller.close();
}
} catch (e) {
// Log the error for monitoring and debugging
developer.log(
'Error in RoutingDiscovery.findPeers',
name: 'dart_libp2p.discovery.routing',
error: e,
time: DateTime.now(),
);
if (!controller.isClosed) {
controller.addError(e);
await controller.close();
}
}
}
// Start the provider lookup process
findProvidersWithTimeout();
return controller.stream;
}