Implementation
/// Runs a chart export job configured by [options] and returns its result.
///
/// Steps, in order:
/// 1. Builds a plan with `options.buildPlan()` and records a `planned` event.
/// 2. If the plan cannot run and `options.shouldFailOnPreflightBlockers` is
///    set, returns early with a result flagged `preflightBlocked: true` whose
///    export batch is constructed from an empty list.
/// 3. Exports `plan.requests` through `ChartExporter.exportAll`.
/// 4. Unless the token is cancelled, optionally delivers the exports, builds
///    an archive, and delivers the archive, depending on [options].
/// 5. Emits a final `completed` progress update and event, then returns the
///    result.
///
/// The cancellation token is `options.cancellationToken` when provided,
/// otherwise the `cancellationToken` declared outside this method.
///
/// Throws a [StateError] when `_isRunning` is already true. `_isRunning` and
/// `_activeCancellationToken` are reset by the `finally` block that wraps the
/// job body.
Future<ChartExportJobResult> run(ChartExportJobOptions options) async {
// Reject overlapping calls while a previous run is still marked as running.
if (_isRunning) {
throw StateError('A chart export job is already running.');
}
_isRunning = true;
// Prefer the per-call token; fall back to the `cancellationToken` in scope.
final token = options.cancellationToken ?? cancellationToken;
_activeCancellationToken = token;
try {
// UTC wall-clock start for reporting; the stopwatch supplies the duration.
final jobStartedAt = DateTime.now().toUtc();
final jobStopwatch = Stopwatch()..start();
// Shared mutable accumulators handed to the helpers and to the result.
final stageDurations = <ChartExportJobStage, Duration>{};
final warnings = <String>[];
final events = <ChartExportJobEvent>[];
final plan = options.buildPlan();
// Record a `planned` event carrying the plan's readiness counters.
_recordEvent(
options,
warnings,
events,
ChartExportJobEvent(
type: ChartExportJobEventType.planned,
stage: ChartExportJobStage.idle,
message: plan.summaryText(),
total: plan.requestedCount,
metadata: {
'canRun': plan.canRun,
'issueCount': plan.issueCount,
'warningCount': plan.warningCount,
'blockerCount': plan.blockerCount,
},
),
);
// When the plan is not runnable, pass every plan issue message to
// `_addWarning`, whether or not the job goes on to fail below.
if (!plan.canRun) {
for (final issue in plan.issues) {
_addWarning(warnings, issue.message);
}
}
// Pre-flight failure path: report the blockers and return without exporting.
if (!plan.canRun && options.shouldFailOnPreflightBlockers) {
_recordEvent(
options,
warnings,
events,
ChartExportJobEvent(
type: ChartExportJobEventType.blocked,
stage: ChartExportJobStage.completed,
message: plan.diagnosticsText(),
completed: 0,
total: plan.requestedCount,
status: ChartExportJobStatus.failed,
metadata: {
'blockers': [
for (final blocker in plan.blockers) blocker.toMetadataJson(),
],
},
),
);
jobStopwatch.stop();
final timing = ChartExportJobTiming(
startedAt: jobStartedAt,
endedAt: DateTime.now().toUtc(),
duration: jobStopwatch.elapsed,
stageDurations: stageDurations,
);
// Built before the final event so that its summary text and status can be
// reported in that event.
final result = ChartExportJobResult(
exportBatch: ChartExportBatchResult(
const [],
skippedUnavailable: [
for (final request in plan.skippedUnavailable) request.capability,
],
requestedCount: plan.requestedCount,
),
plan: plan,
timing: timing,
preflightBlocked: true,
cancellationReason: _cancelled(token) ? token.reason : null,
events: events,
warnings: warnings,
);
_emit(
options,
warnings,
ChartExportJobProgress(
stage: ChartExportJobStage.completed,
completed: 0,
total: plan.requestedCount,
message:
'Export job blocked before running: '
'${plan.diagnosticsText()}',
),
);
_recordEvent(
options,
warnings,
events,
ChartExportJobEvent(
type: ChartExportJobEventType.completed,
stage: ChartExportJobStage.completed,
message: result.summaryText(),
completed: 0,
total: plan.requestedCount,
status: result.status,
),
);
// NOTE(review): a second result is built from the same arguments (with the
// cancellation state re-checked), presumably so the returned instance
// reflects the `completed` event recorded above — confirm whether
// ChartExportJobResult snapshots `events` and `warnings`.
return ChartExportJobResult(
exportBatch: result.exportBatch,
plan: plan,
timing: timing,
preflightBlocked: true,
cancellationReason: _cancelled(token) ? token.reason : null,
events: events,
warnings: warnings,
);
}
// Normal path: export every request in the plan.
final requests = plan.requests;
_recordEvent(
options,
warnings,
events,
ChartExportJobEvent(
type: ChartExportJobEventType.started,
stage: ChartExportJobStage.exporting,
message: 'Export job started.',
completed: 0,
total: requests.length,
),
);
// Export stage. NOTE(review): `_timedStage` presumably records the closure's
// elapsed time under the given stage in `stageDurations` — confirm.
final exportBatch = await _timedStage(
stageDurations,
ChartExportJobStage.exporting,
() async {
// Initial progress update before any request has been exported.
_emit(
options,
warnings,
ChartExportJobProgress(
stage: ChartExportJobStage.exporting,
completed: 0,
total: requests.length,
message: plan.summaryText(),
),
);
final batch = await ChartExporter.exportAll(
requests,
options: ChartExportBatchOptions(
stopOnFirstFailure: options.stopBatchOnFirstFailure,
skipUnavailable: options.skipUnavailable,
continueOnProgressError: options.continueOnCallbackError,
timeout: options.exportTimeout,
cancellationToken: token,
// Per-request progress: invoke the caller's `onExportProgress` through
// `_callCallback`, then emit the matching job-level progress update.
onProgress: (progress) {
_callCallback(
options,
warnings,
'onExportProgress',
ChartExportJobStage.exporting,
() => options.onExportProgress?.call(progress),
);
_emit(
options,
warnings,
ChartExportJobProgress(
stage: ChartExportJobStage.exporting,
completed: progress.completed,
total: progress.total,
message: progress.result.success
? 'Exported ${progress.result.filename}.'
: 'Export failed for ${progress.result.filename}.',
exportProgress: progress,
),
);
},
// Progress-callback errors reported by the exporter are routed to
// `_handleCallbackError` under the 'onExportProgress' label.
onProgressError: (error, stackTrace, _) {
_handleCallbackError(
options,
warnings,
'onExportProgress',
ChartExportJobStage.exporting,
error,
stackTrace,
);
},
),
);
// Batch-level summary; skipped-unavailable requests count as completed.
_emit(
options,
warnings,
ChartExportJobProgress(
stage: ChartExportJobStage.exporting,
completed: batch.results.length + batch.skippedUnavailableCount,
total: batch.requestedCount,
message: batch.summaryText(),
),
);
// Hand the batch to `options.onExportBatchReady`, if set, through
// `_callAsyncCallback`; this is awaited inside the timed export stage.
await _callAsyncCallback(
options,
warnings,
'onExportBatchReady',
ChartExportJobStage.exporting,
() => options.onExportBatchReady?.call(batch),
);
return batch;
},
);
_recordEvent(
options,
warnings,
events,
ChartExportJobEvent(
type: ChartExportJobEventType.exported,
stage: ChartExportJobStage.exporting,
message: exportBatch.summaryText(),
completed:
exportBatch.results.length + exportBatch.skippedUnavailableCount,
total: exportBatch.requestedCount,
metadata: {
'successCount': exportBatch.successCount,
'failureCount': exportBatch.failureCount,
'skippedUnavailableCount': exportBatch.skippedUnavailableCount,
'notRunCount': exportBatch.notRunCount,
},
),
);
// Outputs of the optional stages; each stays null when its stage is skipped.
ChartExportDeliveryBatchResult? exportDelivery;
ChartExportFile? archive;
ChartExportDeliveryResult? archiveDelivery;
final adapter = options.deliveryAdapter;
// Cancelled by the end of the export stage: skip export delivery. The
// cancellation warning is added here only when the batch has no failures.
if (_cancelled(token)) {
if (!exportBatch.hasFailures) {
warnings.add(token.reason ?? 'Export job cancelled.');
}
} else if (options.deliverExports) {
// Export delivery was requested; without an adapter it is skipped with a
// warning rather than failing the job.
if (adapter == null) {
warnings.add(
'Export delivery skipped because no delivery adapter was provided.',
);
} else {
final deliveryBatch = await _timedStage(
stageDurations,
ChartExportJobStage.delivering,
() =>
_deliverExports(options, exportBatch, adapter, token, warnings),
);
exportDelivery = deliveryBatch;
_recordEvent(
options,
warnings,
events,
ChartExportJobEvent(
type: ChartExportJobEventType.delivered,
stage: ChartExportJobStage.delivering,
message: deliveryBatch.summaryText(),
completed: deliveryBatch.results.length,
total: deliveryBatch.results.length,
metadata: {
'successCount': deliveryBatch.successCount,
'failureCount': deliveryBatch.failureCount,
},
),
);
}
}
// Archive stage: the cancellation state is re-checked because it may have
// changed since the export stage. The archive is built synchronously inside
// `_timedSyncStage`.
if (!_cancelled(token) && options.shouldBuildArchive) {
final archiveFile = _timedSyncStage(
stageDurations,
ChartExportJobStage.archiving,
() {
_emit(
options,
warnings,
const ChartExportJobProgress(
stage: ChartExportJobStage.archiving,
completed: 0,
total: 1,
message: 'Creating export archive.',
),
);
// The filename falls back from `archiveFilename` to `filename` to the
// archive's default export filename.
final archiveFile = ChartExportArchive.exportBatchZip(
exportBatch,
filename:
options.archiveFilename ??
options.filename ??
ChartExportArchive.defaultExportFilename,
includeManifest: options.includeArchiveManifest,
manifestFilename: options.archiveManifestFilename,
);
_emit(
options,
warnings,
ChartExportJobProgress(
stage: ChartExportJobStage.archiving,
completed: 1,
total: 1,
message: 'Archive ready: ${archiveFile.filename}.',
),
);
return archiveFile;
},
);
archive = archiveFile;
_recordEvent(
options,
warnings,
events,
ChartExportJobEvent(
type: ChartExportJobEventType.archived,
stage: ChartExportJobStage.archiving,
message: 'Archive ready: ${archiveFile.filename}.',
completed: 1,
total: 1,
filename: archiveFile.filename,
metadata: {'sizeBytes': archiveFile.sizeBytes},
),
);
}
// Archive delivery needs both an adapter and a built archive. A missing
// adapter produces a warning; an adapter without an archive does nothing.
if (!_cancelled(token) && options.deliverArchive) {
if (adapter == null) {
warnings.add(
'Archive delivery skipped because no delivery adapter was provided.',
);
} else if (archive != null) {
// Final local copy of the non-null archive for use inside the closure.
final archiveFile = archive;
final delivery = await _timedStage(
stageDurations,
ChartExportJobStage.delivering,
() async {
_emit(
options,
warnings,
ChartExportJobProgress(
stage: ChartExportJobStage.delivering,
completed: 0,
total: 1,
message: 'Delivering archive ${archiveFile.filename}.',
),
);
final delivery = await ChartExportDelivery.deliverFile(
archiveFile,
adapter,
timeout: options.deliveryTimeout,
cancellationToken: token,
);
_emit(
options,
warnings,
ChartExportJobProgress(
stage: ChartExportJobStage.delivering,
completed: 1,
total: 1,
message: delivery.success
? 'Archive delivered: ${archiveFile.filename}.'
: 'Archive delivery failed: ${delivery.errorText}.',
),
);
return delivery;
},
);
archiveDelivery = delivery;
// The event repeats the outcome message used in the progress update above.
_recordEvent(
options,
warnings,
events,
ChartExportJobEvent(
type: ChartExportJobEventType.delivered,
stage: ChartExportJobStage.delivering,
message: delivery.success
? 'Archive delivered: ${archiveFile.filename}.'
: 'Archive delivery failed: ${delivery.errorText}.',
completed: 1,
total: 1,
filename: archiveFile.filename,
metadata: delivery.toMetadataJson(),
),
);
}
}
// Late cancellation check: add the cancellation warning only when no warning
// has been collected so far and the batch has no failures.
if (_cancelled(token) && warnings.isEmpty && !exportBatch.hasFailures) {
warnings.add(token.reason ?? 'Export job cancelled.');
}
// Record a `cancelled` event whenever the token is cancelled at this point.
if (_cancelled(token)) {
_recordEvent(
options,
warnings,
events,
ChartExportJobEvent(
type: ChartExportJobEventType.cancelled,
stage: ChartExportJobStage.completed,
message: token.reason ?? 'Export job cancelled.',
status: ChartExportJobStatus.cancelled,
),
);
}
jobStopwatch.stop();
final timing = ChartExportJobTiming(
startedAt: jobStartedAt,
endedAt: DateTime.now().toUtc(),
duration: jobStopwatch.elapsed,
stageDurations: stageDurations,
);
// Interim result, used only to derive the summary text and status reported
// in the final progress update and `completed` event.
final resultBeforeCompletedProgress = ChartExportJobResult(
exportBatch: exportBatch,
plan: plan,
exportDelivery: exportDelivery,
archive: archive,
archiveDelivery: archiveDelivery,
timing: timing,
cancellationReason: _cancelled(token) ? token.reason : null,
events: events,
warnings: warnings,
);
_emit(
options,
warnings,
ChartExportJobProgress(
stage: ChartExportJobStage.completed,
completed: 1,
total: 1,
message: resultBeforeCompletedProgress.summaryText(),
),
);
_recordEvent(
options,
warnings,
events,
ChartExportJobEvent(
type: ChartExportJobEventType.completed,
stage: ChartExportJobStage.completed,
message: resultBeforeCompletedProgress.summaryText(),
completed: 1,
total: 1,
status: resultBeforeCompletedProgress.status,
),
);
// NOTE(review): the result is rebuilt from the same arguments (with the
// cancellation state re-checked), presumably so the returned instance
// reflects the `completed` event and any warnings added after the interim
// result was created — confirm whether ChartExportJobResult snapshots them.
return ChartExportJobResult(
exportBatch: exportBatch,
plan: plan,
exportDelivery: exportDelivery,
archive: archive,
archiveDelivery: archiveDelivery,
timing: timing,
cancellationReason: _cancelled(token) ? token.reason : null,
events: events,
warnings: warnings,
);
} finally {
// Always clear the active token and the running flag, including when an
// error propagates out of the job body.
_activeCancellationToken = null;
_isRunning = false;
}
}