data-access/nexustiles/backends/nexusproto/dao/ElasticsearchProxy.py (1,032 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import logging import threading import time import re from datetime import datetime from pytz import timezone, UTC import requests import pysolr from shapely import wkt from elasticsearch import Elasticsearch ELASTICSEARCH_CON_LOCK = threading.Lock() thread_local = threading.local() EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) ELASTICSEARCH_FORMAT = '%Y-%m-%dT%H:%M:%SZ' ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' class ElasticsearchProxy(object): def __init__(self, config): self.elasticsearchHosts = config.get("elasticsearch", "host").split(',') self.elasticsearchIndex = config.get("elasticsearch", "index") self.elasticsearchUsername = config.get("elasticsearch", "username") self.elasticsearchPassword = config.get("elasticsearch", "password") self.logger = logging.getLogger(__name__) with ELASTICSEARCH_CON_LOCK: elasticsearchcon = getattr(thread_local, 'elasticsearchcon', None) if elasticsearchcon is None: elasticsearchcon = Elasticsearch(hosts=self.elasticsearchHosts, http_auth=(self.elasticsearchUsername, self.elasticsearchPassword)) thread_local.elasticsearchcon = elasticsearchcon self.elasticsearchcon = elasticsearchcon def find_tile_by_id(self, tile_id): params = { "size": 1, "query": { "term": { "id": { "value": tile_id } } } } results, _, hits = self.do_query(*(None, None, None, True, None), **params) assert hits == 1, f"Found {hits} results, expected exactly 1" return [results[0]["_source"]] def find_tiles_by_id(self, tile_ids, ds=None, **kwargs): params = { "query": { "bool": { "filter": [], "should": [], "minimum_should_match": 1 } } } for tile_id in tile_ids: params['query']['bool']['should'].append({"term": {"id": {"value": tile_id}}}) if ds is not None: params['query']['bool']['filter'].append({"term": {"dataset_s": {"value": ds}}}) self._merge_kwargs(params, **kwargs) results = self.do_query_all(*(None, None, None, False, None), **params) assert len(results) == len(tile_ids), "Found %s results, expected exactly %s" % (len(results), len(tile_ids)) return results def find_min_date_from_tiles(self, tile_ids, ds=None, **kwargs): params = { "size": 0, "query": { "bool": { "filter": [], "should": [] } }, "aggs": { "min_date_agg": { "min": { "field": "tile_min_time_dt" } } } } for tile_id in tile_ids: params['query']['bool']['should'].append({"term": {"id": {"value": tile_id}}}) if ds is not None: params['query']['bool']['filter'].append({"term": {"dataset_s": {"value": ds}}}) aggregations = self.do_aggregation(*(None, None, None, True, None), **params) return self.convert_iso_to_datetime(aggregations['min_date_agg']["value_as_string"]) def find_max_date_from_tiles(self, tile_ids, ds=None, **kwargs): params = { "size": 0, "query": { "bool": { "filter": [], "should": [] } }, "aggs": { "max_date_agg": { "max": { "field": "tile_max_time_dt" } } } } for tile_id in tile_ids: params['query']['bool']['should'].append({"term": {"id": {"value": tile_id}}}) if ds is not None: params['query']['bool']['filter'].append({"term": {"dataset_s": {"value": ds}}}) aggregations = self.do_aggregation(*(None, None, None, True, None), **params) return self.convert_iso_to_datetime(aggregations['max_date_agg']["value_as_string"]) def find_min_max_date_from_granule(self, ds, granule_name, **kwargs): params = { "query": { "bool": { "filter": [ { "term": { "dataset_s": { "value": ds } } }, { "term": { "granule_s": { "value": granule_name } } } ] } }, "aggs": { "min_date_agg": { "max": { "field": "tile_min_time_dt" } }, "max_date_agg": { "max": { "field": "tile_max_time_dt" } } } } self._merge_kwargs(params, **kwargs) aggregations = self.do_aggregation(*(None, None, None, False, None), **params) start_time = self.convert_iso_to_datetime(aggregations['min_date_agg']["value_as_string"]) end_time = self.convert_iso_to_datetime(aggregations['max_date_agg']["value_as_string"]) return start_time, end_time def get_data_series_list(self): datasets = self.get_data_series_list_simple() for dataset in datasets: min_date = self.find_min_date_from_tiles([], ds=dataset['title']) max_date = self.find_max_date_from_tiles([], ds=dataset['title']) dataset['start'] = (min_date - EPOCH).total_seconds() dataset['end'] = (max_date - EPOCH).total_seconds() dataset['iso_start'] = min_date.strftime(ISO_8601) dataset['iso_end'] = max_date.strftime(ISO_8601) return datasets def get_data_series_list_simple(self): params = { 'size': 0, "aggs": { "dataset_list_agg": { "composite": { "size":100, "sources": [ { "dataset_s": { "terms": { "field": "dataset_s" } } } ] } } } } aggregations = self.do_aggregation_all(params, 'dataset_list_agg') l = [] for dataset in aggregations: l.append({ "shortName": dataset['key']['dataset_s'], "title": dataset['key']['dataset_s'], "tileCount": dataset["doc_count"] }) l = sorted(l, key=lambda entry: entry["title"]) return l def get_data_series_stats(self, ds): params = { "size": 0, "query": { "term":{ "dataset_s": { "value": ds } } }, "aggs": { "available_dates": { "composite": { "size": 100, "sources": [ {"terms_tile_max_time_dt": {"terms": {"field": "tile_max_time_dt"}}} ] } } } } aggregations = self.do_aggregation_all(params, 'available_dates') stats = {} stats['available_dates'] = [] for dt in aggregations: stats['available_dates'].append(dt['key']['terms_tile_max_time_dt'] / 1000) stats['available_dates'] = sorted(stats['available_dates']) params = { "size": 0, "query": { "term":{ "dataset_s": { "value": ds } } }, "aggs": { "min_tile_min_val_d": { "min": { "field": "tile_min_val_d" } }, "min_tile_max_time_dt": { "min": { "field": "tile_max_time_dt" } }, "max_tile_max_time_dt": { "max": { "field": "tile_max_time_dt" } }, "max_tile_max_val_d": { "max": { "field": "tile_max_val_d" } } } } aggregations = self.do_aggregation(*(None, None, None, False, None), **params) stats["start"] = int(aggregations["min_tile_max_time_dt"]["value"]) / 1000 stats["end"] = int(aggregations["max_tile_max_time_dt"]["value"]) / 1000 stats["minValue"] = aggregations["min_tile_min_val_d"]["value"] stats["maxValue"] = aggregations["max_tile_max_val_d"]["value"] return stats # day_of_year_i added (SDAP-347) def find_tile_by_polygon_and_most_recent_day_of_year(self, bounding_polygon, ds, day_of_year): max_lat = bounding_polygon.bounds[3] min_lon = bounding_polygon.bounds[0] min_lat = bounding_polygon.bounds[1] max_lon = bounding_polygon.bounds[2] params = { "size": "1", "query": { "bool": { "filter": [ { "term": { "dataset_s": { "value": ds } } }, { "geo_shape": { "geo": { "shape": { "type": "envelope", "coordinates": [[min_lon, max_lat], [max_lon, min_lat]] }, "relation": "intersects" } } }, { "range": { "tile_count_i": { "gte": 1 } } }, { "range": { "day_of_year_i": { "lte": day_of_year } } } ] } } } result, _, _ = self.do_query(*(None, None, None, True, 'day_of_year_i desc'), **params) return [result[0]] def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs): search_start_s = datetime.utcfromtimestamp(start_time).strftime(ELASTICSEARCH_FORMAT) search_end_s = datetime.utcfromtimestamp(end_time).strftime(ELASTICSEARCH_FORMAT) params = { "size": "0", "_source": "tile_min_time_dt", "query": { "bool": { "filter": [ { "term": { "dataset_s": { "value": ds } } }, { "range": { "tile_min_time_dt": { "gte": search_start_s, "lte": search_end_s } } }, { "geo_shape": { "geo": { "shape": { "type": "envelope", "coordinates": [[min_lon, max_lat],[max_lon, min_lat]] }, "relation": "intersects" } } } ] } }, "aggs": { "days_range_agg": { "composite": { "size":100, "sources": [ { "tile_min_time_dt": { "terms": { "field": "tile_min_time_dt" } } } ] } } } } aggregations = self.do_aggregation_all(params, 'days_range_agg') results = [res['key']['tile_min_time_dt'] for res in aggregations] daysinrangeasc = sorted([(res / 1000) for res in results]) return daysinrangeasc def find_all_tiles_in_box_sorttimeasc(self, min_lat, max_lat, min_lon, max_lon, ds, start_time=0, end_time=-1, **kwargs): params = { "size": 1000, "query": { "bool": { "filter": [ { "term": { "dataset_s": { "value": ds } } }, { "geo_shape": { "geo": { "shape": { "type": "envelope", "coordinates": [[min_lon, max_lat],[max_lon, min_lat]] }, "relation": "intersects" } } }, { "range": { "tile_count_i": { "gte": 1 } } } ] } } } if 0 < start_time <= end_time: params["query"]["bool"]["should"] = self.get_formatted_time_clause(start_time, end_time) params["query"]["bool"]["minimum_should_match"] = 1 self._merge_kwargs(params, **kwargs) return self.do_query_all(*(None, None, None, False, 'tile_min_time_dt asc,tile_max_time_dt asc'), **params) def find_all_tiles_in_polygon_sorttimeasc(self, bounding_polygon, ds, start_time=0, end_time=-1, **kwargs): nums = re.findall(r'\d+(?:\.\d*)?', bounding_polygon.wkt.rpartition(',')[0]) polygon_coordinates = list(zip(*[iter(nums)] * 2)) max_lat = bounding_polygon.bounds[3] min_lon = bounding_polygon.bounds[0] min_lat = bounding_polygon.bounds[1] max_lon = bounding_polygon.bounds[2] params = { "query": { "bool": { "filter": [ { "term": { "dataset_s": { "value": ds } } }, { "geo_shape": { "geo": { "shape": { "type": "envelope", "coordinates": [[min_lon, max_lat], [max_lon, min_lat]] }, "relation": "intersects" } } } ] } } } try: if 'fl' in list(kwargs.keys()): params["_source"] = kwargs["fl"].split(',') except KeyError: pass if 0 < start_time <= end_time: params["query"]["bool"]["should"] = self.get_formatted_time_clause(start_time, end_time) params["query"]["bool"]["minimum_should_match"] = 1 return self.do_query_all(*(None, None, None, False, 'tile_min_time_dt asc,tile_max_time_dt asc'), **params) def find_all_tiles_in_polygon(self, bounding_polygon, ds, start_time=0, end_time=-1, **kwargs): nums = re.findall(r'\d+(?:\.\d*)?', bounding_polygon.wkt.rpartition(',')[0]) polygon_coordinates = list(zip(*[iter(nums)] * 2)) max_lat = bounding_polygon.bounds[3] min_lon = bounding_polygon.bounds[0] min_lat = bounding_polygon.bounds[1] max_lon = bounding_polygon.bounds[2] params = { "size": 1000, "query": { "bool": { "filter": [ { "term": { "dataset_s": { "value": ds } } }, { "geo_shape": { "geo": { "shape": { "type": "envelope", "coordinates": [[min_lon, max_lat], [max_lon, min_lat]] }, "relation": "intersects" } } }, { "range": { "tile_count_i": { "gte": 1 } } } ] } } } try: if 'fl' in list(kwargs.keys()): params["_source"] = kwargs["fl"].split(',') except KeyError: pass if 0 < start_time <= end_time: params["query"]["bool"]["should"] = self.get_formatted_time_clause(start_time, end_time) params["query"]["bool"]["minimum_should_match"] = 1 self._merge_kwargs(params, **kwargs) return self.do_query_all(*(None, None, None, False, None), **params) def find_distinct_bounding_boxes_in_polygon(self, bounding_polygon, ds, start_time=0, end_time=-1, **kwargs): tile_max_lat = bounding_polygon.bounds[3] tile_min_lon = bounding_polygon.bounds[0] tile_min_lat = bounding_polygon.bounds[1] tile_max_lon = bounding_polygon.bounds[2] params = { "size": 0, "query": { "bool": { "filter": [ { "term": { "dataset_s": { "value": ds } } }, { "geo_shape": { "geo": { "shape": { "type": "envelope", "coordinates": [[tile_min_lon, tile_max_lat], [tile_max_lon, tile_min_lat]] }, "relation": "intersects" } } } ] } }, "aggs": { "distinct_bounding_boxes": { "composite": { "size": 100, "sources": [ { "bounding_box": { "terms": { "script": { "source": "String.valueOf(doc['tile_min_lon'].value) + ', ' + String.valueOf(doc['tile_max_lon'].value) + ', ' + String.valueOf(doc['tile_min_lat'].value) + ', ' + String.valueOf(doc['tile_max_lat'].value)", "lang": "painless" } } } } ] } } } } if 0 < start_time <= end_time: params["query"]["bool"]["should"] = self.get_formatted_time_clause(start_time, end_time) params["query"]["bool"]["minimum_should_match"] = 1 self._merge_kwargs(params, **kwargs) aggregations = self.do_aggregation_all(params, 'distinct_bounding_boxes') distinct_bounds = [] for agg in aggregations: coords = agg['key']['bounding_box'].split(',') min_lon = round(float(coords[0]), 2) max_lon = round(float(coords[1]), 2) min_lat = round(float(coords[2]), 2) max_lat = round(float(coords[3]), 2) polygon = 'POLYGON((%s %s, %s %s, %s %s, %s %s, %s %s))' % (min_lon, max_lat, min_lon, min_lat, max_lon, min_lat, max_lon, max_lat, min_lon, max_lat) distinct_bounds.append(wkt.loads(polygon).bounds) return distinct_bounds def find_tiles_by_exact_bounds(self, minx, miny, maxx, maxy, ds, start_time=0, end_time=-1, **kwargs): params = { "query": { "bool": { "filter": [ { "term": { "dataset_s": { "value": ds } } }, { "term": { "tile_min_lon": { "value": minx } } }, { "term": { "tile_min_lat": { "value": miny } } }, { "term": { "tile_max_lon": { "value": maxx } } }, { "term": { "tile_max_lat": { "value": maxy } } } ] } }} if 0 < start_time <= end_time: params["query"]["bool"]["should"] = self.get_formatted_time_clause(start_time, end_time) params["query"]["bool"]["minimum_should_match"] = 1 self._merge_kwargs(params, **kwargs) return self.do_query_all(*(None, None, None, False, None), **params) def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, ds, search_time, **kwargs): the_time = datetime.utcfromtimestamp(search_time).strftime(ELASTICSEARCH_FORMAT) params = { "size": 1000, "query": { "bool": { "filter": [ { "term": { "dataset_s": { "value": ds } } }, { "geo_shape": { "geo": { "shape": { "type": "envelope", "coordinates": [[min_lon, max_lat],[max_lon, min_lat]] }, "relation": "intersects" } } }, { "range": { "tile_min_time_dt": { "lte": the_time } } }, { "range": { "tile_max_time_dt": { "gte": the_time } } } ] } } } self._merge_kwargs(params, **kwargs) return self.do_query_all(*(None, None, None, False, None), **params) def find_all_tiles_in_polygon_at_time(self, bounding_polygon, ds, search_time, **kwargs): the_time = datetime.utcfromtimestamp(search_time).strftime(ELASTICSEARCH_FORMAT) max_lat = bounding_polygon.bounds[3] min_lon = bounding_polygon.bounds[0] min_lat = bounding_polygon.bounds[1] max_lon = bounding_polygon.bounds[2] params = { "size": 1000, "query": { "bool": { "filter": [ { "term": { "dataset_s": { "value": ds } } }, { "geo_shape": { "geo": { "shape": { "type": "envelope", "coordinates": [[min_lon, max_lat],[max_lon, min_lat]] }, "relation": "intersects" } } }, { "range": { "tile_min_time_dt": { "lte": the_time } } }, { "range": { "tile_max_time_dt": { "gte": the_time } } } ] } } } self._merge_kwargs(params, **kwargs) return self.do_query_all(*(None, None, None, False, None), **params) def find_all_tiles_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, ds, time, **kwargs): the_time = datetime.utcfromtimestamp(time).strftime(ELASTICSEARCH_FORMAT) params = { "size": 1000, "query": { "bool": { "filter": [ { "term": { "dataset_s": { "value": ds } } }, { "geo_shape": { "geo": { "shape": { "type": "envelope", "coordinates": [[min_lon, max_lat],[max_lon, min_lat]] }, "relation": "within" } } }, { "range": { "tile_count_i": { "gte": 1 } } }, { "range": { "tile_min_time_dt": { "lte": the_time } } }, { "range": { "tile_max_time_dt": { "gte": the_time } } } ] } } } self._merge_kwargs(params, **kwargs) return self.do_query_all(*(None, "product(tile_avg_val_d, tile_count_i),*", None, False, None), **params) def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, ds, time, **kwargs): the_time = datetime.utcfromtimestamp(time).strftime(ELASTICSEARCH_FORMAT) params = { "size": 1000, "query": { "bool": { "filter": [ { "term": { "dataset_s": { "value": ds } } }, { "geo_shape": { "geo": { "shape": { "type": "multilinestring", "coordinates": [[[min_lon, max_lat], [max_lon, max_lat], [min_lon, max_lat], [min_lon, min_lat], [max_lon, max_lat], [max_lon, min_lat], [min_lon, min_lat], [max_lon, min_lat]]] }, "relation": "intersects" } } }, { "range": { "tile_count_i": { "gte": 1 } } }, { "range": { "tile_min_time_dt": { "lte": the_time } } }, { "range": { "tile_max_time_dt": { "gte": the_time } } } ], "must_not" : { "geo_shape": { "geo": { "shape": { "type": "envelope", "coordinates": [[min_lon, max_lat], [max_lon, min_lat]] }, "relation": "within" } } } } } } self._merge_kwargs(params, **kwargs) return self.do_query_all(*(None, None, None, False, None), **params) def find_all_tiles_by_metadata(self, metadata, ds, start_time=0, end_time=-1, **kwargs): """ Get a list of tile metadata that matches the specified metadata, start_time, end_time. :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"] :param ds: The dataset name to search :param start_time: The start time to search for tiles :param end_time: The end time to search for tiles :return: A list of tile metadata """ params = { "query": { "bool": { "must": [ { "term": { "dataset_s": {"value": ds} } } ] } } } if len(metadata) > 0: for key_value in metadata: key = key_value.split(':')[0] value = key_value.split(':')[1] params['query']['bool']['must'].append({"match": {key: value}}) if 0 < start_time <= end_time: params['query']['bool']['should'] = self.get_formatted_time_clause(start_time, end_time) params["query"]["bool"]["minimum_should_match"] = 1 self._merge_kwargs(params, **kwargs) return self.do_query_all(*(None, None, None, False, None), **params) def get_formatted_time_clause(self, start_time, end_time): search_start_s = datetime.utcfromtimestamp(start_time).strftime(ELASTICSEARCH_FORMAT) search_end_s = datetime.utcfromtimestamp(end_time).strftime(ELASTICSEARCH_FORMAT) time_clause = [ { "range": { "tile_min_time_dt": { "lte": search_end_s, "gte": search_start_s } } }, { "range": { "tile_max_time_dt": { "lte": search_end_s, "gte": search_start_s } } }, { "bool": { "must": [ { "range": { "tile_min_time_dt": { "gte": search_start_s } } }, { "range": { "tile_max_time_dt": { "lte": search_end_s } } } ] } } ] return time_clause def get_tile_count(self, ds, bounding_polygon=None, start_time=0, end_time=-1, metadata=None, **kwargs): """ Return number of tiles that match search criteria. :param ds: The dataset name to search :param bounding_polygon: The polygon to search for tiles :param start_time: The start time to search for tiles :param end_time: The end time to search for tiles :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"] :return: number of tiles that match search criteria """ params = { "size": 0, "query": { "bool": { "filter": [ { "term": { "dataset_s": { "value": ds } } }, { "range": { "tile_count_i": { "gte": 1 } } } ] } } } if bounding_polygon: min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds geo_clause = { "geo_shape": { "geo": { "shape": { "type": "envelope", "coordinates": [[min_lon, max_lat], [max_lon, min_lat]] } } } } params['query']['bool']['filter'].append(geo_clause) if 0 < start_time <= end_time: params['query']['bool']['should'] = self.get_formatted_time_clause(start_time, end_time) params["query"]["bool"]["minimum_should_match"] = 1 if len(metadata) > 0: for key_value in metadata: key = key_value.split(':')[0] value = key_value.split(':')[1] params['query']['bool']['filter'].append({"term": {key: {"value": value}}}) self._merge_kwargs(params, **kwargs) _, _, found = self.do_query(*(None, None, None, True, None), **params) return found def do_aggregation(self, *args, **params): # Gets raw aggregations response = self.do_query_raw(*args, **params) aggregations = response.get('aggregations', None) return aggregations def do_aggregation_all(self, params, agg_name): # Used for pagination when results can exceed ES max size (use of after_key) with ELASTICSEARCH_CON_LOCK: response = self.elasticsearchcon.search(index=self.elasticsearchIndex, body=params) all_buckets = [] try: aggregations = response.get('aggregations', None) current_buckets = aggregations.get(agg_name, None) buckets = current_buckets.get('buckets', None) all_buckets += buckets after_bucket = current_buckets.get('after_key', None) while after_bucket is not None: for agg in params['aggs']: params['aggs'][agg]['composite']['after'] = {} for source in params['aggs'][agg]['composite']['sources']: key_name = next(iter(source)) params['aggs'][agg]['composite']['after'][key_name] = after_bucket[key_name] with ELASTICSEARCH_CON_LOCK: response = self.elasticsearchcon.search(index=self.elasticsearchIndex, body=params) aggregations = response.get('aggregations', None) current_buckets = aggregations.get(agg_name, None) buckets = current_buckets.get('buckets', None) all_buckets += buckets after_bucket = current_buckets.get('after_key', None) except AttributeError as e: self.logger.error('Error when accessing aggregation buckets - ' + str(e)) return all_buckets def do_query(self, *args, **params): response = self.do_query_raw(*args, **params) return response['hits']['hits'], None, response['hits']['total']['value'] def do_query_raw(self, *args, **params): if args[4]: sort_fields = args[4].split(",") if 'sort' not in list(params.keys()): params["sort"] = [] for field in sort_fields: field_order = field.split(' ') sort_instruction = {field_order[0]: field_order[1]} if sort_instruction not in params['sort']: params["sort"].append(sort_instruction) with ELASTICSEARCH_CON_LOCK: response = self.elasticsearchcon.search(index=self.elasticsearchIndex, body=params) return response def do_query_all(self, *args, **params): # Used to paginate with search_after. # The method calling this might already have a sort clause, # so we merge both sort clauses inside do_query_raw results = [] search = None # Add track option to not be blocked at 10000 hits per worker if 'track_total_hits' not in params.keys(): params['track_total_hits'] = True # Add sort instruction order to paginate the results : params["sort"] = [ { "tile_min_time_dt": "asc"}, { "_id": "asc" } ] response = self.do_query_raw(*args, **params) results.extend([r["_source"] for r in response["hits"]["hits"]]) total_hits = response["hits"]["total"]["value"] try: search_after = [] for sort_param in response["hits"]["hits"][-1]["sort"]: search_after.append(str(sort_param)) except (KeyError, IndexError): search_after = [] try: while len(results) < total_hits: params["search_after"] = search_after response = self.do_query_raw(*args, **params) results.extend([r["_source"] for r in response["hits"]["hits"]]) search_after = [] for sort_param in response["hits"]["hits"][-1]["sort"]: search_after.append(str(sort_param)) except (KeyError, IndexError): pass return results def convert_iso_to_datetime(self, date): return datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=UTC) def convert_iso_to_timestamp(self, date): return (self.convert_iso_to_datetime(date) - EPOCH).total_seconds() @staticmethod def _merge_kwargs(params, **kwargs): # Only Solr-specific kwargs are parsed # And the special 'limit' try: params['limit'] = kwargs['limit'] except KeyError: pass try: params['_route_'] = kwargs['_route_'] except KeyError: pass try: params['size'] = kwargs['size'] except KeyError: pass try: params['start'] = kwargs['start'] except KeyError: pass try: s = kwargs['sort'] if isinstance(kwargs['sort'], list) else [kwargs['sort']] except KeyError: s = None try: params['sort'].extend(s) except KeyError: if s is not None: params['sort'] = s