applyAll method

Future<MigrationReport> applyAll({
  int? limit,
})

Applies all pending migrations, or at most `limit` of them when a limit is given.

Implementation

/// Applies the pending migrations found in `_migrations`, in iteration order.
///
/// A migration counts as already applied when the ledger holds a record with
/// the same id, or (for backward compatibility with the old naming format) a
/// record with the same id timestamp. Applied migrations are skipped after
/// their checksum is compared against the ledger record.
///
/// When [limit] is given, at most that many pending migrations are collected
/// and applied; scanning stops as soon as the limit is reached.
///
/// If `_defaultSchema` is non-null it is made the current schema on the
/// schema driver before the ledger is consulted. When `_emitEvents` is true,
/// batch and per-migration events are emitted on `_events`.
///
/// Returns a [MigrationReport] with one [MigrationAction] per migration that
/// was applied, or an empty report when nothing is pending.
///
/// Throws an [ArgumentError] if [limit] is less than 1, and a [StateError] if
/// an already-applied migration's checksum differs from the ledger's. Any
/// error raised while resolving, applying, or logging a migration is rethrown
/// after a failure event has been emitted; in that case no batch-completed
/// event is emitted.
Future<MigrationReport> applyAll({int? limit}) async {
  if (limit != null && limit < 1) {
    throw ArgumentError.value(limit, 'limit', 'Must be >= 1');
  }

  // Work from a local copy so the null check promotes the value we pass on.
  final targetSchema = _defaultSchema;
  if (targetSchema != null) {
    await _schemaDriver.setCurrentSchema(targetSchema);
  }

  await _ledger.ensureInitialized();
  final ledgerRecords = await _ledger.readApplied();

  // Primary index: full migration id.
  final recordsById = {
    for (final entry in ledgerRecords) entry.id.toString(): entry,
  };
  // Secondary index: timestamp only, kept for backward compatibility with
  // the old naming format.
  final recordsByTimestamp = {
    for (final entry in ledgerRecords) entry.id.timestamp.toString(): entry,
  };
  final batchNumber = await _ledger.nextBatchNumber();

  // Phase 1: work out which migrations still need to run.
  final queue = <MigrationDescriptor>[];
  for (final candidate in _migrations) {
    final ledgerEntry =
        recordsById[candidate.id.toString()] ??
        recordsByTimestamp[candidate.id.timestamp.toString()];

    if (ledgerEntry == null) {
      queue.add(candidate);
      if (limit != null && queue.length >= limit) {
        break;
      }
      continue;
    }

    // Already applied: the recorded checksum must still match.
    if (ledgerEntry.checksum != candidate.checksum) {
      throw StateError(
        'Migration ${candidate.id} checksum mismatch. Expected ${candidate.checksum} but ledger has ${ledgerEntry.checksum}.',
      );
    }
  }

  if (queue.isEmpty) {
    return const MigrationReport([]);
  }

  // Phase 2: run the queued migrations, timing the batch as a whole.
  final batchTimer = Stopwatch()..start();
  if (_emitEvents) {
    _events.emit(
      MigrationBatchStartedEvent(
        direction: MigrationDirection.up,
        count: queue.length,
        batch: batchNumber,
      ),
    );
  }

  final completed = <MigrationAction>[];
  var ordinal = 0; // 1-based position of the current migration in the queue.
  for (final migration in queue) {
    ordinal++;
    final id = migration.id.toString();
    final name = migration.id.slug;

    if (_emitEvents) {
      _events.emit(
        MigrationStartedEvent(
          migrationId: id,
          migrationName: name,
          direction: MigrationDirection.up,
          index: ordinal,
          total: queue.length,
        ),
      );
    }

    final startedAt = DateTime.now().toUtc();
    final timer = Stopwatch()..start();

    try {
      final plan = await _planResolver(migration, MigrationDirection.up);
      await _schemaDriver.applySchemaPlan(plan);
      // Stop before writing to the ledger so the recorded duration covers
      // plan resolution and schema application only.
      timer.stop();
      await _ledger.logApplied(migration, startedAt, batch: batchNumber);

      // The timer is stopped, so this value is stable for both uses below.
      final elapsed = timer.elapsed;

      if (_emitEvents) {
        _events.emit(
          MigrationCompletedEvent(
            migrationId: id,
            migrationName: name,
            direction: MigrationDirection.up,
            duration: elapsed,
          ),
        );
      }

      completed.add(
        MigrationAction(
          descriptor: migration,
          operation: MigrationOperation.apply,
          appliedAt: startedAt,
          duration: elapsed,
        ),
      );
    } catch (error, stackTrace) {
      timer.stop();
      if (_emitEvents) {
        _events.emit(
          MigrationFailedEvent(
            migrationId: id,
            migrationName: name,
            direction: MigrationDirection.up,
            error: error,
            stackTrace: stackTrace,
          ),
        );
      }
      // Propagate with the original stack trace; the batch ends here.
      rethrow;
    }
  }

  batchTimer.stop();
  if (_emitEvents) {
    _events.emit(
      MigrationBatchCompletedEvent(
        direction: MigrationDirection.up,
        count: completed.length,
        duration: batchTimer.elapsed,
      ),
    );
  }

  return MigrationReport(completed);
}