bulk method
Bulk update of the index
.
In the following order:
updateDocs
will be updateddeleteDocs
will be deleted
Implementation
Future<bool> bulk({
List<Doc>? updateDocs,
List<Doc>? deleteDocs,
String? index,
String? type,
int batchSize = 100,
}) async {
// TODO: verify if docs.index is consistent with index.
final pathSegments = [
if (index != null) index,
if (type != null) type,
'_bulk',
];
var totalCount = 0;
var count = 0;
final sb = StringBuffer();
Future send([bool last = false]) async {
if (count == 0) return;
if (count >= batchSize || last) {
final rs = await _transport
.send(Request('POST', pathSegments, bodyText: sb.toString()));
rs.throwIfStatusNotOK(
message: 'Unable to update batch starting with $totalCount.');
// cheap test before parsing the body
if (rs.body.contains('"errors":true')) {
final body = convert.json.decode(rs.body) as Map<String, dynamic>;
if (body['errors'] == true) {
throw TransportException('Errors detected in the bulk updated.',
body: rs.body);
}
}
totalCount += count;
count = 0;
sb.clear();
}
}
for (final doc in updateDocs ?? const <Doc>[]) {
sb.writeln(convert.json.encode({
'index': {
if (doc.index != null) '_index': doc.index,
if (doc.type != null) '_type': doc.type,
'_id': doc.id,
}
}));
sb.writeln(convert.json.encode(doc.doc));
count++;
await send();
}
for (final doc in deleteDocs ?? const <Doc>[]) {
sb.writeln(convert.json.encode({
'delete': {
if (doc.index != null) '_index': doc.index,
if (doc.type != null) '_type': doc.type,
'_id': doc.id,
}
}));
count++;
await send();
}
await send(true);
return true;
}