pull method

  1. @override
Future<List<SyncRecord>> pull({
  1. required String collection,
  2. DateTime? since,
  3. int? limit,
  4. int? offset,
  5. SyncFilter? filter,
})
override

Pull data from backend with optional pagination and filtering

Parameters:

  • collection: Collection name to pull from
  • since: Only pull records modified after this timestamp
  • limit: Maximum number of records to pull
  • offset: Number of records to skip (for pagination)
  • filter: Optional sync filter for selective synchronization

Implementation

@override
Future<List<SyncRecord>> pull({
  required String collection,
  DateTime? since,
  int? limit,
  int? offset,
  SyncFilter? filter,
}) async {
  final indexKey = _getIndexKey(collection);
  final records = <SyncRecord>[];
  final effectiveSince = filter?.since ?? since;
  final minScore =
      effectiveSince?.millisecondsSinceEpoch.toString() ?? '-inf';
  final recordIds = await command.send_object([
    'ZRANGEBYSCORE',
    indexKey,
    minScore,
    '+inf',
  ]) as List;

  for (final recordId in recordIds) {
    final key = _getKey(collection, recordId.toString());
    final hash = await command.send_object(['HGETALL', key]) as List;
    if (hash.isEmpty) continue;

    final Map<String, String> fields = {};
    for (int i = 0; i < hash.length; i += 2) {
      fields[hash[i].toString()] = hash[i + 1].toString();
    }

    var recordData = jsonDecode(fields['data']!) as Map<String, dynamic>;
    if (filter != null) recordData = filter.applyFieldFilter(recordData);

    records.add(SyncRecord(
      recordId: recordId.toString(),
      data: recordData,
      updatedAt: DateTime.parse(fields['updated_at']!),
      version: int.parse(fields['version']!),
    ));
  }

  return records;
}