readBatch method
Read a batch of events for flushing. Priority events come first. Memory queue events are returned with negative IDs (-(seq+1)) to distinguish them from SQLite rows, enabling correct deletion routing.
Implementation
/// Reads up to [limit] pending events for a flush.
///
/// The whole operation runs inside `_lock.synchronized`. In order, it:
///
/// 1. Retries pending SQLite deletes, if the database is available and
///    `_pendingSqliteDeleteIds` is non-empty.
/// 2. Drains the memory queue into SQLite, if the database is available and
///    the memory queue is non-empty.
/// 3. Reads the oldest SQLite rows (at most [limit]), excluding the ids in
///    `_pendingSqliteDeleteIds`, if the database is available.
/// 4. Tops the batch up from the memory queue when it is still shorter than
///    [limit] and the memory queue still holds events; the events taken are
///    marked in-flight on the queue.
///
/// Events taken from the memory queue are returned with the id
/// `-(id + 1)` (negative for any non-negative queue id) so that they can be
/// told apart from SQLite rows when the batch is later deleted.
Future<List<EventRow>> readBatch(int limit) async {
  return await _lock.synchronized(() async {
    final batch = <EventRow>[];

    // Availability is re-checked before every stage: the retry and
    // `_syncStorageState()` calls sit between the checks, so the stages are
    // deliberately not merged under a single test.
    if (_dbAvailable) {
      if (_pendingSqliteDeleteIds.isNotEmpty) {
        await _retryPendingSqliteDeletesLocked();
      }
    }

    // Move memory-queue events into SQLite while the DB is usable.
    // This has to live here rather than in insertEvent recovery: draining
    // there could race with events that an earlier readBatch already handed
    // out and that are still in-flight.
    if (_dbAvailable) {
      if (_memoryQueue.isNotEmpty) {
        await _memoryQueue.drainToDb(_sqliteStorage);
        _syncStorageState();
      }
    }

    if (_dbAvailable) {
      final sqliteRows = await _sqliteStorage.readOldestEventsExcludingIds(
        limit,
        _pendingSqliteDeleteIds,
      );
      batch.addAll(sqliteRows);
      _syncStorageState();
    }

    // Fill whatever room is left from the memory queue.
    final spare = limit - batch.length;
    if (spare > 0 && _memoryQueue.isNotEmpty) {
      final picked = _memoryQueue.dequeueAll().take(spare).toList();
      final inFlightIds = picked.map((queued) => queued.id).toSet();
      _memoryQueue.markInFlight(inFlightIds);

      batch.addAll(
        picked.map(
          (queued) => EventRow(
            // A negative id tags the row as a memory-queue event so that
            // deleteBatch can route it correctly.
            id: -(queued.id + 1),
            tsMillis: queued.tsMillis,
            eventName: queued.eventName,
            payloadJson: queued.payloadJson,
          ),
        ),
      );
    }

    return batch;
  });
}