syncPendingMutations method

Future<void> syncPendingMutations()

Implementation

/// Replays the queued offline mutations against the server, one at a time,
/// in the order returned by `_storage.getPendingMutations()`.
///
/// The call returns immediately without doing anything when this object is
/// disposed, when another sync cycle is already running, or when the
/// connection is not in the [Connected] state.
///
/// For every pending mutation the cycle:
///
/// * discards it (flagged as expired) when `_policy.maxMutationAge` is set
///   and the mutation is older than that age;
/// * consults `_policy.onBeforeReplay` when one is configured and discards
///   the mutation if the hook answers [ReplayDecision.discard]; a hook that
///   throws is logged and treated as [ReplayDecision.replay];
/// * otherwise sends it through `_send`. A successful result dequeues the
///   mutation and publishes a success [MutationSyncResult]. A server
///   rejection (unsuccessful result or [SpacetimeDbReducerException]) rolls
///   back the optimistic changes, dequeues the mutation and records a
///   failure. A timeout or any other error leaves the mutation queued and
///   stops the cycle.
///
/// The loop also stops early if the connection leaves the [Connected] state.
///
/// After the loop the queue is read again, the sync state is set to
/// [SyncStatus.idle] with the refreshed pending/failed counts, and a retry is
/// scheduled when mutations remain and the connection is still [Connected].
/// When the queue is empty the pending retry is cancelled and the retry
/// attempt counter is reset. This final bookkeeping is skipped when the queue
/// was already empty at the start or when the object was disposed meanwhile.
Future<void> syncPendingMutations() async {
  if (_disposed) return;
  if (_isSyncing) {
    SdkLogger.d('Sync already in progress, skipping');
    return;
  }
  if (_connection.state is! Connected) {
    SdkLogger.d('syncPendingMutations: not connected, skipping');
    return;
  }

  SdkLogger.d('syncPendingMutations: starting');
  _isSyncing = true;
  // Failures produced by this cycle only; merged into the sync state at the
  // end of the method.
  final cycleFailures = <MutationSyncResult>[];

  try {
    final pending = await _storage.getPendingMutations();
    if (pending.isEmpty) {
      SdkLogger.d('syncPendingMutations: no pending mutations, setting idle');
      _cachedPendingCount = 0;
      _updateSyncState(
        _currentSyncState.copyWith(status: SyncStatus.idle, pendingCount: 0),
      );
      return;
    }

    _cachedPendingCount = pending.length;
    _updateSyncState(
      _currentSyncState.copyWith(
        status: SyncStatus.syncing,
        pendingCount: _cachedPendingCount,
      ),
    );

    SdkLogger.d('Syncing ${pending.length} pending mutations');

    for (final mutation in pending) {
      // Re-checked on every iteration because the previous iteration awaited.
      if (_disposed) return;
      if (_connection.state is! Connected) {
        SdkLogger.d('Connection lost during sync. Pausing queue.');
        break;
      }

      // Age-based expiry: drop mutations that have been queued for too long.
      final maxAge = _policy.maxMutationAge;
      if (maxAge != null &&
          DateTime.now().difference(mutation.createdAt) > maxAge) {
        await _discardMutation(
          mutation,
          'expired: older than ${maxAge.inSeconds}s at replay',
          cycleFailures,
          expired: true,
        );
        continue;
      }

      // Optional application hook that can veto the replay.
      final hook = _policy.onBeforeReplay;
      if (hook != null) {
        ReplayDecision decision;
        try {
          decision = await hook(mutation);
        } catch (e) {
          // A faulty hook must not block the queue: fall back to replaying.
          SdkLogger.e('onBeforeReplay hook threw, replaying anyway: $e');
          decision = ReplayDecision.replay;
        }
        if (decision == ReplayDecision.discard) {
          await _discardMutation(
            mutation,
            'discarded by onBeforeReplay',
            cycleFailures,
          );
          continue;
        }
      }

      try {
        SdkLogger.d(
          'SYNC_SEND: ${mutation.reducerName}, uuidRequestId=${mutation.requestId}, argsLen=${mutation.encodedArgs.length}',
        );
        final result = await _send(
          mutation.reducerName,
          mutation.encodedArgs,
          requestId: mutation.requestId,
        );

        if (_disposed) return;

        if (result.isSuccess) {
          await _storage.dequeueMutation(mutation.requestId);
          _decrementPendingCount();
          SdkLogger.d('Synced mutation: ${mutation.reducerName}');
          if (!_disposed) {
            _mutationSyncResultController.add(
              MutationSyncResult(
                requestId: mutation.requestId,
                reducerName: mutation.reducerName,
                success: true,
              ),
            );
          }
        } else {
          // The server answered but rejected the mutation: undo the local
          // optimistic changes and drop it from the queue.
          final errorMsg = result.errorMessage ?? 'Unknown error';
          _optimisticState.rollbackOptimisticChanges(mutation.requestId);
          await _storage.dequeueMutation(mutation.requestId);
          _decrementPendingCount();
          SdkLogger.e(
            'Server rejected mutation: ${mutation.reducerName} - $errorMsg',
          );
          final failure = MutationSyncResult(
            requestId: mutation.requestId,
            reducerName: mutation.reducerName,
            success: false,
            error: errorMsg,
            optimisticChanges: mutation.optimisticChanges,
          );
          cycleFailures.add(failure);
          if (!_disposed) {
            _mutationSyncResultController.add(failure);
          }
        }
      } on SpacetimeDbReducerException catch (e) {
        // Same handling as an unsuccessful result: roll back, dequeue and
        // report the failure.
        _optimisticState.rollbackOptimisticChanges(mutation.requestId);
        await _storage.dequeueMutation(mutation.requestId);
        _decrementPendingCount();
        SdkLogger.e(
          'Server rejected mutation: ${mutation.reducerName} - ${e.message}',
        );
        final failure = MutationSyncResult(
          requestId: mutation.requestId,
          reducerName: mutation.reducerName,
          success: false,
          error: e.message,
          optimisticChanges: mutation.optimisticChanges,
        );
        cycleFailures.add(failure);
        if (!_disposed) {
          _mutationSyncResultController.add(failure);
        }
      } on SpacetimeDbTimeoutException catch (e) {
        // The mutation stays queued; the rest of the queue waits behind it.
        SdkLogger.w(
          'Timeout syncing ${mutation.reducerName}: $e. Keeping in queue for retry.',
        );
        break;
      } catch (e) {
        // Any other error is treated as transient: keep the mutation queued
        // and stop this cycle.
        SdkLogger.w(
          'Network error syncing ${mutation.reducerName}: $e. Pausing queue.',
        );
        break;
      }
    }
  } finally {
    _isSyncing = false;
    SdkLogger.d('syncPendingMutations: finished (isSyncing=false)');
  }

  if (_disposed) {
    SdkLogger.d(
      'syncPendingMutations: disposed, skipping final state update',
    );
    return;
  }

  // NOTE(review): _isSyncing is already false here, so another cycle may
  // start while this read is in flight — confirm that is acceptable.
  final remaining = await _storage.getPendingMutations();

  // Disposal can happen while the read above is pending. Bail out before
  // touching the sync state or the retry scheduling, consistent with the
  // disposal guards used after the other awaits in this method.
  if (_disposed) {
    SdkLogger.d(
      'syncPendingMutations: disposed, skipping final state update',
    );
    return;
  }

  _cachedPendingCount = remaining.length;
  SdkLogger.d(
    'syncPendingMutations: remaining=$_cachedPendingCount, '
    'failures=${cycleFailures.length}, setting idle',
  );

  int failedCount;
  List<MutationSyncResult> recentFailures;
  if (cycleFailures.isNotEmpty) {
    // New failures accumulate on top of what the state already holds.
    failedCount = _currentSyncState.failedCount + cycleFailures.length;
    recentFailures = _capFailures([
      ..._currentSyncState.recentFailures,
      ...cycleFailures,
    ]);
  } else if (remaining.isEmpty) {
    // Clean cycle with an empty queue: clear the failure history.
    failedCount = 0;
    recentFailures = const [];
  } else {
    // Nothing failed but work remains: keep the existing failure history.
    failedCount = _currentSyncState.failedCount;
    recentFailures = _currentSyncState.recentFailures;
  }

  _updateSyncState(
    _currentSyncState.copyWith(
      status: SyncStatus.idle,
      pendingCount: _cachedPendingCount,
      lastSyncTime: DateTime.now(),
      failedCount: failedCount,
      recentFailures: recentFailures,
    ),
  );

  if (remaining.isNotEmpty && _connection.state is Connected) {
    _scheduleRetry();
  } else if (remaining.isEmpty) {
    cancelRetry();
    _retryAttempt = 0;
  }
}