bulk method

Future<bool> bulk({
  List<Doc>? updateDocs,
  List<Doc>? deleteDocs,
  String? index,
  String? type,
  int batchSize = 100,
})

Bulk update of the index.

The operations are performed in the following order:

  • updateDocs will be updated
  • deleteDocs will be deleted

Implementation

/// Sends [updateDocs] and [deleteDocs] to the `_bulk` endpoint in batches.
///
/// Every entry of [updateDocs] is written as an `index` action line followed
/// by its JSON-encoded document; afterwards every entry of [deleteDocs] is
/// written as a `delete` action line. The request path consists of [index]
/// and [type] (each only when non-null) followed by `_bulk`.
///
/// A `POST` request is issued whenever the number of buffered actions reaches
/// [batchSize], and once more at the end for any remaining actions.
///
/// Throws a [TransportException] when the decoded response body has
/// `errors` set to `true`. Returns `true` once all batches have been sent.
Future<bool> bulk({
  List<Doc>? updateDocs,
  List<Doc>? deleteDocs,
  String? index,
  String? type,
  int batchSize = 100,
}) async {
  // TODO: verify if docs.index is consistent with index.
  final bulkPath = [
    if (index != null) index,
    if (type != null) type,
    '_bulk',
  ];
  final payload = StringBuffer();
  var pending = 0; // actions buffered in `payload` but not yet sent
  var flushed = 0; // actions already sent in earlier batches

  // Posts the buffered payload when the batch is full, or when [force] is
  // set and at least one action is buffered; otherwise does nothing.
  Future<void> flush({bool force = false}) async {
    if (pending == 0) return;
    if (pending < batchSize && !force) return;

    final response = await _transport
        .send(Request('POST', bulkPath, bodyText: payload.toString()));
    response.throwIfStatusNotOK(
        message: 'Unable to update batch starting with $flushed.');

    // The substring probe is a cheap pre-check; the body is only decoded
    // (and the top-level flag inspected) when the probe matches.
    final reportsErrors = response.body.contains('"errors":true') &&
        (convert.json.decode(response.body) as Map<String, dynamic>)['errors'] ==
            true;
    if (reportsErrors) {
      throw TransportException('Errors detected in the bulk updated.',
          body: response.body);
    }

    flushed += pending;
    pending = 0;
    payload.clear();
  }

  // Appends the action/metadata line for [doc] under the given [action] key.
  void writeAction(String action, Doc doc) {
    payload.writeln(convert.json.encode({
      action: {
        if (doc.index != null) '_index': doc.index,
        if (doc.type != null) '_type': doc.type,
        '_id': doc.id,
      }
    }));
  }

  final updates = updateDocs ?? const <Doc>[];
  for (final doc in updates) {
    writeAction('index', doc);
    payload.writeln(convert.json.encode(doc.doc));
    pending++;
    await flush();
  }

  final deletions = deleteDocs ?? const <Doc>[];
  for (final doc in deletions) {
    // Delete actions carry no document body, only the metadata line.
    writeAction('delete', doc);
    pending++;
    await flush();
  }

  // Send whatever is left over from the last partial batch.
  await flush(force: true);
  return true;
}