reconcile static method
Reconciles a received message and creates the response. Returns (response bytes, need IDs, have IDs).
Implementation
/// Processes a reconciliation [message] from the peer against the local
/// [items] and builds the reply.
///
/// After the leading version byte, [message] is read as a sequence of ranges,
/// each encoded as an upper bound, a mode byte and a mode-specific payload.
/// A range starts where the previous range in the message ended. The reply
/// follows the same rule, so whenever a range needs no answer (skip mode, or
/// a matching fingerprint) an explicit skip is written before the next
/// fingerprint or ID-list range to keep the reply's ranges contiguous.
/// Skips that would only trail the reply are omitted, so a reply with
/// nothing left to fingerprint or list is just the version byte.
///
/// [items] is not modified; a sorted copy (timestamp, then ID bytes) is used.
///
/// Returns a record of:
///  * `response` - the encoded reply, starting with the version byte;
///  * `needIds` - IDs (as produced by `bytesToHex`) the peer listed for a
///    range that are absent from our items in that range;
///  * `haveIds` - IDs of our items in a range that the peer's list for that
///    range did not contain.
///
/// Throws an [ArgumentError] if [message] is empty or starts with a different
/// version byte, if a fingerprint or ID is truncated, or if a mode byte is
/// not recognised. A message that ends right after a bound (before its mode
/// byte) is not an error; processing simply stops there.
static (Uint8List response, List<String> needIds, List<String> haveIds)
    reconcile(Uint8List message, List<NegentropyItem> items) {
  // Check version before doing any work.
  if (message.isEmpty || message[0] != protocolVersion) {
    throw ArgumentError('Invalid protocol version');
  }

  // Sort a copy so the caller's list keeps its order and unmodifiable lists
  // are accepted.
  final sortedItems = [...items]
    ..sort((a, b) {
      final tsCmp = a.timestamp.compareTo(b.timestamp);
      if (tsCmp != 0) return tsCmp;
      return _compareBytes(a.id, b.id);
    });

  var offset = 1; // Position just past the version byte.
  final needIds = <String>[];
  final haveIds = <String>[];
  final output = BytesBuilder()..addByte(protocolVersion);

  var prevBound = _Bound(0, Uint8List(0));
  var itemIndex = 0;

  // Writes a skip up to the most recent range that needed no answer, or null
  // when the reply already reaches the previous bound. It is only invoked
  // right before a fingerprint / ID-list range is written.
  void Function()? pendingSkip;

  while (offset < message.length) {
    // Decode upper bound.
    final (timestamp, idPrefix, boundBytes) = decodeBound(message, offset);
    offset += boundBytes;
    final currBound = _Bound(timestamp, idPrefix);

    // Writes "everything up to the current bound is settled".
    void writeSkip() {
      output.add(encodeBound(timestamp, idPrefix));
      output.addByte(modeSkip);
    }

    // Collect our items for this range. Items that are before the current
    // bound but not accepted by _isInRange are stepped over; the first item
    // at or past the bound is left for the next range.
    final rangeItems = <NegentropyItem>[];
    while (itemIndex < sortedItems.length) {
      final item = sortedItems[itemIndex];
      if (_isInRange(item, prevBound, currBound)) {
        rangeItems.add(item);
        itemIndex++;
      } else if (_isBeforeBound(item, currBound)) {
        itemIndex++;
      } else {
        break;
      }
    }

    // Decode mode. A message ending right after a bound is tolerated.
    if (offset >= message.length) break;
    final mode = message[offset++];

    switch (mode) {
      case modeSkip:
        // The peer considers this range synchronized. Nothing to answer
        // now, but remember it so a later range does not absorb it.
        pendingSkip = writeSkip;
        break;

      case modeFingerprint:
        if (offset + fingerprintSize > message.length) {
          throw ArgumentError('Not enough data for fingerprint');
        }
        // Uint8List.sublist already returns a fresh Uint8List.
        final theirFingerprint =
            message.sublist(offset, offset + fingerprintSize);
        offset += fingerprintSize;

        // Calculate our fingerprint for this range.
        final ourIds = rangeItems.map((i) => i.id).toList();
        final ourFingerprint = calculateFingerprint(ourIds);

        if (_bytesEqual(theirFingerprint, ourFingerprint)) {
          // Range matches; defer the skip until something follows it.
          pendingSkip = writeSkip;
          break;
        }

        // Mismatch. First close any gap left by earlier settled ranges so
        // the range(s) written below start at the previous bound.
        pendingSkip?.call();
        pendingSkip = null;

        if (rangeItems.length <= 2) {
          // Few enough items: send our IDs directly.
          output.add(encodeBound(timestamp, idPrefix));
          output.addByte(modeIdList);
          output.add(encodeVarint(rangeItems.length));
          for (final item in rangeItems) {
            output.add(item.id);
          }
        } else {
          // Split the range in half and send one fingerprint per half.
          final mid = rangeItems.length ~/ 2;
          final midItem = rangeItems[mid];

          // First half: items before midItem, bounded above by midItem.
          // NOTE(review): relies on the upper bound being exclusive in
          // _isInRange - confirm.
          output.add(encodeBound(midItem.timestamp, midItem.id));
          output.addByte(modeFingerprint);
          final firstHalfIds =
              rangeItems.sublist(0, mid).map((i) => i.id).toList();
          output.add(calculateFingerprint(firstHalfIds));

          // Second half: midItem onwards, up to the received bound.
          output.add(encodeBound(timestamp, idPrefix));
          output.addByte(modeFingerprint);
          final secondHalfIds =
              rangeItems.sublist(mid).map((i) => i.id).toList();
          output.add(calculateFingerprint(secondHalfIds));
        }
        break;

      case modeIdList:
        // Read their IDs.
        final (count, countBytes) = decodeVarint(message, offset);
        offset += countBytes;
        final theirIds = <Uint8List>[];
        for (var i = 0; i < count; i++) {
          if (offset + idSize > message.length) {
            throw ArgumentError('Not enough data for ID');
          }
          theirIds.add(message.sublist(offset, offset + idSize));
          offset += idSize;
        }

        // Compare as strings so set membership works on content.
        final ourIdSet = rangeItems.map((i) => bytesToHex(i.id)).toSet();
        final theirIdSet = theirIds.map(bytesToHex).toSet();

        // We need IDs they have that we don't.
        for (final theirId in theirIdSet) {
          if (!ourIdSet.contains(theirId)) {
            needIds.add(theirId);
          }
        }
        // We have IDs that they don't.
        for (final ourId in ourIdSet) {
          if (!theirIdSet.contains(ourId)) {
            haveIds.add(ourId);
          }
        }

        // Answer with an explicit skip for this range. It also covers any
        // earlier deferred skip, because it starts at the last written bound.
        writeSkip();
        pendingSkip = null;
        break;

      default:
        throw ArgumentError('Unknown mode: $mode');
    }

    prevBound = currBound;
  }

  return (output.toBytes(), needIds, haveIds);
}