replicate function

ReplicationStream replicate(
  Foodb source,
  Foodb target, {
  String? replicationId,
  bool createTarget = false,
  bool continuous = false,
  Duration debounce = const Duration(milliseconds: 30),
  int maxBatchSize = 25,
  int heartbeat = 10000,
  int timeout = 30000,
  WhereFunction<ChangeResult>? whereChange,
  void Function(Object?, StackTrace? stackTrace) onError = defaultOnError,
  void Function(ChangeResult)? onResult,
  void Function()? onComplete,
  void Function(ReplicationCheckpoint)? onCheckpoint,
  String Function(Doc<ReplicationLog> source, Doc<ReplicationLog> target)?
      noCommonAncestry,
})

Implementation
/// Replicates changes from [source] to [target].
///
/// Returns a [ReplicationStream] handle whose `abort`/cancel tears down the
/// underlying change feed, replicator and debounce timer. Errors raised
/// anywhere inside the replication zone are funneled to [onError] after the
/// stream has been aborted.
ReplicationStream replicate(
  Foodb source,
  Foodb target, {
  /**
   * override the auto generated replicateId
   */
  String? replicationId,
  /**
   * create target database if not exist
   */
  bool createTarget = false,
  /**
   * run in continuous mode, the replication will be a long running process
   * however user are required to handle network error through onError callback
   */
  bool continuous = false,
  /**
   * used when continuous = true.
   * specify a certain millisecond before kick start the cycle
   */
  Duration debounce = const Duration(milliseconds: 30),
  /**
   * when continuous = true, if the batch size is met while the replicator is
   * still in debounce, the replication cycle will be triggered immediately
   * when continuous = false, batchSize is used in seq_interval in ChangeRequest, let the couchdb decide the upperbound
   */
  int maxBatchSize = 25,
  /**
   * heartbeat for ChangeRequest
   */
  int heartbeat = 10000,
  /**
   * timeout for ChangeRequest
   */
  int timeout = 30000,
  /**
   * Client side run filter for each change result
   * this will prevent the whole document being fetched over the wire during bulkGet
   *
   * Version is required to determine the replication id, so that the replication stays consistent
   * Do update the version when changing the whereFn
   */
  WhereFunction<ChangeResult>? whereChange,
  /**
   * when encountering an error, can use the stream to decide retry or abort
   */
  void Function(Object?, StackTrace? stackTrace) onError = defaultOnError,
  /**
   * call when got a new change stream result
   */
  void Function(ChangeResult)? onResult,
  /**
   * call when a non-continuous replication completed
   */
  void Function()? onComplete,
  /**
   * call when completed a single checkpoint
   */
  void Function(ReplicationCheckpoint)? onCheckpoint,
  /**
   * call when source and target has unmatched replication log, should return the desire seq, use "0" if want to sync from beginning
   */
  String Function(Doc<ReplicationLog> source, Doc<ReplicationLog> target)?
      noCommonAncestry,
}) {
  late ReplicationStream resultStream;
  // Shared error path: abort the whole replication before surfacing the
  // error to the caller's onError callback.
  _onError(e, s) {
    resultStream.abort();
    onError(e, s);
  }
  FoodbDebug.timedStart('replication full');
  FoodbDebug.timedStart('replication checkpoint');
  _onComplete() {
    FoodbDebug.timedEnd(
        'replication full', (_) => '<replication>: replication complete');
    onComplete?.call();
  }
  // Run all async work in a guarded zone so that any uncaught error,
  // including ones thrown from detached async callbacks, reaches _onError.
  runZonedGuarded(() async {
    late final _Replicator replicator;
    ChangesStream? changeStream;
    // Placeholder timer so `timer.cancel()` is always safe to call.
    var timer = Timer(debounce, () {});
    replicator = _Replicator(source, target,
        maxBatchSize: maxBatchSize,
        whereChange: whereChange,
        onError: _onError);
    // Cancelling the returned stream tears down every moving part.
    resultStream = new ReplicationStream(onCancel: () {
      timer.cancel();
      replicator.cancel();
      changeStream?.cancel();
    });
    // prepare target
    var sourceInstanceInfo = await source.serverInfo();
    GetServerInfoResponse targetInstanceInfo = await target.serverInfo();
    try {
      await target.info();
    } catch (err) {
      // Target db does not exist (or is unreachable): create it if allowed,
      // otherwise fail the replication.
      if (createTarget) {
        await target.initDb();
      } else {
        throw ReplicationException(err);
      }
    }
    // Derive a stable replication id from both endpoints and the options
    // that affect the produced change set (createTarget/continuous/filter).
    replicationId ??= await _generateReplicationId(
        sourceUuid: sourceInstanceInfo.uuid,
        sourceUri: source.dbUri,
        targetUuid: targetInstanceInfo.uuid,
        targetUri: target.dbUri,
        createTarget: createTarget,
        continuous: continuous,
        filter: whereChange != null ? 'whereChange_${whereChange.id}' : null);
    // get first start seq
    // Start-seq resolution (follows the CouchDB-style replication protocol):
    // default to "0" (full resync) unless both sides share a session.
    var startSeq = '0';
    final initialSourceLog =
        await _retriveReplicationLog(source, replicationId);
    replicator.sourceLog = initialSourceLog;
    final initialTargetLog =
        await _retriveReplicationLog(target, replicationId);
    replicator.targetLog = initialTargetLog;
    // Same most-recent session on both sides: resume from the target's
    // recorded source sequence.
    if (initialSourceLog.model.sessionId == initialTargetLog.model.sessionId &&
        initialSourceLog.model.sessionId != "") {
      startSeq = initialTargetLog.model.sourceLastSeq;
    }
    // Otherwise fall back to the most recent session common to both
    // histories (target history is assumed newest-first — TODO confirm).
    for (final historyA in initialTargetLog.model.history) {
      if (initialSourceLog.model.history
          .any((historyB) => historyB.sessionId == historyA.sessionId)) {
        startSeq = historyA.recordedSeq;
        break;
      }
    }
    // No common ancestry but both sides have replicated before: let the
    // caller decide where to resume instead of silently restarting at "0".
    if (noCommonAncestry != null &&
        startSeq == "0" &&
        (initialSourceLog.model.sourceLastSeq != "0" ||
            initialTargetLog.model.sourceLastSeq != "0")) {
      startSeq = noCommonAncestry(initialSourceLog, initialTargetLog);
    }
    if (continuous) {
      // Continuous mode: keep draining the live change feed. After each
      // checkpoint, immediately start another cycle if changes queued up
      // while the previous batch was being replicated.
      replicator.onFinishCheckpoint = (checkpoint) async {
        onCheckpoint?.call(checkpoint);
        if (replicator.pendingList.isNotEmpty) {
          replicator.run();
        }
      };
      changeStream = await source.changesStream(
          ChangeRequest(
              feed: ChangeFeed.continuous,
              style: 'all_docs',
              heartbeat: heartbeat,
              timeout: timeout,
              since: startSeq), onResult: (result) async {
        onResult?.call(result);
        replicator.pendingList.add(result);
        // Debounce: restart the timer on every result so a quiet period
        // of `debounce` triggers the run...
        timer.cancel();
        timer = Timer(debounce, replicator.run);
        // ...but run immediately once a full batch has accumulated.
        if (replicator.pendingList.length == maxBatchSize) {
          timer.cancel();
          replicator.run();
        }
      }, onError: _onError);
    } else {
      // One-shot mode: repeatedly fetch bounded "normal" change batches
      // until the source reports nothing pending.
      var pending = 1;
      var startSeqWithLimit = startSeq;
      fetchNormalChangeAndRun() async {
        changeStream = await source.changesStream(
            ChangeRequest(
                feed: ChangeFeed.normal,
                style: 'all_docs',
                heartbeat: heartbeat,
                timeout: timeout,
                seqInterval: maxBatchSize - 1,
                limit: maxBatchSize,
                since: startSeqWithLimit),
            onResult: (result) => onResult?.call(result),
            onComplete: (result) async {
              pending = result.pending ?? 0;
              // With seqInterval set, intermediate results may omit seq;
              // stamp the batch's lastSeq onto the final result so the
              // checkpoint records the right position.
              if (result.results.isNotEmpty) {
                result.results.last.seq = result.lastSeq!;
              }
              startSeqWithLimit = result.lastSeq!;
              replicator.pendingList.addAll(result.results);
              replicator.run();
            },
            onError: _onError);
      }
      // After each checkpoint, fetch the next batch or finish.
      replicator.onFinishCheckpoint = (checkpoint) async {
        onCheckpoint?.call(checkpoint);
        if (pending > 0) {
          fetchNormalChangeAndRun();
        } else {
          _onComplete();
        }
      };
      fetchNormalChangeAndRun();
    }
  }, _onError);
  return resultStream;
}