syncPendingMutations method
Future<void>
syncPendingMutations(
)
Implementation
Future<void> syncPendingMutations() async {
if (_disposed) return;
if (_isSyncing) {
SdkLogger.d('Sync already in progress, skipping');
return;
}
if (_connection.state is! Connected) {
SdkLogger.d('syncPendingMutations: not connected, skipping');
return;
}
SdkLogger.d('syncPendingMutations: starting');
_isSyncing = true;
final cycleFailures = <MutationSyncResult>[];
try {
final pending = await _storage.getPendingMutations();
if (pending.isEmpty) {
SdkLogger.d('syncPendingMutations: no pending mutations, setting idle');
_cachedPendingCount = 0;
_updateSyncState(
_currentSyncState.copyWith(status: SyncStatus.idle, pendingCount: 0),
);
return;
}
_cachedPendingCount = pending.length;
_updateSyncState(
_currentSyncState.copyWith(
status: SyncStatus.syncing,
pendingCount: _cachedPendingCount,
),
);
SdkLogger.d('Syncing ${pending.length} pending mutations');
for (final mutation in pending) {
if (_disposed) return;
if (_connection.state is! Connected) {
SdkLogger.d('Connection lost during sync. Pausing queue.');
break;
}
final maxAge = _policy.maxMutationAge;
if (maxAge != null &&
DateTime.now().difference(mutation.createdAt) > maxAge) {
await _discardMutation(
mutation,
'expired: older than ${maxAge.inSeconds}s at replay',
cycleFailures,
expired: true,
);
continue;
}
final hook = _policy.onBeforeReplay;
if (hook != null) {
ReplayDecision decision;
try {
decision = await hook(mutation);
} catch (e) {
SdkLogger.e('onBeforeReplay hook threw, replaying anyway: $e');
decision = ReplayDecision.replay;
}
if (decision == ReplayDecision.discard) {
await _discardMutation(
mutation,
'discarded by onBeforeReplay',
cycleFailures,
);
continue;
}
}
try {
SdkLogger.d(
'SYNC_SEND: ${mutation.reducerName}, uuidRequestId=${mutation.requestId}, argsLen=${mutation.encodedArgs.length}',
);
final result = await _send(
mutation.reducerName,
mutation.encodedArgs,
requestId: mutation.requestId,
);
if (_disposed) return;
if (result.isSuccess) {
await _storage.dequeueMutation(mutation.requestId);
_decrementPendingCount();
SdkLogger.d('Synced mutation: ${mutation.reducerName}');
if (!_disposed) {
_mutationSyncResultController.add(
MutationSyncResult(
requestId: mutation.requestId,
reducerName: mutation.reducerName,
success: true,
),
);
}
} else {
final errorMsg = result.errorMessage ?? 'Unknown error';
_optimisticState.rollbackOptimisticChanges(mutation.requestId);
await _storage.dequeueMutation(mutation.requestId);
_decrementPendingCount();
SdkLogger.e(
'Server rejected mutation: ${mutation.reducerName} - $errorMsg',
);
final failure = MutationSyncResult(
requestId: mutation.requestId,
reducerName: mutation.reducerName,
success: false,
error: errorMsg,
optimisticChanges: mutation.optimisticChanges,
);
cycleFailures.add(failure);
if (!_disposed) {
_mutationSyncResultController.add(failure);
}
}
} on SpacetimeDbReducerException catch (e) {
_optimisticState.rollbackOptimisticChanges(mutation.requestId);
await _storage.dequeueMutation(mutation.requestId);
_decrementPendingCount();
SdkLogger.e(
'Server rejected mutation: ${mutation.reducerName} - ${e.message}',
);
final failure = MutationSyncResult(
requestId: mutation.requestId,
reducerName: mutation.reducerName,
success: false,
error: e.message,
optimisticChanges: mutation.optimisticChanges,
);
cycleFailures.add(failure);
if (!_disposed) {
_mutationSyncResultController.add(failure);
}
} on SpacetimeDbTimeoutException catch (e) {
SdkLogger.w(
'Timeout syncing ${mutation.reducerName}: $e. Keeping in queue for retry.',
);
break;
} catch (e) {
SdkLogger.w(
'Network error syncing ${mutation.reducerName}: $e. Pausing queue.',
);
break;
}
}
} finally {
_isSyncing = false;
SdkLogger.d('syncPendingMutations: finished (isSyncing=false)');
}
if (_disposed) {
SdkLogger.d(
'syncPendingMutations: disposed, skipping final state update',
);
return;
}
final remaining = await _storage.getPendingMutations();
_cachedPendingCount = remaining.length;
SdkLogger.d(
'syncPendingMutations: remaining=$_cachedPendingCount, '
'failures=${cycleFailures.length}, setting idle',
);
int failedCount;
List<MutationSyncResult> recentFailures;
if (cycleFailures.isNotEmpty) {
failedCount = _currentSyncState.failedCount + cycleFailures.length;
recentFailures = _capFailures([
..._currentSyncState.recentFailures,
...cycleFailures,
]);
} else if (remaining.isEmpty) {
failedCount = 0;
recentFailures = const [];
} else {
failedCount = _currentSyncState.failedCount;
recentFailures = _currentSyncState.recentFailures;
}
_updateSyncState(
_currentSyncState.copyWith(
status: SyncStatus.idle,
pendingCount: _cachedPendingCount,
lastSyncTime: DateTime.now(),
failedCount: failedCount,
recentFailures: recentFailures,
),
);
if (remaining.isNotEmpty && _connection.state is Connected) {
_scheduleRetry();
} else if (remaining.isEmpty) {
cancelRetry();
_retryAttempt = 0;
}
}