applyAll method
Applies all pending migrations (or up to limit).
Implementation
Future<MigrationReport> applyAll({int? limit}) async {
if (limit != null && limit < 1) {
throw ArgumentError.value(limit, 'limit', 'Must be >= 1');
}
final schema = _defaultSchema;
if (schema != null) {
await _schemaDriver.setCurrentSchema(schema);
}
await _ledger.ensureInitialized();
final applied = await _ledger.readApplied();
final appliedById = {
for (final record in applied) record.id.toString(): record,
};
// Also index by timestamp for backward compatibility with old naming format
final appliedByTimestamp = {
for (final record in applied) record.id.timestamp.toString(): record,
};
final batch = await _ledger.nextBatchNumber();
// Collect pending migrations first
final pending = <MigrationDescriptor>[];
for (final descriptor in _migrations) {
final key = descriptor.id.toString();
final existing =
appliedById[key] ??
appliedByTimestamp[descriptor.id.timestamp.toString()];
if (existing != null) {
if (existing.checksum != descriptor.checksum) {
throw StateError(
'Migration ${descriptor.id} checksum mismatch. Expected ${descriptor.checksum} but ledger has ${existing.checksum}.',
);
}
continue;
}
pending.add(descriptor);
if (limit != null && pending.length >= limit) {
break;
}
}
if (pending.isEmpty) {
return const MigrationReport([]);
}
// Emit batch started event
final batchStopwatch = Stopwatch()..start();
if (_emitEvents) {
_events.emit(
MigrationBatchStartedEvent(
direction: MigrationDirection.up,
count: pending.length,
batch: batch,
),
);
}
final actions = <MigrationAction>[];
for (var i = 0; i < pending.length; i++) {
final descriptor = pending[i];
final migrationId = descriptor.id.toString();
final migrationName = descriptor.id.slug;
// Emit migration started
if (_emitEvents) {
_events.emit(
MigrationStartedEvent(
migrationId: migrationId,
migrationName: migrationName,
direction: MigrationDirection.up,
index: i + 1,
total: pending.length,
),
);
}
final appliedAt = DateTime.now().toUtc();
final stopwatch = Stopwatch()..start();
try {
final plan = await _planResolver(descriptor, MigrationDirection.up);
await _schemaDriver.applySchemaPlan(plan);
stopwatch.stop();
await _ledger.logApplied(descriptor, appliedAt, batch: batch);
// Emit migration completed
if (_emitEvents) {
_events.emit(
MigrationCompletedEvent(
migrationId: migrationId,
migrationName: migrationName,
direction: MigrationDirection.up,
duration: stopwatch.elapsed,
),
);
}
actions.add(
MigrationAction(
descriptor: descriptor,
operation: MigrationOperation.apply,
appliedAt: appliedAt,
duration: stopwatch.elapsed,
),
);
} catch (error, stackTrace) {
stopwatch.stop();
// Emit migration failed
if (_emitEvents) {
_events.emit(
MigrationFailedEvent(
migrationId: migrationId,
migrationName: migrationName,
direction: MigrationDirection.up,
error: error,
stackTrace: stackTrace,
),
);
}
rethrow;
}
}
batchStopwatch.stop();
if (_emitEvents) {
_events.emit(
MigrationBatchCompletedEvent(
direction: MigrationDirection.up,
count: actions.length,
duration: batchStopwatch.elapsed,
),
);
}
return MigrationReport(actions);
}