parseToSqlite static method
Stream<ParseEvent>
parseToSqlite({
- required String xmlPath,
- required String dbPath,
- String language = '',
- int batchSize = 200,
- ParseWorkerSpawner? workerSpawner,
Parse an appstream XML file and stream to SQLite.
Returns a Stream of ParseEvents:
- ComponentParsed for each component written to the database
- ParseDone when parsing completes successfully
- ParseFailed on error
The C++ parser runs on a helper isolate so the main isolate remains responsive.
Parameters:
- xmlPath: Path to the appstream.xml file.
- dbPath: Path for the output SQLite database.
- language: Language filter (e.g. 'en'), or empty for all.
- batchSize: Components per SQLite transaction batch.
- workerSpawner: Optional isolate spawner override (useful for tests).
Implementation
/// Parses an appstream XML file and streams progress while the native
/// parser writes the components to a SQLite database.
///
/// The returned stream emits:
/// - [ComponentParsed] for each component reported by the native parser,
/// - [ParseDone] once, carrying the reported component count, on success,
/// - [ParseFailed] once, when the native parser or the worker reports an
///   error.
///
/// The stream closes after [ParseDone] or [ParseFailed]. Cancelling the
/// subscription closes both ports and kills the worker isolate.
///
/// [xmlPath] is the appstream XML input and [dbPath] the SQLite output.
/// [language], [batchSize] and both paths are forwarded unchanged to the
/// worker. [workerSpawner] replaces [Isolate.spawn] when provided.
static Stream<ParseEvent> parseToSqlite({
  required String xmlPath,
  required String dbPath,
  String language = '',
  int batchSize = 200,
  ParseWorkerSpawner? workerSpawner,
}) {
  _ensureInitialized();
  final controller = StreamController<ParseEvent>();
  // Receives the raw byte messages posted by the native parser.
  final receivePort = RawReceivePort();
  // Receives error reports sent by the Dart side of the worker.
  final workerResultPort = ReceivePort();
  Isolate? workerIsolate;
  var closed = false;

  // Idempotent teardown: releases both ports, closes the stream and stops
  // the worker isolate if one is running.
  void closeAll() {
    if (closed) return;
    closed = true;
    receivePort.close();
    workerResultPort.close();
    if (!controller.isClosed) {
      controller.close();
    }
    workerIsolate?.kill(priority: Isolate.immediate);
    workerIsolate = null;
  }

  // Emits a terminal event and tears everything down. It is a no-op once
  // the stream has been closed or cancelled, because adding to a closed
  // StreamController throws a StateError.
  void finish(ParseEvent event) {
    if (closed || controller.isClosed) return;
    controller.add(event);
    closeAll();
  }

  controller.onCancel = closeAll;

  receivePort.handler = (dynamic message) {
    if (closed || message is! Uint8List) return;
    // Decode the tab-delimited string from C++.
    // NOTE(review): String.fromCharCodes maps each byte to one code unit.
    // If the native side sends UTF-8, non-ASCII text is garbled here and
    // utf8.decode (dart:convert) should be used instead - confirm.
    final fields = String.fromCharCodes(message).split('\t');
    // String.split always yields at least one element.
    final tag = fields.first;
    if (tag == 'DONE') {
      final count = fields.length > 1 ? int.tryParse(fields[1]) ?? 0 : 0;
      finish(ParseDone(count));
    } else if (tag == 'ERROR') {
      final msg = fields.length > 1 ? fields[1] : 'Unknown error';
      finish(ParseFailed('Native parser error: $msg'));
    } else if (fields.length >= 2) {
      controller.add(
        ComponentParsed(
          ComponentEvent(
            id: fields[0],
            name: fields[1],
            summary: fields.length > 2 ? fields[2] : '',
          ),
        ),
      );
    }
  };

  // Run the blocking FFI call on a helper isolate.
  // Dart_PostCObject_DL is thread-safe, so the C++ code can post
  // messages to our ReceivePort from the isolate's thread.
  final nativePort = receivePort.sendPort.nativePort;
  final args = <String, Object>{
    'xmlPath': xmlPath,
    'dbPath': dbPath,
    'language': language,
    'nativePort': nativePort,
    'batchSize': batchSize,
    'resultPort': workerResultPort.sendPort,
  };

  workerResultPort.listen((dynamic message) {
    if (message is Map) {
      if (message['type'] == 'ERROR') {
        final phase = message['phase'] ?? 'unknown';
        final err = message['error'] ?? 'unknown failure';
        finish(ParseFailed('Isolate $phase error: $err'));
      }
    } else if (message is String && message.startsWith('ERROR:')) {
      final err = message.substring('ERROR:'.length);
      finish(ParseFailed('Isolate worker error: $err'));
    }
  });

  final spawn =
      workerSpawner ??
      (
        void Function(Map<String, Object>) entryPoint,
        Map<String, Object> workerArgs,
      ) => Isolate.spawn<Map<String, Object>>(entryPoint, workerArgs);

  // Start the worker on a later event-loop turn so the caller receives the
  // stream first. Every failure is handled inside the closure, so the
  // future itself never completes with an error.
  Future<void>(() async {
    try {
      try {
        workerIsolate = await spawn(_parseToSqliteWorker, args);
      } catch (_) {
        // The consumer cancelled while the spawn was pending: do not start
        // a blocking parse nobody is listening to.
        if (closed) return;
        // Fallback path for isolate startup failures: run the worker
        // directly on the current isolate.
        _parseToSqliteWorker(args);
        return;
      }
      if (closed) {
        // closeAll() ran before the isolate handle existed, so it could not
        // stop the worker; kill it here to avoid leaking the isolate.
        workerIsolate?.kill(priority: Isolate.immediate);
        workerIsolate = null;
      }
    } catch (e) {
      finish(ParseFailed('Isolate startup error: $e'));
    }
  });

  return controller.stream;
}