parseToSqlite static method

Stream<ParseEvent> parseToSqlite({
  required String xmlPath,
  required String dbPath,
  String language = '',
  int batchSize = 200,
  ParseWorkerSpawner? workerSpawner,
})

Parse an appstream XML file and stream to SQLite.

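Returns a Stream of ParseEvents:

  • ComponentParsed: emitted for each component record reported by the parser.
  • ParseDone: emitted once, with the component count, when parsing finishes.
  • ParseFailed: emitted if the native parser or the worker isolate reports an error.

The stream is closed after ParseDone or ParseFailed.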

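The C++ parser runs on a helper isolate so the main isolate remains responsive. If the helper isolate cannot be spawned, the worker is run directly on the calling isolate as a fallback.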

Parameters:

  • xmlPath: Path to the appstream.xml file.
  • dbPath: Path for the output SQLite database.
  • language: Language filter (e.g. 'en'), or empty for all.
  • batchSize: Components per SQLite transaction batch.
  • workerSpawner: Optional isolate spawner override (useful for tests).

Implementation

/// Parses an appstream XML file and streams the results into SQLite.
///
/// Returns a single-subscription stream that emits [ComponentParsed] for each
/// component record reported by the native parser, followed by either
/// [ParseDone] (carrying the component count) or [ParseFailed]. The stream is
/// closed after the terminal event. Cancelling the subscription closes both
/// ports and kills the worker isolate.
///
/// The blocking native call runs on a helper isolate. If that isolate cannot
/// be spawned, the worker entry point is invoked directly on the calling
/// isolate as a fallback.
///
/// * [xmlPath]: path to the appstream XML file.
/// * [dbPath]: path for the output SQLite database.
/// * [language]: language filter (e.g. `'en'`), or empty for all.
/// * [batchSize]: components per SQLite transaction batch.
/// * [workerSpawner]: optional isolate spawner override (useful for tests).
static Stream<ParseEvent> parseToSqlite({
  required String xmlPath,
  required String dbPath,
  String language = '',
  int batchSize = 200,
  ParseWorkerSpawner? workerSpawner,
}) {
  _ensureInitialized();

  final controller = StreamController<ParseEvent>();
  final receivePort = RawReceivePort();
  final workerResultPort = ReceivePort();
  Isolate? workerIsolate;
  var closed = false;

  // Idempotent teardown: closes both ports and the stream, then kills the
  // worker isolate if one has been recorded.
  void closeAll() {
    if (closed) return;
    closed = true;
    receivePort.close();
    workerResultPort.close();
    if (!controller.isClosed) {
      controller.close();
    }
    workerIsolate?.kill(priority: Isolate.immediate);
    workerIsolate = null;
  }

  // Emits a terminal failure and tears everything down. Does nothing once the
  // stream is closed, because adding to a closed controller throws.
  void fail(String message) {
    if (closed) return;
    controller.add(ParseFailed(message));
    closeAll();
  }

  controller.onCancel = closeAll;

  receivePort.handler = (dynamic message) {
    if (message is! Uint8List) return;

    // Decode the tab-delimited record from C++.
    // NOTE(review): String.fromCharCodes treats every byte as one UTF-16 code
    // unit, so this only round-trips ASCII/Latin-1. If the native side posts
    // UTF-8 (not visible from here), non-ASCII names and summaries will be
    // garbled and this should become utf8.decode(message) -- confirm.
    final parts = String.fromCharCodes(message).split('\t');
    final tag = parts.first; // split() never returns an empty list.

    if (tag == 'DONE') {
      final count = parts.length > 1 ? int.tryParse(parts[1]) ?? 0 : 0;
      controller.add(ParseDone(count));
      closeAll();
    } else if (tag == 'ERROR') {
      final msg = parts.length > 1 ? parts[1] : 'Unknown error';
      fail('Native parser error: $msg');
    } else if (parts.length >= 2) {
      // Component record: id, name and an optional summary.
      controller.add(
        ComponentParsed(
          ComponentEvent(
            id: tag,
            name: parts[1],
            summary: parts.length > 2 ? parts[2] : '',
          ),
        ),
      );
    }
  };

  // Run the blocking FFI call on a helper isolate.
  // Dart_PostCObject_DL is thread-safe, so the C++ code can post
  // messages to our ReceivePort from the isolate's thread.
  final nativePort = receivePort.sendPort.nativePort;
  final args = <String, Object>{
    'xmlPath': xmlPath,
    'dbPath': dbPath,
    'language': language,
    'nativePort': nativePort,
    'batchSize': batchSize,
    'resultPort': workerResultPort.sendPort,
  };

  // Failures reported by the worker isolate itself arrive on this port,
  // either as a map ({'type': 'ERROR', ...}) or as an 'ERROR:'-prefixed string.
  workerResultPort.listen((dynamic message) {
    if (message is Map) {
      if (message['type'] == 'ERROR') {
        final phase = message['phase'] ?? 'unknown';
        final err = message['error'] ?? 'unknown failure';
        fail('Isolate $phase error: $err');
      }
    } else if (message is String && message.startsWith('ERROR:')) {
      final err = message.substring('ERROR:'.length);
      fail('Isolate worker error: $err');
    }
  });

  final spawn =
      workerSpawner ??
      (
        void Function(Map<String, Object>) entryPoint,
        Map<String, Object> workerArgs,
      ) => Isolate.spawn<Map<String, Object>>(entryPoint, workerArgs);

  Future<void>(() async {
    try {
      try {
        workerIsolate = await spawn(_parseToSqliteWorker, args);
      } catch (_) {
        // Fallback path for isolate startup failures. Skipped once the stream
        // is closed: nobody is listening, and the call would only block this
        // isolate while posting to closed ports.
        if (!closed) {
          _parseToSqliteWorker(args);
        }
        return;
      }
      if (closed) {
        // The subscriber cancelled while the spawn was still in flight, so
        // closeAll() ran before the isolate handle existed. Kill it here so
        // the worker does not outlive the stream.
        workerIsolate?.kill(priority: Isolate.immediate);
        workerIsolate = null;
      }
    } catch (e) {
      // Reached when the fallback invocation itself throws.
      fail('Isolate startup error: $e');
    }
  });

  return controller.stream;
}