replicate function

ReplicationStream replicate(
  Foodb source,
  Foodb target, {
  String? replicationId,
  bool createTarget = false,
  bool continuous = false,
  Duration debounce = const Duration(milliseconds: 30),
  int maxBatchSize = 25,
  int heartbeat = 10000,
  int timeout = 30000,
  WhereFunction<ChangeResult>? whereChange,
  void onError(Object?, StackTrace? stackTrace) = defaultOnError,
  void onResult(ChangeResult)?,
  void onComplete()?,
  void onCheckpoint(ReplicationCheckpoint)?,
  String noCommonAncestry(
    Doc<ReplicationLog> source,
    Doc<ReplicationLog> target,
  )?,
})

Implementation

/// Replicates changes from [source] into [target] and returns a
/// [ReplicationStream] handle for the running replication.
///
/// Cancelling the returned stream cancels the debounce timer, the internal
/// replicator and the currently open changes feed. Any error raised while
/// replicating first aborts the returned stream and is then forwarded to
/// [onError].
ReplicationStream replicate(
  Foodb source,
  Foodb target, {
  /// Overrides the automatically generated replication id.
  String? replicationId,

  /// Whether to create the target database when it does not exist.
  bool createTarget = false,

  /// Whether to run in continuous mode.
  ///
  /// In continuous mode the replication is a long-running process, and the
  /// caller is required to handle network errors through [onError].
  bool continuous = false,

  /// Used when [continuous] is true: how long to wait after the latest change
  /// before kicking off a replication cycle.
  Duration debounce = const Duration(milliseconds: 30),

  /// Maximum number of changes handled per batch.
  ///
  /// When [continuous] is true, a replication cycle is triggered as soon as
  /// this many changes are pending, even if the replicator is still in its
  /// debounce window. When [continuous] is false, it is sent as `limit` (and,
  /// minus one, as `seqInterval`) of each [ChangeRequest].
  int maxBatchSize = 25,

  /// Heartbeat passed to the [ChangeRequest].
  int heartbeat = 10000,

  /// Timeout passed to the [ChangeRequest].
  int timeout = 30000,

  /// Client-side filter run against each change result.
  ///
  /// This prevents the whole document from being fetched over the wire
  /// during bulkGet. The filter's `id` is part of the generated replication
  /// id, so update it whenever the filter logic changes to keep replication
  /// consistent.
  WhereFunction<ChangeResult>? whereChange,

  /// Called when an error is encountered; the caller can use the returned
  /// stream to decide whether to retry or abort.
  void Function(Object?, StackTrace? stackTrace) onError = defaultOnError,

  /// Called for every new result received from the changes feed.
  void Function(ChangeResult)? onResult,

  /// Called when a non-continuous replication has completed.
  void Function()? onComplete,

  /// Called each time a single checkpoint has been completed.
  void Function(ReplicationCheckpoint)? onCheckpoint,

  /// Called when source and target have unmatched replication logs.
  ///
  /// Should return the desired start seq; return "0" to sync from the
  /// beginning.
  String Function(Doc<ReplicationLog> source, Doc<ReplicationLog> target)?
      noCommonAncestry,
}) {
  // Assigned synchronously at the start of the guarded zone body (before its
  // first `await`), so it is initialised by the time this function returns.
  late ReplicationStream resultStream;

  // Parameters are intentionally left untyped so this handler stays assignable
  // to every error-callback signature it is passed to below.
  handleError(e, s) {
    resultStream.abort();
    onError(e, s);
  }

  FoodbDebug.timedStart('replication full');
  // NOTE(review): this timer is not ended in this function; presumably the
  // replicator ends it when a checkpoint is written - confirm.
  FoodbDebug.timedStart('replication checkpoint');

  void handleComplete() {
    FoodbDebug.timedEnd(
        'replication full', (_) => '<replication>: replication complete');
    onComplete?.call();
  }

  runZonedGuarded(() async {
    ChangesStream? changeStream;
    // Placeholder timer so the cancel callback always has something to cancel.
    var timer = Timer(debounce, () {});
    final replicator = _Replicator(source, target,
        maxBatchSize: maxBatchSize,
        whereChange: whereChange,
        onError: handleError);
    resultStream = ReplicationStream(onCancel: () {
      timer.cancel();
      replicator.cancel();
      changeStream?.cancel();
    });

    // Prepare the target: verify it is reachable, optionally creating it.
    final sourceInstanceInfo = await source.serverInfo();
    final targetInstanceInfo = await target.serverInfo();
    try {
      await target.info();
    } catch (err) {
      if (createTarget) {
        await target.initDb();
      } else {
        throw ReplicationException(err);
      }
    }

    replicationId ??= await _generateReplicationId(
        sourceUuid: sourceInstanceInfo.uuid,
        sourceUri: source.dbUri,
        targetUuid: targetInstanceInfo.uuid,
        targetUri: target.dbUri,
        createTarget: createTarget,
        continuous: continuous,
        filter: whereChange != null ? 'whereChange_${whereChange.id}' : null);

    // Work out the first start seq from the replication logs on both sides.
    var startSeq = '0';
    final initialSourceLog =
        await _retriveReplicationLog(source, replicationId);
    replicator.sourceLog = initialSourceLog;
    final initialTargetLog =
        await _retriveReplicationLog(target, replicationId);
    replicator.targetLog = initialTargetLog;
    if (initialSourceLog.model.sessionId == initialTargetLog.model.sessionId &&
        initialSourceLog.model.sessionId != '') {
      startSeq = initialTargetLog.model.sourceLastSeq;
    }
    // The first target history entry whose session also appears in the source
    // history wins, overriding the value chosen above.
    for (final historyA in initialTargetLog.model.history) {
      if (initialSourceLog.model.history
          .any((historyB) => historyB.sessionId == historyA.sessionId)) {
        startSeq = historyA.recordedSeq;
        break;
      }
    }
    // Logs exist on at least one side but share no session: let the caller
    // choose where to start.
    if (noCommonAncestry != null &&
        startSeq == '0' &&
        (initialSourceLog.model.sourceLastSeq != '0' ||
            initialTargetLog.model.sourceLastSeq != '0')) {
      startSeq = noCommonAncestry(initialSourceLog, initialTargetLog);
    }

    if (continuous) {
      replicator.onFinishCheckpoint = (checkpoint) async {
        onCheckpoint?.call(checkpoint);
        // Drain whatever accumulated while the previous cycle was running.
        if (replicator.pendingList.isNotEmpty) {
          replicator.run();
        }
      };
      changeStream = await source.changesStream(
          ChangeRequest(
              feed: ChangeFeed.continuous,
              style: 'all_docs',
              heartbeat: heartbeat,
              timeout: timeout,
              since: startSeq), onResult: (result) async {
        onResult?.call(result);
        replicator.pendingList.add(result);
        timer.cancel();
        if (replicator.pendingList.length == maxBatchSize) {
          // Batch is full: run immediately instead of arming a debounce timer
          // that would be cancelled straight away.
          replicator.run();
        } else {
          // Restart the debounce window on every incoming change.
          timer = Timer(debounce, replicator.run);
        }
      }, onError: handleError);
    } else {
      // Number of changes the source still reports after the last batch;
      // starts above zero so the first checkpoint never ends the run early.
      var pending = 1;
      var startSeqWithLimit = startSeq;

      // Fetches one batch of changes and hands it to the replicator.
      fetchNormalChangeAndRun() async {
        changeStream = await source.changesStream(
            ChangeRequest(
                feed: ChangeFeed.normal,
                style: 'all_docs',
                heartbeat: heartbeat,
                timeout: timeout,
                seqInterval: maxBatchSize - 1,
                limit: maxBatchSize,
                since: startSeqWithLimit),
            onResult: (result) => onResult?.call(result),
            onComplete: (result) async {
              pending = result.pending ?? 0;
              // Stamp the last change of the batch with the feed's lastSeq.
              if (result.results.isNotEmpty) {
                result.results.last.seq = result.lastSeq!;
              }
              // The next batch resumes from where this one ended.
              startSeqWithLimit = result.lastSeq!;
              replicator.pendingList.addAll(result.results);
              replicator.run();
            },
            onError: handleError);
      }

      replicator.onFinishCheckpoint = (checkpoint) async {
        onCheckpoint?.call(checkpoint);
        if (pending > 0) {
          fetchNormalChangeAndRun();
        } else {
          handleComplete();
        }
      };
      fetchNormalChangeAndRun();
    }
  }, handleError);
  return resultStream;
}