handleIdentifyResponse method
Implementation
/// Handles an identify (or identify-push when [isPush] is true) response
/// arriving on [stream].
///
/// Runs in three phases:
///  1. Attaches the stream's scope to [serviceName].
///  2. Reserves [signedIDSize] bytes on the stream's scope.
///  3. Reads the message via [_readAllIDMessages], hands it to
///     [_consumeMessage], reports metrics when a tracer is configured,
///     records the peer's push support on its `_conns` entry (if the entry
///     still exists), and finally closes the write side of the stream.
///
/// On failure in any phase the stream is reset and the original error is
/// rethrown with its stack trace intact. The reserved memory is released
/// once phase 3 has been entered, whether it succeeds or fails.
Future<void> handleIdentifyResponse(P2PStream stream, bool isPush) async {
  final peer = stream.conn.remotePeer;
  final side = stream.conn.localPeer == host.id ? "CLIENT" : "SERVER";
  // Only used to report how long we ran before a phase-3 failure.
  final handleStart = DateTime.now();

  // Phase 1: attach the stream to the identify service scope.
  try {
    await stream.scope().setService(serviceName);
  } catch (e) {
    _log.warning(' [HANDLE-IDENTIFY-RESPONSE-PHASE-1-ERROR] ($side) Error attaching stream to identify service for peer=$peer: $e. Resetting stream.');
    await stream.reset();
    // `rethrow` (rather than `throw e`) keeps the original stack trace.
    rethrow;
  }

  // Phase 2: reserve memory for the incoming message. Nothing is released
  // here on failure because the reservation itself did not succeed.
  try {
    await stream.scope().reserveMemory(signedIDSize, ReservationPriority.always);
  } catch (e) {
    _log.warning(' [HANDLE-IDENTIFY-RESPONSE-PHASE-2-ERROR] ($side) Error reserving memory for identify stream for peer=$peer: $e. Resetting stream.');
    await stream.reset();
    rethrow;
  }

  // Phase 3: read, consume and record the result.
  try {
    final conn = stream.conn;
    final mes = await _readAllIDMessages(stream);
    await _consumeMessage(mes, conn, isPush);

    metricsTracer?.identifyReceived(isPush, mes.protocols.length, mes.listenAddrs.length);

    await _connsMutex.synchronized(() async {
      final entry = _conns[conn];
      if (entry == null) {
        // The connection was dropped while we were processing the message.
        _log.fine('IdentifyService.handleIdentifyResponse ($side): Connection entry for $peer already removed (disconnected). Cannot update push support.');
        return;
      }

      _log.finer('IdentifyService.handleIdentifyResponse ($side): Checking push support for $peer in peerstore.');
      final sup = await host.peerStore.protoBook.supportsProtocols(conn.remotePeer, [idPush]);
      if (sup.isNotEmpty) {
        entry.pushSupport = IdentifyPushSupport.supported;
        _log.fine('IdentifyService.handleIdentifyResponse ($side): Peer $peer supports push.');
      } else {
        entry.pushSupport = IdentifyPushSupport.unsupported;
        _log.fine('IdentifyService.handleIdentifyResponse ($side): Peer $peer does not support push.');
      }

      metricsTracer?.connPushSupport(entry.pushSupport);
    });

    await stream.closeWrite();
  } catch (e, st) {
    final errorTime = DateTime.now().difference(handleStart);
    _log.severe(' [HANDLE-IDENTIFY-RESPONSE-ERROR] ($side) Error reading or processing identify message from peer=$peer, duration=${errorTime.inMilliseconds}ms, error=$e\n$st');
    await stream.reset();
    rethrow;
  } finally {
    // Pairs with the phase-2 reservation.
    stream.scope().releaseMemory(signedIDSize);
  }
}