findPeers method

  1. @override
Future<Stream<AddrInfo>> findPeers(
  1. String ns, [
  2. List<DiscoveryOption> options = const []
])
override

Discovers peers providing a service

Implementation

@override
Future<Stream<AddrInfo>> findPeers(String ns, [List<DiscoveryOption> options = const []]) async {
  final opts = DiscoveryOptions().apply(options);
  final limit = opts.limit ?? 100; // default limit if not specified in options

  final cid = await nsToCid(ns);

  // Create a stream controller to handle the timeout
  final controller = StreamController<AddrInfo>();

  // Use a timeout for the initial provider lookup
  Future<void> findProvidersWithTimeout() async {
    try {
      // Start the provider lookup
      final providerStream = _router.findProvidersAsync(cid, limit);

      // Set up a timeout for the initial lookup
      final timeout = const Duration(seconds: 60);
      final timer = Timer(timeout, () {
        // If we reach the timeout, we'll close the controller
        // but we won't consider it an error - we just stop looking for more providers
        if (!controller.isClosed) {
          developer.log(
            'Timeout while finding providers in RoutingDiscovery.findPeers',
            name: 'dart_libp2p.discovery.routing',
            time: DateTime.now(),
          );
          controller.close();
        }
      });

      // Pipe the provider stream to our controller
      await for (final peer in providerStream) {
        if (controller.isClosed) break;
        controller.add(peer);
      }

      // Cancel the timer if we're done before the timeout
      timer.cancel();

      // Close the controller if it's not already closed
      if (!controller.isClosed) {
        await controller.close();
      }
    } catch (e) {
      // Log the error for monitoring and debugging
      developer.log(
        'Error in RoutingDiscovery.findPeers',
        name: 'dart_libp2p.discovery.routing',
        error: e,
        time: DateTime.now(),
      );

      if (!controller.isClosed) {
        controller.addError(e);
        await controller.close();
      }
    }
  }

  // Start the provider lookup process
  findProvidersWithTimeout();

  return controller.stream;
}