listen method

Future<void> listen()

Implementation

/// Binds an HTTP server to [address] and [port] and serves WebSocket
/// connections until the server is closed.
///
/// Every incoming request is upgraded to a [WebSocket] and authenticated via
/// `_authenticator`. Messages received on the socket are JSON-decoded into
/// [DeviceCommand]s; once authentication has succeeded each command is
/// checked with `_authorizer` and then either passed to the communicator's
/// `onCommandReceived` callback or answered with a failed [CommandAck].
///
/// Requests are handled one at a time: the next request is not accepted
/// until the current one has finished (or failed) authentication.
///
/// A connection whose setup fails is logged and its socket is closed; the
/// server keeps running. The returned future completes when the server
/// stops, which happens when SIGINT is received.
///
/// [port] must be non-null when this is called.
Future<void> listen() async {
  final server = await HttpServer.bind(
    address,
    port!,
  );
  // Closing the server on SIGINT ends the `await for` loop below.
  final sub = ProcessSignal.sigint.watch().listen((event) {
    server.close(force: true);
  });
  try {
    _logger.info('[SERVER]: listening at ${server.address}:${server.port}');

    await for (var req in server) {
      final commandStreamController =
          StreamController<DeviceCommand>.broadcast();

      // Declared outside the `try` so the `catch` can close a socket whose
      // setup failed. Reset to null when the server itself closes the socket
      // because of an unparseable command; `_onDone` then receives null.
      WebSocket? socket;

      try {
        _logger.info(
          '[CONNECTION]: connection received -- ${req.headers['host']}',
        );

        // Stays null until authentication succeeds; read by the socket
        // callbacks below, which may fire before or after that point.
        WebSocketCommunicator? comm;

        // Whether any session of the driver's app still references it.
        bool hasSession(Driver driver) => driver.app.sessions.values.any(
              (session) => driver.driverId == session.driver.driverId,
            );

        // Upgrade a HttpRequest to a WebSocket connection.
        final upgraded = await WebSocketTransformer.upgrade(req);
        socket = upgraded;

        upgraded.listen(
          (event) {
            DeviceCommand? cmd;

            try {
              cmd = DeviceCommand.fromDynamic(json.decode(event));
            } catch (e, stack) {
              _logger.severe('[COMMAND]: unable to parse command.', e, stack);
            }

            if (cmd == null) {
              _logger.info('[CLOSE]: closing socket due to null command.');
              socket?.close();
              socket = null;
            } else {
              commandStreamController.add(cmd);
            }
          },
          onDone: () {
            commandStreamController.close();
            _logger.info('[CLOSE]: onDone called: [${comm?.toString()}]');

            // A driver that is not part of any session is unregistered as
            // soon as its socket goes away.
            final closed = comm;
            if (closed is Driver && !hasSession(closed)) {
              _logger.info(
                '[CLOSE]: no session, driver removed: [${closed.toString()}',
              );
              closed.app.drivers.remove(closed.driverId);
              closed.close();
            }

            if (_onDone != null) {
              _onDone!(socket);
            }
          },
          onError: (e, stack) {
            final failed = comm;
            if (failed is Driver && !hasSession(failed)) {
              failed.app.drivers.remove(failed.driverId);
              _logger.severe(
                '[CLOSE]: no session, driver removed: [${failed.toString()}',
                e,
                stack,
              );
              failed.close();
            } else {
              // Previously dropped silently; keep a trace of the failure.
              _logger.severe('[CONNECTION]: socket error.', e, stack);
            }
          },
        );

        // No stream event can be delivered between `listen` above and this
        // call, so the socket is still the freshly upgraded one here.
        final WebSocketCommunicator? authenticated =
            await _authenticator.authenticate(
          commandStream: commandStreamController.stream,
          socket: upgraded,
        );

        if (authenticated == null) {
          throw AuthenticationException(
            '[AUTHENTICATION]: authentication failed',
          );
        }
        comm = authenticated;

        if (authenticated is Driver) {
          final driver = authenticated;
          final sessions = driver.app.sessions.values.where(
            (session) => session.driver.driverId == driver.driverId,
          );

          if (sessions.isNotEmpty) {
            // The driver already belongs to a session: start it.
            sessions.first.start();
          } else {
            driver.onCommandReceived = (command) async {
              final handler =
                  _customHandlers[command.type] ?? _handlers[command.type];
              if (handler != null) {
                await handler(
                  app: driver.app,
                  command: command,
                  comm: driver,
                );
              }
            };
          }
        } else if (authenticated is Device) {
          final device = authenticated;
          final sessions = device.app.sessions.values.where(
            (session) => session.device.device.id == device.device.id,
          );

          if (sessions.isNotEmpty) {
            sessions.first.start();
          }
        } else {
          authenticated.onCommandReceived = (command) async {
            final handler =
                _customHandlers[command.type] ?? _handlers[command.type];
            if (handler != null) {
              await handler(
                app: authenticated.app,
                command: command,
                comm: authenticated,
              );
            }
          };
        }

        commandStreamController.stream.listen(
          (cmd) async {
            // The `await` sits inside the `try` so that a failing
            // authorization future is logged instead of surfacing as an
            // unhandled asynchronous error.
            try {
              final authorized = await _authorizer.authorize(
                command: cmd,
                communicator: authenticated,
              );
              if (authorized) {
                // NOTE(review): the result of the callback is not awaited
                // (as before), so asynchronous failures inside it are not
                // observed here -- confirm whether that is intended.
                authenticated.onCommandReceived(cmd);
              } else {
                authenticated.sendCommand(
                  CommandAck(
                    commandId: cmd.id,
                    message: 'UNAUTHORIZED',
                    success: false,
                  ),
                );
              }
            } catch (e, stack) {
              _logger.severe('[SERVER]: uncaught error.', e, stack);
            }
          },
        );
      } catch (e, stack) {
        _logger.severe('[SERVER]: uncaught error.', e, stack);
        // Setup failed (e.g. authentication was rejected). Without this the
        // upgraded socket would stay open with no command listener attached.
        // `socket` is null if the upgrade itself failed or the socket was
        // already closed above.
        socket?.close();
      }
    }
  } finally {
    await sub.cancel();
  }
}