runCheckpoint method

Future<CheckpointResult> runCheckpoint()

Implementation

/// Performs one full checkpoint and returns a [CheckpointResult] describing it.
///
/// The phases run strictly in sequence, each awaited before the next starts:
///
///  1. append CHECKPOINT_BEGIN to the WAL;
///  2. capture `wal.flushedLsn` as the checkpoint LSN;
///  3. flush dirty pages (per table, then the whole cache) and flush the pager;
///  4. persist statistics (best-effort: a failure is logged and ignored);
///  5. persist transaction state, tagged with the checkpoint LSN;
///  6. persist the catalog;
///  7. persist the indexes;
///  8. append CHECKPOINT_END to the WAL;
///  9. truncate the WAL and store the checkpoint LSN in `lastCheckpointLsn`.
///
/// Progress is written with `print`, each line prefixed `[Checkpoint #<seq>]`.
/// An exception thrown by any phase other than 4 propagates to the caller and
/// the remaining phases are not executed.
Future<CheckpointResult> runCheckpoint() async {
  final timer = Stopwatch();
  timer.start();
  final seq = ++_checkpointSeq;

  print('[Checkpoint #$seq] Starting...');

  // Phase 1 — mark the start of this checkpoint in the WAL.
  // The returned LSN is only used for the log line below.
  final beginLsn = await _walAppendCheckpointBegin();
  print('[Checkpoint #$seq] WAL CHECKPOINT_BEGIN at LSN=$beginLsn');

  // Phase 2 — pick the checkpoint LSN.
  // No fsync is issued here: the value is simply read from the WAL. The
  // design relies on the WAL having made every append durable already, so
  // that the log is on disk before any data page is written.
  // NOTE(review): confirm that wal.append() really fsyncs on every call.
  final walFlushedLsn = wal.flushedLsn;
  print('[Checkpoint #$seq] WAL fsynced (flushedLsn=$walFlushedLsn)');

  // Phase 3 — write dirty pages out.
  // First pass: pages each table reports as dirty, flushed one at a time so
  // they can be counted, after which the table is told the checkpoint is done.
  var dirtyFlushed = 0;
  final tableMap = getTables();

  for (final tbl in tableMap.values) {
    for (final pageId in tbl.dirtyPageIds) {
      await cache.flushPage(pageId);
      dirtyFlushed += 1;
    }
    tbl.markCheckpointDone();
  }

  // Second pass: whatever the cache still considers dirty (writes that went
  // to the cache without passing through a table's dirty-page tracking).
  // The count must be sampled before flushAll() runs.
  // NOTE(review): assumes flushPage() clears the cache's dirty state for the
  // page; otherwise pages from the first pass are counted twice — confirm.
  final leftoverDirty = cache.dirtyPageCount;
  await cache.flushAll();
  dirtyFlushed += leftoverDirty;

  // Push the data file itself to stable storage.
  await pager.flush();
  print('[Checkpoint #$seq] Flushed $dirtyFlushed dirty pages, data fsynced.');

  // Phase 4 — statistics are optional; a failure here must not abort the
  // checkpoint, so it is reported and swallowed on purpose.
  try {
    await persistStats();
  } catch (e) {
    print('[Checkpoint #$seq] Warning: stats persist failed: $e');
  }

  // Phase 5 — record transaction state together with the checkpoint LSN.
  // NOTE(review): this stores the new LSN before CHECKPOINT_END is appended
  // in phase 8; confirm recovery handles a crash between the two as intended.
  await persistTxnState(walFlushedLsn);
  print('[Checkpoint #$seq] Transaction state persisted.');

  // Phase 6 — catalog.
  await persistCatalog();
  print('[Checkpoint #$seq] Catalog persisted.');

  // Phase 7 — indexes go out only after the data pages from phase 3, so an
  // index entry is never persisted ahead of the tuple it refers to.
  await persistIndexes(tableMap);
  print('[Checkpoint #$seq] Indexes persisted.');

  // Phase 8 — close the checkpoint in the WAL.
  final endLsn = await _walAppendCheckpointEnd(walFlushedLsn);
  print('[Checkpoint #$seq] WAL CHECKPOINT_END at LSN=$endLsn');

  // Phase 9 — compact the WAL, then remember the LSN of this checkpoint.
  // Truncation comes last, after data, catalog, indexes and CHECKPOINT_END.
  await wal.truncate();
  lastCheckpointLsn = walFlushedLsn;
  print('[Checkpoint #$seq] WAL truncated.');

  timer.stop();
  final result = CheckpointResult(
    dirtyPagesFlushed: dirtyFlushed,
    // No per-record count is obtained from truncate(); always reported as 0.
    walRecordsTruncated: 0,
    tablesCheckpointed: tableMap.length,
    // Reported as the table count, not a count of indexes actually written.
    indexesWritten: tableMap.length,
    elapsed: timer.elapsed,
    checkpointLsn: walFlushedLsn,
  );
  print('[Checkpoint #$seq] Done. $result');
  return result;
}