run method

Implementation

Future<ChartExportJobResult> run(ChartExportJobOptions options) async {
  if (_isRunning) {
    throw StateError('A chart export job is already running.');
  }

  _isRunning = true;
  final token = options.cancellationToken ?? cancellationToken;
  _activeCancellationToken = token;

  try {
    final jobStartedAt = DateTime.now().toUtc();
    final jobStopwatch = Stopwatch()..start();
    final stageDurations = <ChartExportJobStage, Duration>{};
    final warnings = <String>[];
    final events = <ChartExportJobEvent>[];
    final plan = options.buildPlan();
    _recordEvent(
      options,
      warnings,
      events,
      ChartExportJobEvent(
        type: ChartExportJobEventType.planned,
        stage: ChartExportJobStage.idle,
        message: plan.summaryText(),
        total: plan.requestedCount,
        metadata: {
          'canRun': plan.canRun,
          'issueCount': plan.issueCount,
          'warningCount': plan.warningCount,
          'blockerCount': plan.blockerCount,
        },
      ),
    );
    if (!plan.canRun) {
      for (final issue in plan.issues) {
        _addWarning(warnings, issue.message);
      }
    }
    if (!plan.canRun && options.shouldFailOnPreflightBlockers) {
      _recordEvent(
        options,
        warnings,
        events,
        ChartExportJobEvent(
          type: ChartExportJobEventType.blocked,
          stage: ChartExportJobStage.completed,
          message: plan.diagnosticsText(),
          completed: 0,
          total: plan.requestedCount,
          status: ChartExportJobStatus.failed,
          metadata: {
            'blockers': [
              for (final blocker in plan.blockers) blocker.toMetadataJson(),
            ],
          },
        ),
      );
      jobStopwatch.stop();
      final timing = ChartExportJobTiming(
        startedAt: jobStartedAt,
        endedAt: DateTime.now().toUtc(),
        duration: jobStopwatch.elapsed,
        stageDurations: stageDurations,
      );
      final result = ChartExportJobResult(
        exportBatch: ChartExportBatchResult(
          const [],
          skippedUnavailable: [
            for (final request in plan.skippedUnavailable) request.capability,
          ],
          requestedCount: plan.requestedCount,
        ),
        plan: plan,
        timing: timing,
        preflightBlocked: true,
        cancellationReason: _cancelled(token) ? token.reason : null,
        events: events,
        warnings: warnings,
      );
      _emit(
        options,
        warnings,
        ChartExportJobProgress(
          stage: ChartExportJobStage.completed,
          completed: 0,
          total: plan.requestedCount,
          message:
              'Export job blocked before running: '
              '${plan.diagnosticsText()}',
        ),
      );
      _recordEvent(
        options,
        warnings,
        events,
        ChartExportJobEvent(
          type: ChartExportJobEventType.completed,
          stage: ChartExportJobStage.completed,
          message: result.summaryText(),
          completed: 0,
          total: plan.requestedCount,
          status: result.status,
        ),
      );
      return ChartExportJobResult(
        exportBatch: result.exportBatch,
        plan: plan,
        timing: timing,
        preflightBlocked: true,
        cancellationReason: _cancelled(token) ? token.reason : null,
        events: events,
        warnings: warnings,
      );
    }
    final requests = plan.requests;
    _recordEvent(
      options,
      warnings,
      events,
      ChartExportJobEvent(
        type: ChartExportJobEventType.started,
        stage: ChartExportJobStage.exporting,
        message: 'Export job started.',
        completed: 0,
        total: requests.length,
      ),
    );
    final exportBatch = await _timedStage(
      stageDurations,
      ChartExportJobStage.exporting,
      () async {
        _emit(
          options,
          warnings,
          ChartExportJobProgress(
            stage: ChartExportJobStage.exporting,
            completed: 0,
            total: requests.length,
            message: plan.summaryText(),
          ),
        );

        final batch = await ChartExporter.exportAll(
          requests,
          options: ChartExportBatchOptions(
            stopOnFirstFailure: options.stopBatchOnFirstFailure,
            skipUnavailable: options.skipUnavailable,
            continueOnProgressError: options.continueOnCallbackError,
            timeout: options.exportTimeout,
            cancellationToken: token,
            onProgress: (progress) {
              _callCallback(
                options,
                warnings,
                'onExportProgress',
                ChartExportJobStage.exporting,
                () => options.onExportProgress?.call(progress),
              );
              _emit(
                options,
                warnings,
                ChartExportJobProgress(
                  stage: ChartExportJobStage.exporting,
                  completed: progress.completed,
                  total: progress.total,
                  message: progress.result.success
                      ? 'Exported ${progress.result.filename}.'
                      : 'Export failed for ${progress.result.filename}.',
                  exportProgress: progress,
                ),
              );
            },
            onProgressError: (error, stackTrace, _) {
              _handleCallbackError(
                options,
                warnings,
                'onExportProgress',
                ChartExportJobStage.exporting,
                error,
                stackTrace,
              );
            },
          ),
        );

        _emit(
          options,
          warnings,
          ChartExportJobProgress(
            stage: ChartExportJobStage.exporting,
            completed: batch.results.length + batch.skippedUnavailableCount,
            total: batch.requestedCount,
            message: batch.summaryText(),
          ),
        );
        await _callAsyncCallback(
          options,
          warnings,
          'onExportBatchReady',
          ChartExportJobStage.exporting,
          () => options.onExportBatchReady?.call(batch),
        );
        return batch;
      },
    );
    _recordEvent(
      options,
      warnings,
      events,
      ChartExportJobEvent(
        type: ChartExportJobEventType.exported,
        stage: ChartExportJobStage.exporting,
        message: exportBatch.summaryText(),
        completed:
            exportBatch.results.length + exportBatch.skippedUnavailableCount,
        total: exportBatch.requestedCount,
        metadata: {
          'successCount': exportBatch.successCount,
          'failureCount': exportBatch.failureCount,
          'skippedUnavailableCount': exportBatch.skippedUnavailableCount,
          'notRunCount': exportBatch.notRunCount,
        },
      ),
    );

    ChartExportDeliveryBatchResult? exportDelivery;
    ChartExportFile? archive;
    ChartExportDeliveryResult? archiveDelivery;

    final adapter = options.deliveryAdapter;
    if (_cancelled(token)) {
      if (!exportBatch.hasFailures) {
        warnings.add(token.reason ?? 'Export job cancelled.');
      }
    } else if (options.deliverExports) {
      if (adapter == null) {
        warnings.add(
          'Export delivery skipped because no delivery adapter was provided.',
        );
      } else {
        final deliveryBatch = await _timedStage(
          stageDurations,
          ChartExportJobStage.delivering,
          () =>
              _deliverExports(options, exportBatch, adapter, token, warnings),
        );
        exportDelivery = deliveryBatch;
        _recordEvent(
          options,
          warnings,
          events,
          ChartExportJobEvent(
            type: ChartExportJobEventType.delivered,
            stage: ChartExportJobStage.delivering,
            message: deliveryBatch.summaryText(),
            completed: deliveryBatch.results.length,
            total: deliveryBatch.results.length,
            metadata: {
              'successCount': deliveryBatch.successCount,
              'failureCount': deliveryBatch.failureCount,
            },
          ),
        );
      }
    }

    if (!_cancelled(token) && options.shouldBuildArchive) {
      final archiveFile = _timedSyncStage(
        stageDurations,
        ChartExportJobStage.archiving,
        () {
          _emit(
            options,
            warnings,
            const ChartExportJobProgress(
              stage: ChartExportJobStage.archiving,
              completed: 0,
              total: 1,
              message: 'Creating export archive.',
            ),
          );
          final archiveFile = ChartExportArchive.exportBatchZip(
            exportBatch,
            filename:
                options.archiveFilename ??
                options.filename ??
                ChartExportArchive.defaultExportFilename,
            includeManifest: options.includeArchiveManifest,
            manifestFilename: options.archiveManifestFilename,
          );
          _emit(
            options,
            warnings,
            ChartExportJobProgress(
              stage: ChartExportJobStage.archiving,
              completed: 1,
              total: 1,
              message: 'Archive ready: ${archiveFile.filename}.',
            ),
          );
          return archiveFile;
        },
      );
      archive = archiveFile;
      _recordEvent(
        options,
        warnings,
        events,
        ChartExportJobEvent(
          type: ChartExportJobEventType.archived,
          stage: ChartExportJobStage.archiving,
          message: 'Archive ready: ${archiveFile.filename}.',
          completed: 1,
          total: 1,
          filename: archiveFile.filename,
          metadata: {'sizeBytes': archiveFile.sizeBytes},
        ),
      );
    }

    if (!_cancelled(token) && options.deliverArchive) {
      if (adapter == null) {
        warnings.add(
          'Archive delivery skipped because no delivery adapter was provided.',
        );
      } else if (archive != null) {
        final archiveFile = archive;
        final delivery = await _timedStage(
          stageDurations,
          ChartExportJobStage.delivering,
          () async {
            _emit(
              options,
              warnings,
              ChartExportJobProgress(
                stage: ChartExportJobStage.delivering,
                completed: 0,
                total: 1,
                message: 'Delivering archive ${archiveFile.filename}.',
              ),
            );
            final delivery = await ChartExportDelivery.deliverFile(
              archiveFile,
              adapter,
              timeout: options.deliveryTimeout,
              cancellationToken: token,
            );
            _emit(
              options,
              warnings,
              ChartExportJobProgress(
                stage: ChartExportJobStage.delivering,
                completed: 1,
                total: 1,
                message: delivery.success
                    ? 'Archive delivered: ${archiveFile.filename}.'
                    : 'Archive delivery failed: ${delivery.errorText}.',
              ),
            );
            return delivery;
          },
        );
        archiveDelivery = delivery;
        _recordEvent(
          options,
          warnings,
          events,
          ChartExportJobEvent(
            type: ChartExportJobEventType.delivered,
            stage: ChartExportJobStage.delivering,
            message: delivery.success
                ? 'Archive delivered: ${archiveFile.filename}.'
                : 'Archive delivery failed: ${delivery.errorText}.',
            completed: 1,
            total: 1,
            filename: archiveFile.filename,
            metadata: delivery.toMetadataJson(),
          ),
        );
      }
    }

    if (_cancelled(token) && warnings.isEmpty && !exportBatch.hasFailures) {
      warnings.add(token.reason ?? 'Export job cancelled.');
    }
    if (_cancelled(token)) {
      _recordEvent(
        options,
        warnings,
        events,
        ChartExportJobEvent(
          type: ChartExportJobEventType.cancelled,
          stage: ChartExportJobStage.completed,
          message: token.reason ?? 'Export job cancelled.',
          status: ChartExportJobStatus.cancelled,
        ),
      );
    }

    jobStopwatch.stop();
    final timing = ChartExportJobTiming(
      startedAt: jobStartedAt,
      endedAt: DateTime.now().toUtc(),
      duration: jobStopwatch.elapsed,
      stageDurations: stageDurations,
    );
    final resultBeforeCompletedProgress = ChartExportJobResult(
      exportBatch: exportBatch,
      plan: plan,
      exportDelivery: exportDelivery,
      archive: archive,
      archiveDelivery: archiveDelivery,
      timing: timing,
      cancellationReason: _cancelled(token) ? token.reason : null,
      events: events,
      warnings: warnings,
    );
    _emit(
      options,
      warnings,
      ChartExportJobProgress(
        stage: ChartExportJobStage.completed,
        completed: 1,
        total: 1,
        message: resultBeforeCompletedProgress.summaryText(),
      ),
    );
    _recordEvent(
      options,
      warnings,
      events,
      ChartExportJobEvent(
        type: ChartExportJobEventType.completed,
        stage: ChartExportJobStage.completed,
        message: resultBeforeCompletedProgress.summaryText(),
        completed: 1,
        total: 1,
        status: resultBeforeCompletedProgress.status,
      ),
    );
    return ChartExportJobResult(
      exportBatch: exportBatch,
      plan: plan,
      exportDelivery: exportDelivery,
      archive: archive,
      archiveDelivery: archiveDelivery,
      timing: timing,
      cancellationReason: _cancelled(token) ? token.reason : null,
      events: events,
      warnings: warnings,
    );
  } finally {
    _activeCancellationToken = null;
    _isRunning = false;
  }
}