reconcile static method

(Uint8List, List<String>, List<String>) reconcile(
  1. Uint8List message,
  2. List<NegentropyItem> items
)

Reconciles received message and creates response Returns (response bytes, need IDs, have IDs)

Implementation

static (Uint8List response, List<String> needIds, List<String> haveIds)
    reconcile(Uint8List message, List<NegentropyItem> items) {
  items.sort((a, b) {
    final tsCmp = a.timestamp.compareTo(b.timestamp);
    if (tsCmp != 0) return tsCmp;
    return _compareBytes(a.id, b.id);
  });

  int offset = 0;

  // Check version
  if (message.isEmpty || message[0] != protocolVersion) {
    throw ArgumentError('Invalid protocol version');
  }
  offset++;

  final needIds = <String>[];
  final haveIds = <String>[];
  final output = BytesBuilder();
  output.addByte(protocolVersion);

  var prevBound = _Bound(0, Uint8List(0));
  var itemIndex = 0;

  while (offset < message.length) {
    // Decode upper bound
    final (timestamp, idPrefix, boundBytes) = decodeBound(message, offset);
    offset += boundBytes;

    final currBound = _Bound(timestamp, idPrefix);

    // Get items in range [prevBound, currBound)
    final rangeItems = <NegentropyItem>[];
    while (itemIndex < items.length) {
      final item = items[itemIndex];
      if (_isInRange(item, prevBound, currBound)) {
        rangeItems.add(item);
        itemIndex++;
      } else if (_isBeforeBound(item, currBound)) {
        itemIndex++;
      } else {
        break;
      }
    }

    // Decode mode
    if (offset >= message.length) break;
    final mode = message[offset++];

    switch (mode) {
      case modeSkip:
        // Do nothing, this range is synchronized
        break;

      case modeFingerprint:
        // Read fingerprint
        if (offset + fingerprintSize > message.length) {
          throw ArgumentError('Not enough data for fingerprint');
        }
        final theirFingerprint = Uint8List.fromList(
            message.sublist(offset, offset + fingerprintSize));
        offset += fingerprintSize;

        // Calculate our fingerprint for this range
        final ourIds = rangeItems.map((i) => i.id).toList();
        final ourFingerprint = calculateFingerprint(ourIds);

        if (!_bytesEqual(theirFingerprint, ourFingerprint)) {
          // Mismatch - need to split or send IDs
          if (rangeItems.length <= 2) {
            // Send our IDs directly
            output.add(encodeBound(timestamp, idPrefix));
            output.addByte(modeIdList);
            output.add(encodeVarint(rangeItems.length));
            for (final item in rangeItems) {
              output.add(item.id);
            }
          } else {
            // Split range in half
            final mid = rangeItems.length ~/ 2;
            final midItem = rangeItems[mid];

            // First half
            output.add(encodeBound(midItem.timestamp, midItem.id));
            output.addByte(modeFingerprint);
            final firstHalfIds =
                rangeItems.sublist(0, mid).map((i) => i.id).toList();
            output.add(calculateFingerprint(firstHalfIds));

            // Second half
            output.add(encodeBound(timestamp, idPrefix));
            output.addByte(modeFingerprint);
            final secondHalfIds =
                rangeItems.sublist(mid).map((i) => i.id).toList();
            output.add(calculateFingerprint(secondHalfIds));
          }
        }
        break;

      case modeIdList:
        // Read their IDs
        final (count, countBytes) = decodeVarint(message, offset);
        offset += countBytes;

        final theirIds = <Uint8List>[];
        for (var i = 0; i < count; i++) {
          if (offset + idSize > message.length) {
            throw ArgumentError('Not enough data for ID');
          }
          theirIds.add(
              Uint8List.fromList(message.sublist(offset, offset + idSize)));
          offset += idSize;
        }

        // Find differences
        final ourIdSet = rangeItems.map((i) => bytesToHex(i.id)).toSet();
        final theirIdSet = theirIds.map(bytesToHex).toSet();

        // We need IDs they have that we don't
        for (final theirId in theirIdSet) {
          if (!ourIdSet.contains(theirId)) {
            needIds.add(theirId);
          }
        }

        // We have IDs that they don't
        for (final ourId in ourIdSet) {
          if (!theirIdSet.contains(ourId)) {
            haveIds.add(ourId);
          }
        }

        // Send skip for this range
        output.add(encodeBound(timestamp, idPrefix));
        output.addByte(modeSkip);
        break;

      default:
        throw ArgumentError('Unknown mode: $mode');
    }

    prevBound = currBound;
  }

  return (output.toBytes(), needIds, haveIds);
}