sendOperations method
Sends multiple operations to the server/peers efficiently.
Implementation
/// Sends [operations] through the inner client when possible, otherwise
/// persists them via the offline queue.
///
/// Operations whose `type` is `CURSOR_MOVE`, `GHOST_UPDATE` or
/// `PRESENCE_UPDATE` are treated as transient: each one is sent individually
/// when an inner client exists and the connection is up, and is dropped (with
/// a log line) otherwise. Transient operations are never queued.
///
/// All remaining operations are handled as one batch:
///
/// * On Web, a batch of more than 100 operations that arrives while not
///   connected first waits up to two seconds for the connection state stream
///   to report `true`.
/// * If connected and the database is ready, the batch is sent directly when
///   the pending queue is empty, or bundled behind the already-queued rows
///   when fewer than 100 rows are pending.
/// * In every other case (not connected, database not ready, 100 or more
///   pending rows, or the direct send threw) the batch is handed to
///   `_queueOperations` and a queue flush is triggered.
@override
Future<void> sendOperations(List<Operation> operations) async {
  const transientOps = {'CURSOR_MOVE', 'GHOST_UPDATE', 'PRESENCE_UPDATE'};
  final opsToQueue = <Operation>[];

  // Split the input: transient ops are sent (or dropped) immediately, the
  // rest are collected for the batch path below.
  for (final op in operations) {
    if (transientOps.contains(op.type)) {
      if (_innerClient != null && _isConnected) {
        await _innerClient!.sendOperation(op);
      } else {
        print('OfflineClient: Dropping transient operation ${op.type} because offline');
      }
    } else {
      opsToQueue.add(op);
    }
  }

  if (opsToQueue.isNotEmpty) {
    if (kIsWeb && opsToQueue.length > 100 && (!_isConnected || _innerClient == null)) {
      // `${...}` is required here: a bare `$opsToQueue.length` would print the
      // entire list followed by the literal text ".length".
      print('OfflineClient: Large batch on Web (${opsToQueue.length} ops) and not connected. Waiting up to 2s...');
      try {
        await _connectionStateController.stream
            .firstWhere((isConnected) => isConnected)
            .timeout(const Duration(seconds: 2));
        print('OfflineClient: Connected after wait! Proceeding to try direct send.');
      } catch (_) {
        // Catch-all: any failure of the wait (not only a timeout) lands here
        // and simply lets the code below decide between send and queue.
        print('OfflineClient: Connection wait timed out. Falling back to queue.');
      }
    }

    var sentDirectly = false;
    if (_isConnected && _innerClient != null && _isDbReady) {
      try {
        // The count, the drain and the delete all run inside one
        // `_lock.synchronized` callback.
        await _lock.synchronized(() async {
          final countResult = await _db.query('SELECT COUNT(*) FROM offline_queue WHERE is_deleted = 0');
          // Prefer the column named 'COUNT(*)'; otherwise fall back to the
          // first value of the first row, and finally to 0.
          final count =
              countResult.firstOrNull?['COUNT(*)'] as int? ?? countResult.firstOrNull?.values.first as int? ?? 0;

          if (count == 0) {
            print('OfflineClient: Queue is empty, sending ${opsToQueue.length} ops directly bypass DB.');
            await _innerClient!.sendOperations(opsToQueue);
            sentDirectly = true;
          } else if (count < 100) {
            print('OfflineClient: Queue has $count items (small). Draining and bundling...');
            final rows = await _db.query('SELECT * FROM offline_queue WHERE is_deleted = 0 ORDER BY id ASC');
            final idsToDelete = <int>[];
            final stragglers = <Operation>[];

            for (final row in rows) {
              final id = row['id'] as int;
              try {
                final dataDynamic = await decodeJsonInBackground(row['data'] as String);
                if (dataDynamic != null) {
                  final Map<String, dynamic> dataMap = Map<String, dynamic>.from(dataDynamic as Map);
                  // Rename the 'start' / 'end' keys of stored payloads to
                  // 'start_date' / 'end_date' before re-sending.
                  if (dataMap.containsKey('start')) {
                    dataMap['start_date'] = dataMap['start'];
                    dataMap.remove('start');
                  }
                  if (dataMap.containsKey('end')) {
                    dataMap['end_date'] = dataMap['end'];
                    dataMap.remove('end');
                  }
                  stragglers.add(Operation(
                    type: row['type'] as String,
                    data: dataMap,
                    timestamp: Hlc.parse(row['timestamp'] as String),
                    actorId: row['actor_id'] as String,
                  ));
                }
                // Rows whose payload decoded to null are not re-sent but are
                // still marked for deletion.
                idsToDelete.add(id);
              } catch (e) {
                // A row that cannot be rebuilt is logged and also marked for
                // deletion, so it is removed once the send below succeeds.
                print('Error decoding straggler $id: $e');
                idsToDelete.add(id);
              }
            }

            // Previously queued operations go first (rows were read in
            // ascending id order), followed by the new batch.
            final combinedBatch = [...stragglers, ...opsToQueue];
            print('OfflineClient: Sending combined batch of ${combinedBatch.length} (Swallowed $count queued)');
            await _innerClient!.sendOperations(combinedBatch);

            // Reached only when the send above did not throw; on failure the
            // DELETE is skipped and the rows stay in the queue.
            if (idsToDelete.isNotEmpty) {
              final placeholder = List.filled(idsToDelete.length, '?').join(',');
              await _db.execute('DELETE FROM offline_queue WHERE id IN ($placeholder)', idsToDelete);
            }
            sentDirectly = true;
          }
          // count >= 100: leave sentDirectly false so the batch is queued.
        });
      } catch (e) {
        print('OfflineClient: Direct send failed, falling back to queue. Error: $e');
        sentDirectly = false;
      }
    }

    if (!sentDirectly) {
      await _queueOperations(opsToQueue);
      // Not awaited: the flush is started and this method returns without
      // waiting for it.
      _flushQueue();
    }
  }
}