serverToSocket static method

Future<SocketConnector> serverToSocket({
  1. InternetAddress? addressA,
  2. int portA = 0,
  3. required InternetAddress addressB,
  4. required int portB,
  5. DataTransformer? transformAtoB,
  6. DataTransformer? transformBtoA,
  7. bool verbose = false,
  8. bool logTraffic = false,
  9. Duration timeout = SocketConnector.defaultTimeout,
  10. IOSink? logger,
  11. bool multi = false,
  12. @Deprecated("use beforeJoining instead") dynamic onConnect(
    1. Socket socketA,
    2. Socket socketB
    )?,
  13. dynamic beforeJoining(
    1. Side sideA,
    2. Side sideB
    )?,
  14. int backlog = 0,
})
  • Creates a socket to portB on addressB for each connection accepted on portA
  • Binds to portA on addressA
  • Listens for a socket connection on portA and joins it to the 'B' side
  • If portA is not provided then a port is chosen by the OS.
  • addressA defaults to InternetAddress.anyIPv4
  • multi flag controls whether or not to allow multiple connections to the bound server port portA
  • onConnect (deprecated; use beforeJoining instead) is called when portA has got a new connection and a corresponding outbound socket has been created to addressB:portB and the two have been joined together
  • beforeJoining is called when portA has got a new connection and a corresponding outbound socket has been created to addressB:portB but before they are joined together. This allows the code which called serverToSocket to take additional steps (such as setting new transformers rather than the ones which were provided initially)

Implementation

/// Binds a server socket on [addressA]:[portA] and, for every connection it
/// accepts, opens an outbound socket to [addressB]:[portB] and hands both
/// sides to the returned [SocketConnector].
///
/// - [addressA] defaults to [InternetAddress.anyIPv4]; [portA] defaults to
///   `0`, in which case the port is chosen by the OS.
/// - [transformAtoB] and [transformBtoA] are passed as the `transformer` of
///   the 'A' and 'B' [Side] respectively.
/// - [verbose], [logTraffic] and [timeout] are forwarded to the
///   [SocketConnector] constructor. Log output goes to [logger], or to
///   [stderr] when no logger is supplied.
/// - When [multi] is `false` the server socket is closed as soon as a
///   connection has been accepted on it; when `true` it keeps listening.
/// - [beforeJoining] is awaited after the 'B' socket has been created but
///   before the 'B' side is handed to the connector, so the caller can still
///   adjust either [Side].
/// - [onConnect] (deprecated) is invoked, without being awaited, after both
///   sides have been handed to the connector.
/// - [backlog] is forwarded to [ServerSocket.bind].
///
/// Inbound connections are processed strictly one at a time (guarded by a
/// mutex). If connecting to the 'B' side fails, or [beforeJoining] throws,
/// the error is written to the log sink and the sockets belonging to that
/// connection are destroyed; other connections are unaffected.
///
/// The returned future completes once the server socket is bound and
/// listening; an error from [ServerSocket.bind] is reported through it.
static Future<SocketConnector> serverToSocket(
    {
    /// Defaults to [InternetAddress.anyIPv4]
    InternetAddress? addressA,
    int portA = 0,
    required InternetAddress addressB,
    required int portB,
    DataTransformer? transformAtoB,
    DataTransformer? transformBtoA,
    bool verbose = false,
    bool logTraffic = false,
    Duration timeout = SocketConnector.defaultTimeout,
    IOSink? logger,
    bool multi = false,
    @Deprecated("use beforeJoining instead")
    Function(Socket socketA, Socket socketB)? onConnect,
    Function(Side sideA, Side sideB)? beforeJoining,
    int backlog = 0}) async {
  IOSink logSink = logger ?? stderr;
  addressA ??= InternetAddress.anyIPv4;

  SocketConnector connector = SocketConnector(
    verbose: verbose,
    logTraffic: logTraffic,
    timeout: timeout,
    logger: logSink,
  );

  // Number of inbound connections seen so far; only used in log output.
  int connections = 0;

  // bind to a local port for side 'A'
  connector._serverSocketA = await ServerSocket.bind(
    addressA,
    portA,
    backlog: backlog,
  );

  StreamController<Socket> ssc = StreamController();
  Mutex m = Mutex();
  ssc.stream.listen((sideASocket) async {
    // It's important we handle these in sequence with no chance for race
    // So we're going to use a mutex.
    // Acquire *before* entering the try block so that the finally clause
    // can never release a lock which was not actually obtained.
    await m.acquire();
    try {
      Side sideA = Side(sideASocket, true, transformer: transformAtoB);
      unawaited(connector.handleSingleConnection(sideA).catchError((err) {
        logSink.writeln(
            'ERROR $err from handleSingleConnection on sideA $sideA');
      }));

      connections++;
      if (verbose) {
        logSink.writeln('Creating socket #$connections to the "B" side');
      }

      // connect to the side 'B' address and port
      Socket sideBSocket;
      try {
        sideBSocket = await Socket.connect(addressB, portB);
      } catch (e) {
        // Nothing upstream can catch an error thrown from this listener, so
        // report it here and drop the inbound socket rather than leaving it
        // open with no peer.
        logSink.writeln(
            'ERROR $e while creating socket #$connections to the "B" side');
        sideASocket.destroy();
        return;
      }
      if (verbose) {
        logSink.writeln('"B" side socket #$connections created');
      }

      Side sideB = Side(sideBSocket, false, transformer: transformBtoA);
      if (verbose) {
        logSink.writeln('Calling the beforeJoining callback');
      }
      try {
        await beforeJoining?.call(sideA, sideB);
      } catch (e) {
        // The 'B' side has not been handed to the connector yet, so both
        // sockets have to be cleaned up here.
        logSink.writeln('ERROR $e from the beforeJoining callback');
        sideASocket.destroy();
        sideBSocket.destroy();
        return;
      }
      unawaited(connector.handleSingleConnection(sideB).catchError((err) {
        logSink.writeln(
            'ERROR $err from handleSingleConnection on sideB $sideB');
      }));

      onConnect?.call(sideASocket, sideBSocket);
    } finally {
      m.release();
    }
  });

  void logCloseError(Object e) {
    logSink.writeln('Error while closing serverSocketA: $e');
  }

  // listen on the local port and connect the inbound socket
  connector._serverSocketA?.listen((sideASocket) {
    if (!multi) {
      try {
        // close() is asynchronous: attach an error handler to the returned
        // future so that a failed close is logged instead of being dropped.
        unawaited(connector._serverSocketA
            ?.close()
            .then<void>((_) {}, onError: logCloseError));
      } catch (e) {
        logCloseError(e);
      }
    }
    ssc.add(sideASocket);
  });

  return (connector);
}