replicateTest function

List<dynamic Function(FoodbTestContext sourceCtx, FoodbTestContext targetCtx)> replicateTest()

Implementation

/// Builds the replication test suite as a list of test-registering callbacks.
///
/// Each entry takes a source and a target [FoodbTestContext] and, when
/// invoked, registers one or more `test` cases that replicate between
/// databases obtained from those contexts. Nothing is registered until an
/// entry is called.
List<Function(FoodbTestContext sourceCtx, FoodbTestContext targetCtx)>
    replicateTest() {
  return [
    (FoodbTestContext sourceCtx, FoodbTestContext targetCtx) {
      test('replication with compaction and rev limit', () async {
        final source = await sourceCtx.db('source-compact');
        final target = await targetCtx.db('target-compact');

        // Runs a one-shot replication (target -> source when [reverse] is
        // true) and returns once its `onComplete` callback has fired.
        Future<void> onetimeReplicate([bool reverse = false]) async {
          final replication = Completer<void>();
          if (reverse) {
            replicate(target, source, onComplete: replication.complete);
          } else {
            replicate(source, target, onComplete: replication.complete);
          }
          await replication.future;
        }

        // Writes document 'a' to [db] with an explicit revision history.
        // [revs] is ordered newest first (e.g. ['2-b', '1-a']); the write uses
        // `newEdits: false` so the given revision ids are stored as-is.
        putDoc(Foodb db, List<String> revs) {
          final revisions = Revisions(
            start: int.parse(revs[0].split('-')[0]),
            ids: revs.map((e) => e.split('-')[1]).toList(),
          );
          return db.put(
            doc: Doc(
              id: 'a',
              rev: Rev.fromString(revs[0]),
              model: {},
              revisions: revisions,
            ),
            newEdits: false,
          );
        }

        // Reads document 'a' (optionally at [rev]) including revision
        // history, conflicts and meta information.
        getDoc(Foodb db, [String? rev]) {
          return db.get(
            id: 'a',
            fromJsonT: (json) => json,
            rev: rev,
            revs: true,
            conflicts: true,
            meta: true,
          );
        }

        // Triggers compaction and then waits one second before continuing.
        // NOTE(review): the delay presumably gives compaction time to finish;
        // confirm against the adapters under test.
        Future<void> runCompact(Foodb db) async {
          await db.compact();
          await Future.delayed(const Duration(seconds: 1));
        }

        // Create two conflicting leaves (2-a and 2-b) on top of 1-a.
        await putDoc(source, ['1-a']);
        await putDoc(source, ['2-a', '1-a']);
        await putDoc(source, ['2-b', '1-a']);
        await runCompact(source);
        await source.revsLimit(1);
        await runCompact(source);
        final sourceDoc = await getDoc(source);
        await onetimeReplicate();
        // The result is not inspected, but the read is kept so the test still
        // exercises `get` on the target after replication.
        await getDoc(target);

        // NOTE(review): this test has no explicit expectations; it only
        // passes when every step above completes without throwing.
        print(sourceDoc);
      });
    },
    (FoodbTestContext sourceCtx, FoodbTestContext targetCtx) {
      test('repliacte resume on correct checkpoint', () async {
        final source =
            await sourceCtx.db('source-replicate-correct-checkpoint');
        final target =
            await targetCtx.db('target-replicate-correct-checkpoint');

        final complete = expectAsync0(() {});
        // The second replication must report exactly one change ('b').
        final onSecondResult = expectAsync1((ChangeResult r) {}, count: 1);

        await source.put(doc: Doc(id: 'a', model: {}));
        replicate(source, target, onError: handleTestReplicationError,
            onComplete: () async {
          var docs = await target.allDocs(GetViewRequest(), (json) => json);
          expect(docs.totalRows, equals(1));
          await source.put(doc: Doc(id: 'b', model: {}));
          replicate(source, target, onResult: onSecondResult,
              onComplete: () async {
            var docs = await target.allDocs(GetViewRequest(), (json) => json);
            expect(docs.totalRows, equals(2));
            complete();
          });
        });
      });
      test('repliacte correct doc with revisions and deleted', () async {
        final source =
            await sourceCtx.db('replicate-source-revisions-and-deleted');
        final target =
            await targetCtx.db('replicate-target-revisions-and-deleted');

        final complete = expectAsync0(() {});

        await source.put(
            doc: Doc(
                id: 'a',
                rev: Rev.fromString('2-b'),
                deleted: true,
                model: {},
                revisions: Revisions(start: 2, ids: ['b', 'a'])),
            newEdits: false);

        replicate(source, target, onComplete: () async {
          // Reading without a rev must fail with a 'deleted' reason. A null
          // reason is treated as a mismatch instead of throwing inside the
          // matcher.
          await expectLater(
              target.get(id: 'a', fromJsonT: (json) => json, revs: true),
              throwsA(predicate((e) =>
                  e is AdapterException &&
                  (e.reason?.contains('deleted') ?? false))));
          // The deleted revision itself is still readable by explicit rev.
          final doc = await target.get(
              id: 'a', fromJsonT: (json) => json, rev: '2-b', revs: true);
          expect(doc, isNotNull);
          expect(doc.rev, Rev.fromString('2-b'));
          expect(doc.revisions!.start, 2);
          expect(doc.revisions!.ids, hasLength(2));
          complete();
        });
      });
      test('repliacte non-continuous, max-batch-size', () async {
        final source =
            await sourceCtx.db('source-replicate-source-max-batch-size');
        final target =
            await targetCtx.db('target-replicate-target-max-batch-size');

        final complete = expectAsync0(() {});
        // 100 documents with a batch size of 10 must yield 10 checkpoints.
        final checkpoint = expectAsync0(() {}, count: 10);

        await source.bulkDocs(
            body: List.generate(100, (index) => Doc(id: '$index', model: {})));

        replicate(source, target, maxBatchSize: 10, onCheckpoint: (_) async {
          checkpoint();
        }, onComplete: () async {
          final docs = await target.allDocs(GetViewRequest(), (json) => json);
          expect(docs.rows, hasLength(100));
          complete();
        });
      });
      test(
          'repliacte continuous, long debouce, small maxBatchSize, will finish before debounce',
          () async {
        final source =
            await sourceCtx.db('source-continuous-by-max-batch-size');
        final target =
            await targetCtx.db('target-continuous-by-max-batch-size');

        final complete = expectAsync0(() {});
        var processedCnt = 0;
        final stopwatch = Stopwatch()..start();
        // NOTE(review): the continuous replication started here is never
        // stopped by this test; consider cancelling it on tear-down.
        replicate(
          source,
          target,
          continuous: true,
          maxBatchSize: 10,
          debounce: const Duration(seconds: 10),
          onCheckpoint: (checkpoint) async {
            processedCnt += checkpoint.processed.length;
            if (processedCnt == 30) {
              // All 30 docs must arrive before the 10 s debounce elapses.
              expect(stopwatch.elapsed.inSeconds, lessThan(10));
              complete();
            }
          },
        );
        Future.delayed(
          const Duration(seconds: 1),
          () => source.bulkDocs(
              body: List.generate(30, (index) => Doc(id: '$index', model: {}))),
        );
      });
      test(
          'repliacte continuous, short debouce, large maxBatchSize, will finish after debounce',
          () async {
        final source = await sourceCtx.db('source-continuous-by-debounce');
        final target = await targetCtx.db('target-continuous-by-debounce');

        final complete = expectAsync0(() {});
        var processedCnt = 0;
        final stopwatch = Stopwatch()..start();

        // NOTE(review): the continuous replication started here is never
        // stopped by this test; consider cancelling it on tear-down.
        replicate(
          source,
          target,
          continuous: true,
          maxBatchSize: 50,
          debounce: const Duration(seconds: 5),
          onCheckpoint: (checkpoint) async {
            processedCnt += checkpoint.processed.length;
            if (processedCnt == 30) {
              // The batch (30 < 50) can only be flushed by the 5 s debounce.
              expect(stopwatch.elapsed.inSeconds, greaterThan(5));
              complete();
            }
          },
        );
        Future.delayed(
          const Duration(seconds: 1),
          () => source.bulkDocs(
              body: List.generate(30, (index) => Doc(id: '$index', model: {}))),
        );
      });
      test(
          'continuous replication, debounce will fire immediate if has initial change',
          () async {
        final source = await sourceCtx.db('source-tonituous-no-immediate-fire');
        final target = await targetCtx.db('target-tonituous-no-immediate-fire');
        // Two checkpoints are expected: one for the pre-existing doc 'a' and
        // one for doc 'b' written a second later.
        final complete = expectAsync0(() {}, count: 2);
        final checkpoint = expectAsync0(() {}, count: 2);
        await source.put(doc: Doc(id: 'a', model: {}));

        // NOTE(review): the continuous replication started here is never
        // stopped by this test; consider cancelling it on tear-down.
        replicate(
          source,
          target,
          continuous: true,
          debounce: const Duration(milliseconds: 2000),
          onCheckpoint: (event) async {
            checkpoint();
            final doc = await target.get(id: 'a', fromJsonT: (json) => json);
            expect(doc, isNotNull);
            complete();
          },
        );
        Future.delayed(const Duration(seconds: 1),
            () => source.put(doc: Doc(id: 'b', model: {})));
      });
      test('continuous replication, fast oepration', () async {
        final source = await sourceCtx.db('source-replicate-fast-operation');
        final target = await targetCtx.db('target-replicate-fast-operation');
        final complete = expectAsync0(() {});
        // Each of the 10 rapid writes must be reported exactly once.
        final resultCnt = expectAsync1((r) {}, count: 10);

        replicate(
          source,
          target,
          continuous: true,
          debounce: const Duration(milliseconds: 1),
          onResult: resultCnt,
          onError: handleTestReplicationError,
        );
        for (var i = 0; i < 10; i++) {
          await source.put(doc: Doc(id: '$i', model: {}));
        }
        // Give the replication three seconds to deliver all results.
        await Future.delayed(const Duration(seconds: 3), complete);
      });
    },
    (FoodbTestContext sourceCtx, FoodbTestContext targetCtx) {
      test('replication with client side id filter', () async {
        final source = await sourceCtx.db('source-replicate-id-filter');
        final target = await targetCtx.db('target-replicate-id-filter');
        final complete = expectAsync0(() {});

        await source.put(doc: Doc(id: 'a_1', model: {}));
        await source.put(doc: Doc(id: 'a_2', model: {}));
        await source.put(doc: Doc(id: 'a_3', model: {}));
        await source.put(doc: Doc(id: 'b_1', model: {}));
        await source.put(doc: Doc(id: 'b_2', model: {}));
        await source.put(doc: Doc(id: 'c_1', model: {}));
        await source.put(doc: Doc(id: 'c_4', model: {}));

        replicate(source, target,
            onError: handleTestReplicationError,
            // Keep a change unless its prefix is 'a' or 'c' and its numeric
            // suffix is 2 or less.
            whereChange: WhereFunction('1', (change) {
              final splitted = change.id.split('_');
              return !['a', 'c'].contains(splitted[0]) ||
                  int.parse(splitted[1]) > 2;
            }), onComplete: () async {
          final docs = await target.allDocs(GetViewRequest(), (json) => json);
          expect(docs.totalRows, 4);
          for (final expectedId in ['a_3', 'b_1', 'b_2', 'c_4']) {
            expect(docs.rows.where((element) => element.id == expectedId),
                hasLength(1));
          }
          complete();
        });
      });
    },
    (FoodbTestContext sourceCtx, FoodbTestContext targetCtx) {
      test('remove local doc will start from beginning', () async {
        final source = await sourceCtx.db('source-replicate-delete-local-doc');
        final target = await targetCtx.db('target-replicate-delete-local-doc');
        final replicationId = 'test';
        final firstReplicateCompleter = Completer<void>();
        final complete1 =
            expectAsync0(() => firstReplicateCompleter.complete());
        final complete2 = expectAsync0(() {});
        // Both runs must report all three changes: the second one because the
        // local checkpoint document is deleted in between.
        final onChange1 = expectAsync1((r) {}, count: 3);
        final onChange2 = expectAsync1((r) {}, count: 3);

        await source.put(doc: Doc(id: 'a_1', model: {}));
        await source.put(doc: Doc(id: 'a_2', model: {}));
        await source.put(doc: Doc(id: 'a_3', model: {}));

        replicate(
          source,
          target,
          replicationId: replicationId,
          onError: handleTestReplicationError,
          onResult: onChange1,
          onComplete: () async {
            complete1();
          },
        );
        await firstReplicateCompleter.future;
        // Remove the replication's local document on the source before
        // replicating again with the same id.
        final localDoc =
            await source.get(id: '_local/$replicationId', fromJsonT: (t) => t);
        await source.delete(id: localDoc.id, rev: localDoc.rev!);
        replicate(
          source,
          target,
          replicationId: replicationId,
          onResult: onChange2,
          onError: handleTestReplicationError,
          onComplete: () async {
            complete2();
          },
        );
      });
    }
  ];
}