serverToSocket static method
Future<SocketConnector>
serverToSocket({
- InternetAddress? addressA,
- int portA = 0,
- required InternetAddress addressB,
- required int portB,
- DataTransformer? transformAtoB,
- DataTransformer? transformBtoA,
- bool verbose = false,
- bool logTraffic = false,
- Duration timeout = SocketConnector.defaultTimeout,
- IOSink? logger,
- bool multi = false,
- @Deprecated("use beforeJoining instead") dynamic onConnect()?,
- dynamic beforeJoining()?,
- int backlog = 0,
- Creates a socket to `portB` on `addressB`
- Binds to `portA` on `addressA`
- Listens for a socket connection on the `portA` port and joins it to the 'B' side
- If `portA` is not provided then a port is chosen by the OS.
- `addressA` defaults to `InternetAddress.anyIPv4`.
- The `multi` flag controls whether or not to allow multiple connections to the bound server port `portA`.
- `onConnect` is called when `portA` has got a new connection and a corresponding outbound socket has been created to `addressB`:`portB` and the two have been joined together.
- `beforeJoining` is called when `portA` has got a new connection and a corresponding outbound socket has been created to `addressB`:`portB` but before they are joined together. This allows the code which called `serverToSocket` to take additional steps (such as setting new transformers rather than the ones which were provided initially).
Implementation
/// Binds a server socket on [addressA]:[portA] and, for every inbound
/// connection, opens an outbound socket to [addressB]:[portB] and hands both
/// sides to the returned [SocketConnector].
///
/// - [addressA] defaults to [InternetAddress.anyIPv4]; a [portA] of `0`
///   lets the OS choose the port.
/// - [transformAtoB] / [transformBtoA] are attached to the 'A' and 'B'
///   [Side]s respectively when they are created.
/// - [verbose], [logTraffic], [timeout] and [logger] are passed through to
///   the [SocketConnector]; log output goes to [logger], or `stderr` when it
///   is not supplied.
/// - When [multi] is `false` the server socket is closed as soon as the
///   first inbound connection arrives.
/// - [beforeJoining] is awaited after the 'B' socket has been created but
///   before the 'B' side is handed to the connector, so the caller can still
///   adjust the sides (for example replace their transformers).
/// - [onConnect] (deprecated) is invoked, without being awaited, after both
///   sides have been handed to the connector.
/// - [backlog] is forwarded to [ServerSocket.bind].
///
/// The returned future completes once the server socket is bound and the
/// listeners are registered; it does not wait for any connection. Errors
/// from binding propagate to the caller.
static Future<SocketConnector> serverToSocket({
  /// Defaults to [InternetAddress.anyIPv4]
  InternetAddress? addressA,
  int portA = 0,
  required InternetAddress addressB,
  required int portB,
  DataTransformer? transformAtoB,
  DataTransformer? transformBtoA,
  bool verbose = false,
  bool logTraffic = false,
  Duration timeout = SocketConnector.defaultTimeout,
  IOSink? logger,
  bool multi = false,
  @Deprecated("use beforeJoining instead")
  Function(Socket socketA, Socket socketB)? onConnect,
  Function(Side sideA, Side sideB)? beforeJoining,
  int backlog = 0,
}) async {
  IOSink logSink = logger ?? stderr;
  addressA ??= InternetAddress.anyIPv4;

  SocketConnector connector = SocketConnector(
    verbose: verbose,
    logTraffic: logTraffic,
    timeout: timeout,
    logger: logSink,
  );

  // Number of inbound connections seen so far; only used in log output.
  int connections = 0;

  // bind to a local port for side 'A'
  connector._serverSocketA = await ServerSocket.bind(
    addressA,
    portA,
    backlog: backlog,
  );

  StreamController<Socket> ssc = StreamController();
  Mutex m = Mutex();

  ssc.stream.listen((sideASocket) async {
    // It's important we handle these in sequence with no chance for race,
    // so each connection is set up while holding a mutex.
    // The lock is acquired *before* the try so that the finally block only
    // ever releases a lock this callback actually holds.
    await m.acquire();
    try {
      Side sideA = Side(sideASocket, true, transformer: transformAtoB);
      unawaited(connector.handleSingleConnection(sideA).catchError((err) {
        logSink.writeln(
            'ERROR $err from handleSingleConnection on sideA $sideA');
      }));

      connections++;
      if (verbose) {
        logSink.writeln('Creating socket #$connections to the "B" side');
      }

      // connect to the side 'B' address and port
      final Socket sideBSocket;
      try {
        sideBSocket = await Socket.connect(addressB, portB);
      } catch (e) {
        // Without a 'B' side there is nothing to join the inbound socket
        // to; destroy it so the remote client is not left hanging on an
        // open socket. The error is rethrown so it surfaces exactly as it
        // did before this cleanup was added.
        logSink.writeln('ERROR $e while connecting socket #$connections'
            ' to the "B" side; closing the "A" side socket');
        sideASocket.destroy();
        rethrow;
      }
      if (verbose) {
        logSink.writeln('"B" side socket #$connections created');
      }

      Side sideB = Side(sideBSocket, false, transformer: transformBtoA);
      if (verbose) {
        logSink.writeln('Calling the beforeJoining callback');
      }
      await beforeJoining?.call(sideA, sideB);

      unawaited(connector.handleSingleConnection(sideB).catchError((err) {
        logSink.writeln(
            'ERROR $err from handleSingleConnection on sideB $sideB');
      }));

      onConnect?.call(sideASocket, sideBSocket);
    } finally {
      m.release();
    }
  });

  // listen on the local port and connect the inbound socket
  connector._serverSocketA?.listen((sideASocket) {
    if (!multi) {
      // Single-connection mode: stop accepting as soon as the first
      // inbound socket arrives.
      try {
        // close() is asynchronous, so its failure has to be handled on the
        // returned future; the surrounding try/catch only covers a
        // synchronous throw.
        unawaited(connector._serverSocketA?.close().then<void>(
          (_) {},
          onError: (Object e) {
            logSink.writeln('Error while closing serverSocketA: $e');
          },
        ));
      } catch (e) {
        logSink.writeln('Error while closing serverSocketA: $e');
      }
    }
    ssc.add(sideASocket);
  });

  return connector;
}