handleSingleConnection method

Future<void> handleSingleConnection(
  1. Side thisSide
)

Add a Side with optional SocketAuthVerifier and DataTransformer

  • If socketAuthVerifier provided, wait for socket to be authenticated
  • All data from the corresponding 'far' side will be transformed by the transformer, if one is supplied. For example, socketToSocket creates Sides A and B, and has parameters transformAtoB and transformBtoA.

Implementation

/// Registers [thisSide] with this connector and, once a side is waiting on
/// the opposite end, pairs the two and starts relaying data between them.
///
/// If [thisSide] carries a `socketAuthVerifier`, the verifier is awaited
/// (bounded by `authTimeout`) before the side is accepted; a side without a
/// verifier is treated as authenticated immediately. A side that fails to
/// authenticate - because the verifier reports failure, throws, or times
/// out - is logged, closed, and never queued.
///
/// An authenticated side is queued in `pendingA` or `pendingB`. Whenever both
/// queues are non-empty, the oldest entry of each is joined in a new
/// [Connection] and listeners are installed on both sides so that data read
/// from one side is written to the other, optionally passing through the
/// originating side's `transformer`.
///
/// Throws a [StateError] if the connector is already closed.
Future<void> handleSingleConnection(final Side thisSide) async {
  if (closed) {
    throw StateError('Connector is closed');
  }

  // However the socket ends (cleanly or with an error), close this side.
  unawaited(thisSide.socket.done
      .then((_) => _closeSide(thisSide))
      .catchError((_) => _closeSide(thisSide)));

  final verifier = thisSide.socketAuthVerifier;
  if (verifier == null) {
    // Nothing to verify: accept the side as-is.
    thisSide.authenticated = true;
  } else {
    try {
      final (bool verified, Stream<Uint8List>? verifiedStream) =
          await verifier(thisSide.socket).timeout(authTimeout);
      thisSide.authenticated = verified;
      if (thisSide.authenticated) {
        // From here on the side reads from the stream the verifier returned.
        thisSide.stream = verifiedStream!;
        _log('Authentication succeeded on side ${thisSide.name}');
      }
    } catch (e) {
      // Any failure while verifying (including a timeout) counts as
      // "not authenticated".
      thisSide.authenticated = false;
      _log('Error while authenticating side ${thisSide.name} : $e',
          force: true);
    }
  }

  if (!thisSide.authenticated) {
    _log('Authentication failed on side ${thisSide.name}', force: true);
    _closeSide(thisSide);
    return;
  }

  // Queue the side on its own end; stop here if no partner is waiting yet.
  final queue = thisSide.isSideA ? pendingA : pendingB;
  queue.add(thisSide);
  if (pendingA.isEmpty || pendingB.isEmpty) {
    return;
  }

  // Pair the longest-waiting side of each end.
  final nextA = pendingA.removeAt(0);
  final nextB = pendingB.removeAt(0);
  connections.add(Connection(nextA, nextB));
  _log(chalk.brightBlue(
      'Added connection. There are now ${connections.length} connections.'));

  // Presumably Connection links the two sides via `farSide` - the code below
  // relies on `thisSide.farSide` being set at this point.
  for (final side in [thisSide, thisSide.farSide!]) {
    // `side.farSide` is deliberately looked up at each use inside the
    // callbacks below instead of being cached when the listeners are set up.

    // Closes the far side once this side is closed and the far side has been
    // sent as many bytes as this side received.
    void closeFarSideIfDrained() {
      if (side.state == SideState.closed &&
          side.rcvd == side.farSide!.sent) {
        _closeSide(side.farSide!);
      }
    }

    // Logs a failed write towards the far side and closes that side.
    void closeFarSideAfterWriteFailure(Object error, StackTrace stackTrace) {
      _log('Failed to write to side ${side.farSide!.name} - closing',
          force: true);
      _log('(Error was $error; Stack trace follows\n$stackTrace',
          force: true);
      _closeSide(side.farSide!);
    }

    final transform = side.transformer;
    if (transform != null) {
      // A side's transformer processes the data that originates from that
      // side. The far side's sink becomes the transformer's input, and the
      // transformer's output is what gets written to the far side's socket.
      final controller = StreamController<Uint8List>();
      side.farSide!.sink = controller;
      final Stream<List<int>> output = transform(controller.stream);
      output.listen(
        (data) {
          try {
            side.farSide!.socket.add(data);
            side.farSide!.sent += data.length;
            // NOTE(review): `rcvd` counts bytes before transformation while
            // `sent` counts bytes after it, so this only matches when the
            // transformer preserves length - confirm that is intended.
            closeFarSideIfDrained();
          } catch (e, st) {
            closeFarSideAfterWriteFailure(e, st);
          }
        },
        onDone: () => _closeSide(side),
        onError: (_) => _closeSide(side),
      );
    }

    side.stream.listen((Uint8List data) {
      side.rcvd += data.length;

      if (logTraffic) {
        // Mask control and non-ASCII characters so the log stays readable.
        final printable = String.fromCharCodes(data)
            .replaceAll(RegExp('[\x00-\x1F\x7F-\xFF]'), '*');
        _log(side.isSideA
            ? chalk.brightGreen('A -> B : $printable')
            : chalk.brightRed('B -> A : $printable'));
      }

      try {
        side.farSide!.sink.add(data);
        // Bytes are counted here only when written straight to the socket;
        // with a transformer in between, the transformer's output listener
        // does the counting instead.
        if (side.farSide!.sink is Socket) {
          side.farSide!.sent += data.length;
          closeFarSideIfDrained();
        }
      } catch (e, st) {
        closeFarSideAfterWriteFailure(e, st);
      }
    }, onDone: () async {
      _log('${side.stream.runtimeType}.onDone on side ${side.name}');
      _closeSide(side);
    }, onError: (error) {
      _log('${side.stream.runtimeType}.onError on side ${side.name}: $error',
          force: true);
      _closeSide(side);
    });
  }
}