performDocumentSearch method

  1. @override
Stream<QueryResult> performDocumentSearch(
  1. DocumentSearchRequest request,
  2. {bool autoCreateIndex}
)
override

Implementation

@override
Stream<QueryResult> performDocumentSearch(DocumentSearchRequest request,
    {bool autoCreateIndex}) async* {
  final collection = request.collection;
  final schema = request.outputSchema ?? const ArbitraryTreeSchema();

  //
  // Validate collection ID
  //
  final collectionId = _validateCollectionId(collection.collectionId);

  //
  // Construct request
  //
  final jsonRequest = <String, Object>{};

  //
  // Filter
  //
  final query = request.query;
  final filter = query.filter;
  if (filter != null) {
    jsonRequest['query'] = {
      'query_string': {
        'query': filter.toString(),
      },
    };
  }

  //
  // Sort
  //

  final sorter = query.sorter;
  if (sorter != null) {
    final jsonSorters = [];
    if (sorter is PropertySorter) {
      jsonSorters.add({sorter.name: sorter.isDescending ? 'desc' : 'asc'});
    } else if (sorter is MultiSorter) {
      for (var item in sorter.sorters) {
        if (item is PropertySorter) {
          jsonSorters.add({item.name: item.isDescending ? 'desc' : 'asc'});
        } else {
          throw UnsupportedError('Unsupported sorter: $item');
        }
      }
    } else {
      throw UnsupportedError('Unsupported sorter: $sorter');
    }
    jsonRequest['sort'] = jsonSorters;
  }

  //
  // Skip
  //
  {
    final skip = query.skip;
    if (skip != null && skip != 0) {
      jsonRequest['from'] = skip;
    }
  }

  //
  // Take
  //
  {
    final take = query.take;
    if (take != null) {
      jsonRequest['size'] = take;
    }
  }

  //
  // Send HTTP request
  //
  final response = await _httpRequest(
    'POST',
    '/$collectionId/_search',
    json: jsonRequest,
  );

  //
  // Handle error
  //
  final error = response.error;
  if (error != null) {
    switch (error.type) {
      case 'index_not_found_exception':
        yield (QueryResult(
          collection: collection,
          query: query,
          snapshots: const <Snapshot>[],
          count: 0,
        ));
        return;
    }
    throw error;
  }

  switch (response.status) {
    case HttpStatus.ok:
      break;

    default:
      throw DatabaseException.internal(
        message: 'Got HTTP status: ${response.status}',
      );
  }

  var items = const <QueryResultItem>[];
  final jsonHitsMap = response.body['hits'];
  if (jsonHitsMap is Map) {
    // This map contains information about hits

    // The following list contains actual hits
    final jsonHitsList = jsonHitsMap['hits'] as List;
    items = jsonHitsList.map((h) {
      final documentId = h['_id'] as String;
      final score = h['_score'] as double;
      final data = h['_source'] as Map<String, Object>;
      final decoder = JsonDecoder(database: collection.database);
      return QueryResultItem(
        snapshot: Snapshot(
          document: collection.document(documentId),
          versionId: h['_seq_no']?.toString(),
          data: schema.decodeWith(decoder, data),
        ),
        score: score,
      );
    }).toList();
  }

  yield (QueryResult.withDetails(
    collection: collection,
    query: query,
    items: items,
  ));
}