handleSingleConnection method

Future<void> handleSingleConnection(
  1. Side thisSide
)

Add a Side with optional SocketAuthVerifier and DataTransformer

  • If socketAuthVerifier provided, wait for socket to be authenticated
  • All data from the corresponding 'far' side will be transformed by the transformer if supplied. For example: socketToSocket creates Sides A and B, and has parameters transformAtoB and transformBtoA.

Implementation

/// Authenticates [thisSide] if it has a verifier, queues it, and joins it to
/// a pending side of the opposite kind as soon as one is available.
///
/// When `thisSide.socketAuthVerifier` is set it is invoked with the side's
/// socket and must complete within `authTimeout`; a `false` result, a
/// timeout, or any thrown error is logged and the side is destroyed. On
/// success the stream returned by the verifier replaces `thisSide.stream`.
///
/// Once an A side and a B side are both pending they are removed from the
/// pending lists, wrapped in a `Connection`, and wired together: data read
/// from a side's stream is added to the far side's sink. If a side has a
/// transformer, the far side's sink is replaced by a controller whose stream
/// is passed through that transformer before being written to the far side's
/// socket.
///
/// Throws a [StateError] if the connector is already closed when called. If
/// the connector is closed while authentication is in progress, the side is
/// destroyed and the method returns without queueing it.
Future<void> handleSingleConnection(final Side thisSide) async {
  if (closed) {
    throw StateError('Connector is closed');
  }

  // Copy to a local so the nullable field promotes without needing `!`.
  final verifier = thisSide.socketAuthVerifier;
  if (verifier == null) {
    thisSide.authenticated = true;
  } else {
    try {
      final (bool authenticated, Stream<Uint8List>? stream) =
          await verifier(thisSide.socket).timeout(authTimeout);
      thisSide.authenticated = authenticated;
      if (thisSide.authenticated) {
        // A null stream on success throws here and is treated as an
        // authentication failure by the catch below.
        thisSide.stream = stream!;
        _log('Authentication succeeded on side ${thisSide.name}');
      }
    } catch (e) {
      thisSide.authenticated = false;
      _log('Error while authenticating side ${thisSide.name} : $e',
          force: true);
    }
  }
  if (!thisSide.authenticated) {
    _log('Authentication failed on side ${thisSide.name}', force: true);
    _destroySide(thisSide);
    return;
  }

  // Authentication is awaited, so the connector may have been closed in the
  // meantime. Do not queue or wire up a side on a closed connector.
  if (closed) {
    _log('Connector was closed while authenticating side ${thisSide.name}',
        force: true);
    _destroySide(thisSide);
    return;
  }

  if (thisSide.isSideA) {
    pendingA.add(thisSide);
  } else {
    pendingB.add(thisSide);
  }

  if (pendingA.isEmpty || pendingB.isEmpty) {
    // Nothing to pair with yet; the side stays pending.
    return;
  }

  final connection = Connection(pendingA.removeAt(0), pendingB.removeAt(0));
  connections.add(connection);
  _log(chalk.brightBlue(
      'Added connection. There are now ${connections.length} connections.'));

  // Used only for traffic logging: masks control and non-ASCII code units.
  final unprintable = RegExp('[\x00-\x1F\x7F-\xFF]');

  for (final side in [thisSide, thisSide.farSide!]) {
    // Runs [write] against the far side if it is still open; any failure
    // (including a far side that is no longer open) is logged and the far
    // side is destroyed.
    void deliver(void Function(Side far) write) {
      final far = side.farSide!;
      try {
        if (far.state != SideState.open) {
          throw StateError('Will not write to side ${far.name} '
              'as its state is ${far.state}');
        }
        write(far);
      } catch (e, st) {
        _log('Failed to write to side ${far.name} - closing', force: true);
        _log('(Error was $e; Stack trace follows\n$st');
        _destroySide(far);
      }
    }

    // Tear the side down when its socket completes, normally or with error.
    unawaited(side.socket.done
        .then((v) => _destroySide(side))
        .catchError((err) => _destroySide(side)));

    final transformer = side.transformer;
    if (transformer != null) {
      // transformer is there to transform data originating FROM its side:
      // the far side's sink becomes this controller, and the transformed
      // output is what actually reaches the far side's socket.
      final controller = StreamController<Uint8List>();
      side.farSide!.sink = controller;
      transformer(controller.stream).listen(
        (data) => deliver((far) => far.socket.add(data)),
        // Without a handler, a transformer error would surface as an
        // uncaught asynchronous error.
        onError: (error) {
          _log('transformer.onError on side ${side.name}: $error',
              force: true);
          _destroySide(side);
        },
      );
    }

    side.stream.listen((Uint8List data) {
      if (logTraffic) {
        final text = String.fromCharCodes(data).replaceAll(unprintable, '*');
        _log(side.isSideA
            ? chalk.brightGreen('A -> B : $text')
            : chalk.brightRed('B -> A : $text'));
      }
      deliver((far) => far.sink.add(data));
    }, onDone: () {
      _log('stream.onDone on side ${side.name}');
      _destroySide(side);
    }, onError: (error) {
      _log('stream.onError on side ${side.name}: $error', force: true);
      _destroySide(side);
    });
  }
}