sendOperations method

  1. @override
Future<void> sendOperations(
  1. List<Operation> operations
)
override

Sends multiple operations to the server/peers efficiently.

Implementation

@override
Future<void> sendOperations(List<Operation> operations) async {
  const transientOps = {'CURSOR_MOVE', 'GHOST_UPDATE', 'PRESENCE_UPDATE'};
  final opsToQueue = <Operation>[];

  for (final op in operations) {
    if (transientOps.contains(op.type)) {
      if (_innerClient != null && _isConnected) {
        await _innerClient!.sendOperation(op);
      } else {
        print('OfflineClient: Dropping transient operation ${op.type} because offline');
      }
    } else {
      opsToQueue.add(op);
    }
  }

  if (opsToQueue.isNotEmpty) {
    if (kIsWeb && opsToQueue.length > 100 && (!_isConnected || _innerClient == null)) {
      print('OfflineClient: Large batch on Web ($opsToQueue.length ops) and not connected. Waiting up to 2s...');
      try {
        await _connectionStateController.stream
            .firstWhere((isConnected) => isConnected)
            .timeout(const Duration(seconds: 2));
        print('OfflineClient: Connected after wait! Proceeding to try direct send.');
      } catch (_) {
        print('OfflineClient: Connection wait timed out. Falling back to queue.');
      }
    }

    bool sentDirectly = false;

    if (_isConnected && _innerClient != null && _isDbReady) {
      try {
        await _lock.synchronized(() async {
          final countResult = await _db.query('SELECT COUNT(*) FROM offline_queue WHERE is_deleted = 0');
          final count =
              countResult.firstOrNull?['COUNT(*)'] as int? ?? countResult.firstOrNull?.values.first as int? ?? 0;

          if (count == 0) {
            print('OfflineClient: Queue is empty, sending ${opsToQueue.length} ops directly bypass DB.');
            await _innerClient!.sendOperations(opsToQueue);
            sentDirectly = true;
          } else if (count < 100) {
            print('OfflineClient: Queue has $count items (small). Draining and bundling...');

            final rows = await _db.query('SELECT * FROM offline_queue WHERE is_deleted = 0 ORDER BY id ASC');
            final idsToDelete = <int>[];
            final stragglers = <Operation>[];

            for (final row in rows) {
              final id = row['id'] as int;
              try {
                final dataDynamic = await decodeJsonInBackground(row['data'] as String);
                if (dataDynamic != null) {
                  final Map<String, dynamic> dataMap = Map<String, dynamic>.from(dataDynamic as Map);
                  if (dataMap.containsKey('start')) {
                    dataMap['start_date'] = dataMap['start'];
                    dataMap.remove('start');
                  }
                  if (dataMap.containsKey('end')) {
                    dataMap['end_date'] = dataMap['end'];
                    dataMap.remove('end');
                  }

                  stragglers.add(Operation(
                    type: row['type'] as String,
                    data: dataMap,
                    timestamp: Hlc.parse(row['timestamp'] as String),
                    actorId: row['actor_id'] as String,
                  ));
                }
                idsToDelete.add(id);
              } catch (e) {
                print('Error decoding straggler $id: $e');
                idsToDelete.add(id);
              }
            }

            final combinedBatch = [...stragglers, ...opsToQueue];
            print('OfflineClient: Sending combined batch of ${combinedBatch.length} (Swallowed $count queued)');
            await _innerClient!.sendOperations(combinedBatch);

            if (idsToDelete.isNotEmpty) {
              final placeholder = List.filled(idsToDelete.length, '?').join(',');
              await _db.execute('DELETE FROM offline_queue WHERE id IN ($placeholder)', idsToDelete);
            }
            sentDirectly = true;
          }
        });
      } catch (e) {
        print('OfflineClient: Direct send failed, falling back to queue. Error: $e');
        sentDirectly = false;
      }
    }

    if (!sentDirectly) {
      await _queueOperations(opsToQueue);
      _flushQueue();
    }
  }
}