handleSingleConnection method
Adds a Side with an optional SocketAuthVerifier and DataTransformer.
- If socketAuthVerifier is provided, waits for the socket to be authenticated.
- If transformer is supplied, all data originating from this side is passed through it before being written to the corresponding 'far' side. For example: socketToSocket creates Sides A and B, and has the parameters transformAtoB and transformBtoA.
Implementation
/// Authenticates [thisSide] if required, queues it, and pairs it with a
/// pending side of the opposite kind when one is available.
///
/// - When `thisSide.socketAuthVerifier` is null the side is marked as
///   authenticated straight away. Otherwise the verifier is awaited for at
///   most [authTimeout]; on success the stream it returns replaces
///   `thisSide.stream`. Any exception raised while authenticating (including
///   a timeout) is logged and treated as an authentication failure.
/// - A side that fails authentication is destroyed and nothing else happens.
/// - An authenticated side is appended to [pendingA] or [pendingB]. If both
///   queues are then non-empty, the first entry of each is combined into a
///   [Connection] and data is relayed in both directions.
/// - When a side has a `transformer`, data read from that side is passed
///   through it before being written to the far side's socket.
///
/// Throws a [StateError] if the connector is already closed.
Future<void> handleSingleConnection(final Side thisSide) async {
  if (closed) {
    throw StateError('Connector is closed');
  }

  if (thisSide.socketAuthVerifier == null) {
    thisSide.authenticated = true;
  } else {
    bool authenticated;
    Stream<Uint8List>? stream;
    try {
      (authenticated, stream) = await thisSide.socketAuthVerifier!
              (thisSide.socket)
          .timeout(authTimeout);
      thisSide.authenticated = authenticated;
      if (thisSide.authenticated) {
        // From here on, read from the stream handed back by the verifier
        // rather than whatever stream the side had before.
        thisSide.stream = stream!;
        _log('Authentication succeeded on side ${thisSide.name}');
      }
    } catch (e) {
      thisSide.authenticated = false;
      _log('Error while authenticating side ${thisSide.name} : $e',
          force: true);
    }
  }

  if (!thisSide.authenticated) {
    _log('Authentication failed on side ${thisSide.name}', force: true);
    _destroySide(thisSide);
    return;
  }

  if (thisSide.isSideA) {
    pendingA.add(thisSide);
  } else {
    pendingB.add(thisSide);
  }

  // Nothing to pair with yet; this side stays queued until a partner arrives.
  if (pendingA.isEmpty || pendingB.isEmpty) {
    return;
  }

  // NOTE(review): the loop below relies on `farSide` being set on both sides
  // once the Connection exists - presumably done by the Connection
  // constructor; confirm.
  connections.add(Connection(pendingA.removeAt(0), pendingB.removeAt(0)));
  _log(chalk.brightBlue(
      'Added connection. There are now ${connections.length} connections.'));

  // Built once instead of on every data event; only used for traffic logging.
  final unprintable = RegExp('[\x00-\x1F\x7F-\xFF]');

  // Runs [write] against the far side of [side] while that far side is open.
  // A closed far side, or a failing write, is logged and the far side is
  // destroyed.
  void writeToFarSide(Side side, void Function(Side farSide) write) {
    final farSide = side.farSide!;
    try {
      if (farSide.state == SideState.open) {
        write(farSide);
      } else {
        throw StateError(
            'Will not write to side ${farSide.name} as its state is ${farSide.state}');
      }
    } catch (e, st) {
      _log('Failed to write to side ${farSide.name} - closing', force: true);
      _log('(Error was $e; Stack trace follows\n$st');
      _destroySide(farSide);
    }
  }

  for (final side in [thisSide, thisSide.farSide!]) {
    // Tear the side down once its socket completes, normally or with error.
    unawaited(side.socket.done
        .then((v) => _destroySide(side))
        .catchError((err) => _destroySide(side)));

    if (side.transformer != null) {
      // transformer is there to transform data originating FROM its side:
      // raw data is pushed into the controller (installed as the far side's
      // sink) and the transformed output is written to the far side's socket.
      StreamController<Uint8List> sc = StreamController<Uint8List>();
      side.farSide!.sink = sc;
      Stream<List<int>> transformed = side.transformer!(sc.stream);
      transformed.listen(
        (data) => writeToFarSide(side, (farSide) => farSide.socket.add(data)),
        // Without an error handler a failing transformer would surface as an
        // unhandled error in the current zone; treat it like an error on the
        // side's own stream instead.
        onError: (error) {
          _log('transformer error on side ${side.name}: $error', force: true);
          _destroySide(side);
        },
      );
    }

    side.stream.listen((Uint8List data) {
      if (logTraffic) {
        final printable =
            String.fromCharCodes(data).replaceAll(unprintable, '*');
        _log(side.isSideA
            ? chalk.brightGreen('A -> B : $printable')
            : chalk.brightRed('B -> A : $printable'));
      }
      writeToFarSide(side, (farSide) => farSide.sink.add(data));
    }, onDone: () {
      _log('stream.onDone on side ${side.name}');
      _destroySide(side);
    }, onError: (error) {
      _log('stream.onError on side ${side.name}: $error', force: true);
      _destroySide(side);
    });
  }
}