enqueue method

  1. @override
Future<void> enqueue(
  1. OutboundOperation operation
)
override

Enqueues an operation.

Implementation

@override
Future<void> enqueue(OutboundOperation operation) async {
  if (!_isInitialized) {
    await initialize();
  }

  // Check for duplicate operations (same type and data)
  // This prevents issues like double-clicking causing duplicate sends
  final isDuplicate = _operations.any(
    (op) =>
        op.type == operation.type &&
        op.data == operation.data &&
        op.status != OutboundOperationStatus.completed,
  );
  if (isDuplicate) {
    ChatLogger.debug(
      'Duplicate operation detected, skipping: ${operation.type}',
    );
    return;
  }

  // Check for canceling opposite operations (e.g., star then unstar)
  final cancelledIndex = await _findAndCancelOpposite(operation);
  if (cancelledIndex >= 0) {
    ChatLogger.debug(
      'Cancelled opposite operation for: ${operation.type}',
    );
    return;
  }

  // Check queue size limit
  if (_operations.length >= _maxQueueSize) {
    ChatLogger.warning(
      'Queue size limit reached ($_maxQueueSize). '
      'Removing oldest failed operation.',
    );
    // Remove oldest failed operation to make room
    final oldestFailed = _operations.indexWhere(
      (op) => op.status == OutboundOperationStatus.failed,
    );
    if (oldestFailed >= 0) {
      final removed = _operations.removeAt(oldestFailed);
      await _database.deleteOperation(removed.id);
    } else {
      // If no failed operations, reject the new one
      throw StateError(
        'Queue is full and no failed operations to remove',
      );
    }
  }

  _operations.add(operation);
  await _database.insertOperation(operation);
  _pendingCountController.add(_operations.length);

  // Flush immediately if not already processing.
  unawaited(processQueue());
}