readBatch method

Future<List<EventRow>> readBatch(int limit)

Reads a batch of events for flushing. Priority events come first. Memory-queue events are returned with negative IDs, computed as -(seq + 1), to distinguish them from SQLite rows; this lets deletion be routed to the correct store.

Implementation

/// Reads the next batch of events to flush.
///
/// The whole operation runs inside `_lock.synchronized`. In order, it:
///
/// 1. Retries pending SQLite deletes, when the database is available and
///    `_pendingSqliteDeleteIds` is non-empty.
/// 2. Drains the memory queue into SQLite, when the database is available
///    and the memory queue is non-empty.
/// 3. Reads the oldest SQLite rows, passing [limit] and excluding the ids in
///    `_pendingSqliteDeleteIds`, when the database is available.
/// 4. Tops the batch up from the memory queue while it holds fewer than
///    [limit] rows.
///
/// Rows taken from the memory queue are returned with the id `-(id + 1)`, so
/// they can be told apart from SQLite rows when the batch is deleted later.
Future<List<EventRow>> readBatch(int limit) async {
  return await _lock.synchronized(() async {
    final batch = <EventRow>[];

    // `_dbAvailable` is deliberately re-read before every step instead of
    // being cached in a local.
    // NOTE(review): presumably the helpers called below can change it —
    // confirm before collapsing these checks.
    if (_dbAvailable && _pendingSqliteDeleteIds.isNotEmpty) {
      await _retryPendingSqliteDeletesLocked();
    }

    // Memory-queue events are moved into SQLite here, inside readBatch,
    // and not in the insertEvent recovery path: draining there could race
    // with events that an earlier readBatch already handed out and that
    // are still in flight.
    if (_dbAvailable && _memoryQueue.isNotEmpty) {
      await _memoryQueue.drainToDb(_sqliteStorage);
      _syncStorageState();
    }

    if (_dbAvailable) {
      final rows = await _sqliteStorage.readOldestEventsExcludingIds(
        limit,
        _pendingSqliteDeleteIds,
      );
      batch.addAll(rows);
      _syncStorageState();
    }

    // Nothing to top up when SQLite already filled the batch or the memory
    // queue has nothing to offer.
    final needsTopUp = batch.length < limit && _memoryQueue.isNotEmpty;
    if (!needsTopUp) {
      return batch;
    }

    // NOTE(review): only the first `limit - batch.length` events are used;
    // confirm that dequeueAll() leaves the unselected ones in the queue.
    final queued = _memoryQueue.dequeueAll();
    final picked = queued.take(limit - batch.length).toList();
    _memoryQueue.markInFlight({for (final event in picked) event.id});

    // Encode the queue id as -(id + 1): 0 -> -1, 1 -> -2, ... so that
    // deleteBatch can route these rows to the memory queue.
    batch.addAll(
      picked.map(
        (event) => EventRow(
          id: -(event.id + 1),
          tsMillis: event.tsMillis,
          eventName: event.eventName,
          payloadJson: event.payloadJson,
        ),
      ),
    );

    return batch;
  });
}