handleSingleConnection method
Add a Side with optional SocketAuthVerifier and DataTransformer
- If socketAuthVerifier is provided, wait for the socket to be authenticated.
- All data from the corresponding 'far' side will be transformed by the transformer, if supplied.
For example: socketToSocket creates Sides A and B, and has parameters transformAtoB and transformBtoA.
Implementation
/// Registers [thisSide] with this connector, authenticating it first when it
/// carries a socket auth verifier, and pairs it with a pending side of the
/// opposite kind as soon as one is available.
///
/// Steps, in order:
/// 1. Arranges for [thisSide] to be closed when its socket's `done` future
///    completes, whether normally or with an error.
/// 2. If `thisSide.socketAuthVerifier` is null the side is marked
///    authenticated immediately. Otherwise the verifier is awaited (bounded by
///    `authTimeout`); on success the stream it returns replaces
///    `thisSide.stream`. Any exception, including a timeout, is logged and
///    treated as an authentication failure.
/// 3. A side which failed authentication is closed and the method returns.
/// 4. The side is appended to `pendingA` or `pendingB` according to
///    `isSideA`. When both queues are non-empty, the oldest entry of each is
///    removed and joined in a new `Connection`, and data forwarding is wired
///    up in both directions, via each side's transformer where one is set.
///
/// Throws a [StateError] if the connector is already closed on entry. If the
/// connector becomes closed while authentication is in progress, the side is
/// closed and the method returns without queueing it.
Future<void> handleSingleConnection(final Side thisSide) async {
  if (closed) {
    throw StateError('Connector is closed');
  }

  // Tear the side down whenever its socket finishes, cleanly or with an error.
  unawaited(thisSide.socket.done
      .then((v) => _closeSide(thisSide))
      .catchError((err) => _closeSide(thisSide)));

  final verifier = thisSide.socketAuthVerifier;
  if (verifier == null) {
    thisSide.authenticated = true;
  } else {
    bool authenticated;
    Stream<Uint8List>? stream;
    try {
      (authenticated, stream) =
          await verifier(thisSide.socket).timeout(authTimeout);
      thisSide.authenticated = authenticated;
      if (thisSide.authenticated) {
        // A null stream here throws and is handled below as an auth failure.
        thisSide.stream = stream!;
        _log('Authentication succeeded on side ${thisSide.name}');
      }
    } catch (e) {
      thisSide.authenticated = false;
      _log('Error while authenticating side ${thisSide.name} : $e',
          force: true);
    }
  }

  if (!thisSide.authenticated) {
    _log('Authentication failed on side ${thisSide.name}', force: true);
    _closeSide(thisSide);
    return;
  }

  // The connector may have been closed while authentication was awaited.
  // Queueing the side now would leave its socket dangling on a closed
  // connector, so close it instead.
  if (closed) {
    _log('Connector was closed while authenticating side ${thisSide.name}',
        force: true);
    _closeSide(thisSide);
    return;
  }

  if (thisSide.isSideA) {
    pendingA.add(thisSide);
  } else {
    pendingB.add(thisSide);
  }

  if (pendingA.isEmpty || pendingB.isEmpty) {
    // Nothing to pair with yet; this side waits in its queue.
    return;
  }

  final connection = Connection(pendingA.removeAt(0), pendingB.removeAt(0));
  connections.add(connection);
  _log(chalk.brightBlue(
      'Added connection. There are now ${connections.length} connections.'));

  // Used only for traffic logging: masks control and high bytes with '*'.
  final unprintable = RegExp('[\x00-\x1F\x7F-\xFF]');

  // NOTE(review): this relies on thisSide being one of the two sides just
  // paired and on farSide having been populated (presumably by the Connection
  // constructor) - confirm against Connection.
  for (final side in [thisSide, thisSide.farSide!]) {
    if (side.transformer != null) {
      // transformer is there to transform data originating FROM its side
      // transformer's output will write to the SOCKET on the far side
      final controller = StreamController<Uint8List>();
      side.farSide!.sink = controller;
      final Stream<List<int>> transformed =
          side.transformer!(controller.stream);
      transformed.listen(
        (data) {
          try {
            side.farSide!.socket.add(data);
            side.farSide!.sent += data.length;
            // Once this side has closed and everything it received has been
            // written out, the far side can be closed too.
            if (side.state == SideState.closed &&
                side.rcvd == side.farSide!.sent) {
              _closeSide(side.farSide!);
            }
          } catch (e, st) {
            _log('Failed to write to side ${side.farSide!.name} - closing',
                force: true);
            _log('(Error was $e; Stack trace follows\n$st', force: true);
            _closeSide(side.farSide!);
          }
        },
        onDone: () => _closeSide(side),
        onError: (error) => _closeSide(side),
      );
    }

    side.stream.listen(
      (Uint8List data) {
        side.rcvd += data.length;
        if (logTraffic) {
          final printable =
              String.fromCharCodes(data).replaceAll(unprintable, '*');
          if (side.isSideA) {
            _log(chalk.brightGreen('A -> B : $printable'));
          } else {
            _log(chalk.brightRed('B -> A : $printable'));
          }
        }
        try {
          side.farSide!.sink.add(data);
          // Only count bytes as sent here when writing straight to the
          // socket; when a transformer is in between, its listener does the
          // accounting after the transformed data is written.
          if (side.farSide!.sink is Socket) {
            side.farSide!.sent += data.length;
            if (side.state == SideState.closed &&
                side.rcvd == side.farSide!.sent) {
              _closeSide(side.farSide!);
            }
          }
        } catch (e, st) {
          _log('Failed to write to side ${side.farSide!.name} - closing',
              force: true);
          _log('(Error was $e; Stack trace follows\n$st', force: true);
          _closeSide(side.farSide!);
        }
      },
      onDone: () {
        _log('${side.stream.runtimeType}.onDone on side ${side.name}');
        _closeSide(side);
      },
      onError: (error) {
        _log(
            '${side.stream.runtimeType}.onError on side ${side.name}: $error',
            force: true);
        _closeSide(side);
      },
    );
  }
}