enqueue method
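Enqueues an outbound operation: skips duplicates, cancels opposite operations, persists the new entry, and triggers queue processing.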
Implementation
/// Adds [operation] to the outbound queue, persists it, and triggers a flush.
///
/// Initializes the queue first if that has not happened yet.
///
/// The operation is dropped without error (the returned future completes
/// normally) when either:
///
/// * a queued operation that is not completed has the same `type` and an
///   equal `data`, or
/// * [_findAndCancelOpposite] returns a non-negative index, meaning it
///   cancelled an opposite operation (e.g. star followed by unstar).
///
/// When the queue already holds [_maxQueueSize] entries, the first failed
/// operation in queue order is evicted (from memory and the database) to make
/// room.
///
/// Throws a [StateError] if the queue is full and contains no failed
/// operation to evict. Errors from the database are propagated; if the insert
/// fails, the operation is removed from the in-memory queue again so that
/// memory and storage stay consistent.
@override
Future<void> enqueue(OutboundOperation operation) async {
  if (!_isInitialized) {
    await initialize();
  }

  // Guard against duplicates (same type and data), e.g. a double-click
  // submitting the same send twice. Completed operations do not count.
  // NOTE(review): this relies on `==` of the `data` type; if `data` is a plain
  // Map/List, Dart compares by identity, not by content — confirm.
  final isDuplicate = _operations.any(
    (op) =>
        op.type == operation.type &&
        op.data == operation.data &&
        op.status != OutboundOperationStatus.completed,
  );
  if (isDuplicate) {
    ChatLogger.debug(
      'Duplicate operation detected, skipping: ${operation.type}',
    );
    return;
  }

  // An opposite operation already in the queue (e.g. star then unstar)
  // cancels out with the new one, so nothing needs to be enqueued.
  final cancelledIndex = await _findAndCancelOpposite(operation);
  if (cancelledIndex >= 0) {
    ChatLogger.debug(
      'Cancelled opposite operation for: ${operation.type}',
    );
    return;
  }

  // Enforce the queue size limit.
  if (_operations.length >= _maxQueueSize) {
    ChatLogger.warning(
      'Queue size limit reached ($_maxQueueSize). '
      'Removing oldest failed operation.',
    );
    // Evict the first failed operation in queue order to make room.
    final oldestFailed = _operations.indexWhere(
      (op) => op.status == OutboundOperationStatus.failed,
    );
    if (oldestFailed < 0) {
      // Nothing can be evicted, so the new operation is rejected.
      throw StateError(
        'Queue is full and no failed operations to remove',
      );
    }
    final removed = _operations.removeAt(oldestFailed);
    await _database.deleteOperation(removed.id);
  }

  // Add to memory synchronously (before the await) so that an overlapping
  // enqueue call already sees this operation in its duplicate check.
  _operations.add(operation);
  try {
    await _database.insertOperation(operation);
  } catch (_) {
    // Persisting failed: undo the in-memory add so the caller's error matches
    // the queue state and a retry is not rejected as a duplicate. Identity is
    // used so an equal-but-distinct entry is never removed by mistake.
    _operations.removeWhere((op) => identical(op, operation));
    rethrow;
  }
  _pendingCountController.add(_operations.length);

  // Flush immediately if not already processing.
  unawaited(processQueue());
}