data-access/nexustiles/backends/nexusproto/dao/SolrProxy.py (615 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import threading
import time
from datetime import datetime

import pysolr
import requests
from pytz import timezone, UTC
from shapely import wkt

# Guards creation of the per-thread pysolr connection and raw searches.
SOLR_CON_LOCK = threading.Lock()
thread_local = threading.local()

EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))

SOLR_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'


class SolrProxy(object):
    """
    Data-access proxy that translates nexus tile queries into Solr requests.

    Tiles are Solr documents with (among others) the fields ``dataset_s``,
    ``geo``/``geo_s``, ``tile_min_time_dt``/``tile_max_time_dt``,
    ``tile_min_elevation_d``/``tile_max_elevation_d`` and ``tile_count_i``.
    All query helpers funnel through :meth:`do_query_raw`.
    """

    def __init__(self, config):
        """
        Create the proxy from a ConfigParser-style ``config`` with a ``[solr]``
        section containing ``host``, ``core`` and optionally ``time_out``.
        """
        self.solrUrl = config.get("solr", "host")
        self.solrCore = config.get("solr", "core")
        solr_kargs = {}
        if config.has_option("solr", "time_out"):
            # BUGFIX: config.get() returns a string, but pysolr forwards the
            # timeout to requests which requires a numeric value.
            solr_kargs["timeout"] = float(config.get("solr", "time_out"))
        self.logger = logging.getLogger('nexus')

        with SOLR_CON_LOCK:
            # One pysolr connection per thread, created lazily.
            solrcon = getattr(thread_local, 'solrcon', None)
            if solrcon is None:
                solr_url = '%s/solr/%s' % (self.solrUrl, self.solrCore)
                self.logger.info("connect to solr, url {} with option(s) = {}".format(solr_url, solr_kargs))
                solrcon = pysolr.Solr(solr_url, **solr_kargs)
                thread_local.solrcon = solrcon

            self.solrcon = solrcon

    def find_tile_by_id(self, tile_id):
        """Return a single-element list holding the tile document with ``tile_id``."""
        search = 'id:%s' % tile_id

        params = {
            'rows': 1
        }

        results, start, found = self.do_query(*(search, None, None, True, None), **params)

        assert len(results) == 1, "Found %s results, expected exactly 1" % len(results)
        return [results[0]]

    def find_tiles_by_id(self, tile_ids, ds=None, **kwargs):
        """Return the tile documents for every id in ``tile_ids`` (optionally within dataset ``ds``)."""
        if ds is not None:
            search = 'dataset_s:%s' % ds
        else:
            search = '*:*'

        additionalparams = {
            'fq': [
                "{!terms f=id}%s" % ','.join(tile_ids)
            ]
        }

        self._merge_kwargs(additionalparams, **kwargs)

        results = self.do_query_all(*(search, None, None, False, None), **additionalparams)

        assert len(results) == len(tile_ids), "Found %s results, expected exactly %s" % (len(results), len(tile_ids))
        return results

    def find_min_date_from_tiles(self, tile_ids, ds=None, **kwargs):
        """Return the earliest ``tile_min_time_dt`` (as a UTC datetime) among the given tiles."""
        if ds is not None:
            search = 'dataset_s:%s' % ds
        else:
            search = '*:*'

        kwargs['rows'] = 1
        kwargs['fl'] = 'tile_min_time_dt'
        kwargs['sort'] = ['tile_min_time_dt asc']
        additionalparams = {
            'fq': [
                # An empty tile_ids list means "all tiles in the dataset".
                "{!terms f=id}%s" % ','.join(tile_ids) if len(tile_ids) > 0 else ''
            ]
        }

        self._merge_kwargs(additionalparams, **kwargs)

        results, start, found = self.do_query(*(search, None, None, True, None), **additionalparams)

        return self.convert_iso_to_datetime(results[0]['tile_min_time_dt'])

    def find_max_date_from_tiles(self, tile_ids, ds=None, **kwargs):
        """Return the latest ``tile_max_time_dt`` (as a UTC datetime) among the given tiles."""
        if ds is not None:
            search = 'dataset_s:%s' % ds
        else:
            search = '*:*'

        kwargs['rows'] = 1
        kwargs['fl'] = 'tile_max_time_dt'
        kwargs['sort'] = ['tile_max_time_dt desc']
        additionalparams = {
            'fq': [
                "{!terms f=id}%s" % ','.join(tile_ids) if len(tile_ids) > 0 else ''
            ]
        }

        self._merge_kwargs(additionalparams, **kwargs)

        results, start, found = self.do_query(*(search, None, None, True, None), **additionalparams)

        return self.convert_iso_to_datetime(results[0]['tile_max_time_dt'])

    def find_min_max_date_from_granule(self, ds, granule_name, **kwargs):
        """Return ``(start_time, end_time)`` datetimes covered by ``granule_name`` in dataset ``ds``."""
        search = 'dataset_s:%s' % ds

        kwargs['rows'] = 1
        kwargs['fl'] = 'tile_min_time_dt'
        kwargs['sort'] = ['tile_min_time_dt asc']
        additionalparams = {
            'fq': [
                "granule_s:%s" % granule_name
            ]
        }
        self._merge_kwargs(additionalparams, **kwargs)
        results, start, found = self.do_query(*(search, None, None, False, None), **additionalparams)
        start_time = self.convert_iso_to_datetime(results[0]['tile_min_time_dt'])

        kwargs['fl'] = 'tile_max_time_dt'
        kwargs['sort'] = ['tile_max_time_dt desc']
        additionalparams = {
            'fq': [
                "granule_s:%s" % granule_name
            ]
        }
        self._merge_kwargs(additionalparams, **kwargs)
        results, start, found = self.do_query(*(search, None, None, False, None), **additionalparams)
        end_time = self.convert_iso_to_datetime(results[0]['tile_max_time_dt'])

        return start_time, end_time

    def get_data_series_list(self):
        """
        Return the dataset list enriched with temporal (epoch seconds + ISO 8601)
        and spatial extents derived from per-dataset min/max queries.
        """
        datasets = self.get_data_series_list_simple()

        def get_spatial_bound(dataset, lat_lon, min_max):
            # One-row query sorted on the bound field; the first doc holds the extreme.
            search = f'dataset_s:{dataset}'
            field = f'tile_{min_max}_{lat_lon}'
            order = 'ASC' if min_max == 'min' else 'DESC'

            params = dict(
                rows=1,
                sort=[f'{field} {order}']
            )

            results = self.do_query_raw(*(search, None, None, False, None), **params)

            v = results.docs[0][field]

            # Solr may return multi-valued fields as single-element lists.
            if isinstance(v, list):
                v = v[0]

            return v

        for dataset in datasets:
            min_date = self.find_min_date_from_tiles([], ds=dataset['title'])
            max_date = self.find_max_date_from_tiles([], ds=dataset['title'])
            dataset['start'] = (min_date - EPOCH).total_seconds()
            dataset['end'] = (max_date - EPOCH).total_seconds()
            dataset['iso_start'] = min_date.strftime(ISO_8601)
            dataset['iso_end'] = max_date.strftime(ISO_8601)

            min_lat = get_spatial_bound(dataset['title'], 'lat', 'min')
            max_lat = get_spatial_bound(dataset['title'], 'lat', 'max')
            min_lon = get_spatial_bound(dataset['title'], 'lon', 'min')
            max_lon = get_spatial_bound(dataset['title'], 'lon', 'max')

            dataset['spatial_extent'] = '{:0.2f},{:0.2f},{:0.2f},{:0.2f}'.format(min_lon, min_lat, max_lon, max_lat)

        return datasets

    def get_data_series_list_simple(self):
        """Return ``[{shortName, title, tileCount, type}, ...]`` for every dataset, sorted by title."""
        search = "*:*"
        params = {
            'rows': 0,
            "facet": "true",
            "facet.field": "dataset_s",
            "facet.mincount": "1",
            "facet.limit": "-1"
        }

        response = self.do_query_raw(*(search, None, None, False, None), **params)
        series = []
        # facet_fields comes back flat: [name, count, name, count, ...];
        # zip over a shared iterator to pair them up.
        for dataset_name, tile_count in zip(*[iter(response.facets["facet_fields"]["dataset_s"])] * 2):
            series.append({
                "shortName": dataset_name,
                "title": dataset_name,
                "tileCount": tile_count,
                "type": 'nexusproto'
            })
        series = sorted(series, key=lambda entry: entry["title"])
        return series

    def get_data_series_stats(self, ds):
        """
        Return summary stats for dataset ``ds``: start/end timestamps, min/max
        values, and the sorted list of available ``tile_max_time_dt`` timestamps.
        """
        search = "dataset_s:%s" % ds
        params = {
            "facet": "true",
            "facet.field": ["dataset_s", "tile_max_time_dt"],
            "facet.limit": "-1",
            "facet.mincount": "1",
            "facet.pivot": "{!stats=piv1}dataset_s",
            "stats": "on",
            "stats.field": ["{!tag=piv1 min=true max=true sum=false}tile_max_time_dt","{!tag=piv1 min=true max=false sum=false}tile_min_val_d","{!tag=piv1 min=false max=true sum=false}tile_max_val_d"]
        }

        response = self.do_query_raw(*(search, None, None, False, None), **params)

        stats = {}

        for g in response.facets["facet_pivot"]["dataset_s"]:
            if g["value"] == ds:
                stats["start"] = self.convert_iso_to_timestamp(g["stats"]["stats_fields"]["tile_max_time_dt"]["min"])
                stats["end"] = self.convert_iso_to_timestamp(g["stats"]["stats_fields"]["tile_max_time_dt"]["max"])
                stats["minValue"] = g["stats"]["stats_fields"]["tile_min_val_d"]["min"]
                stats["maxValue"] = g["stats"]["stats_fields"]["tile_max_val_d"]["max"]

        stats["availableDates"] = []
        # Facet list alternates value/count; take every other entry (the values).
        for dt in response.facets["facet_fields"]["tile_max_time_dt"][::2]:
            stats["availableDates"].append(self.convert_iso_to_timestamp(dt))

        stats["availableDates"] = sorted(stats["availableDates"])

        return stats

    def find_tile_by_polygon_and_most_recent_day_of_year(self, bounding_polygon, ds, day_of_year):
        """Return the tile in ``bounding_polygon`` with the greatest day-of-year <= ``day_of_year``."""
        search = 'dataset_s:%s' % ds

        min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds

        params = {
            'fq': [
                "geo:[%s,%s TO %s,%s]" % (min_lat, min_lon, max_lat, max_lon),
                "tile_count_i:[1 TO *]",
                "day_of_year_i:[* TO %s]" % day_of_year
            ],
            'rows': 1
        }

        results, start, found = self.do_query(
            *(search, None, None, True, ('day_of_year_i desc',)), **params)

        return [results[0]]

    def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs):
        """Return the ascending list of distinct tile days (epoch seconds) in the box and time range."""
        search = 'dataset_s:%s' % ds

        search_start_s = datetime.utcfromtimestamp(start_time).strftime(SOLR_FORMAT)
        search_end_s = datetime.utcfromtimestamp(end_time).strftime(SOLR_FORMAT)

        additionalparams = {
            'fq': [
                "geo:[%s,%s TO %s,%s]" % (min_lat, min_lon, max_lat, max_lon),
                # frange restricts to tiles whose min and max times are equal
                # (i.e. single-instant "day" tiles).
                "{!frange l=0 u=0}ms(tile_min_time_dt,tile_max_time_dt)",
                "tile_count_i:[1 TO *]",
                "tile_min_time_dt:[%s TO %s] " % (search_start_s, search_end_s)
            ],
            'rows': 0,
            'facet': 'true',
            'facet.field': 'tile_min_time_dt',
            'facet.mincount': '1',
            'facet.limit': '-1'
        }

        self._merge_kwargs(additionalparams, **kwargs)

        response = self.do_query_raw(*(search, None, None, False, None), **additionalparams)

        daysinrangeasc = sorted(
            [(datetime.strptime(a_date, SOLR_FORMAT) - datetime.utcfromtimestamp(0)).total_seconds()
             for a_date in response.facets['facet_fields']['tile_min_time_dt'][::2]])

        return daysinrangeasc

    def find_all_tiles_in_box_sorttimeasc(self, min_lat, max_lat, min_lon, max_lon, ds, start_time=0,
                                          end_time=-1, **kwargs):
        """Return all tiles in the bounding box (and optional time/elevation range), sorted by time asc."""
        search = 'dataset_s:%s' % ds

        additionalparams = {
            'fq': [
                "geo:[%s,%s TO %s,%s]" % (min_lat, min_lon, max_lat, max_lon),
                "tile_count_i:[1 TO *]"
            ]
        }

        if 0 <= start_time <= end_time:
            additionalparams['fq'].append(self.get_formatted_time_clause(start_time, end_time))

        min_elevation = kwargs.get('min_elevation', None)
        max_elevation = kwargs.get('max_elevation', None)

        elevation_clause = self.get_elevation_clause(min_elevation, max_elevation)
        additionalparams['fq'].append(elevation_clause)

        self._merge_kwargs(additionalparams, **kwargs)

        return self.do_query_all(
            *(search, None, None, False, 'tile_min_time_dt asc, tile_max_time_dt asc'),
            **additionalparams)

    def find_all_tiles_in_polygon_sorttimeasc(self, bounding_polygon, ds, start_time=0, end_time=-1, **kwargs):
        """Return all tiles in the polygon's bounding box (and optional ranges), sorted by time asc."""
        search = 'dataset_s:%s' % ds

        min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds

        additionalparams = {
            'fq': [
                "geo:[%s,%s TO %s,%s]" % (min_lat, min_lon, max_lat, max_lon),
                "tile_count_i:[1 TO *]"
            ]
        }

        if 0 <= start_time <= end_time:
            additionalparams['fq'].append(self.get_formatted_time_clause(start_time, end_time))

        min_elevation = kwargs.get('min_elevation', None)
        max_elevation = kwargs.get('max_elevation', None)

        elevation_clause = self.get_elevation_clause(min_elevation, max_elevation)
        additionalparams['fq'].append(elevation_clause)

        self._merge_kwargs(additionalparams, **kwargs)

        return self.do_query_all(
            *(search, None, None, False, 'tile_min_time_dt asc, tile_max_time_dt asc'),
            **additionalparams)

    def find_all_tiles_in_polygon(self, bounding_polygon, ds, start_time=0, end_time=-1, **kwargs):
        """Return all tiles in the polygon's bounding box (and optional ranges), unsorted."""
        search = 'dataset_s:%s' % ds

        min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds

        additionalparams = {
            'fq': [
                "geo:[%s,%s TO %s,%s]" % (min_lat, min_lon, max_lat, max_lon),
                "tile_count_i:[1 TO *]"
            ]
        }

        if 0 <= start_time <= end_time:
            additionalparams['fq'].append(self.get_formatted_time_clause(start_time, end_time))

        min_elevation = kwargs.get('min_elevation', None)
        max_elevation = kwargs.get('max_elevation', None)

        elevation_clause = self.get_elevation_clause(min_elevation, max_elevation)
        additionalparams['fq'].append(elevation_clause)

        self._merge_kwargs(additionalparams, **kwargs)

        return self.do_query_all(
            *(search, None, None, False, None),
            **additionalparams)

    def find_distinct_bounding_boxes_in_polygon(self, bounding_polygon, ds, start_time=0, end_time=-1, **kwargs):
        """Return the distinct tile bounding boxes (as shapely bounds tuples) within the polygon."""
        search = 'dataset_s:%s' % ds

        min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds

        additionalparams = {
            'fq': [
                "geo:[%s,%s TO %s,%s]" % (min_lat, min_lon, max_lat, max_lon),
                "tile_count_i:[1 TO *]"
            ],
            'rows': 0,
            'facet': 'true',
            'facet.field': 'geo_s',
            'facet.limit': -1,
            'facet.mincount': 1
        }

        if 0 <= start_time <= end_time:
            additionalparams['fq'].append(self.get_formatted_time_clause(start_time, end_time))

        self._merge_kwargs(additionalparams, **kwargs)

        response = self.do_query_raw(*(search, None, None, False, None), **additionalparams)

        # geo_s facet values are WKT strings; every other entry is a value.
        distinct_bounds = [wkt.loads(key).bounds for key in response.facets["facet_fields"]["geo_s"][::2]]

        return distinct_bounds

    def find_tiles_by_exact_bounds(self, minx, miny, maxx, maxy, ds, start_time=0, end_time=-1, **kwargs):
        """Return all tiles whose bounds exactly match the given min/max lon/lat."""
        search = 'dataset_s:%s' % ds

        additionalparams = {
            'fq': [
                "tile_min_lon:\"%s\"" % minx,
                "tile_min_lat:\"%s\"" % miny,
                "tile_max_lon:\"%s\"" % maxx,
                "tile_max_lat:\"%s\"" % maxy,
                "tile_count_i:[1 TO *]"
            ]
        }

        if 0 <= start_time <= end_time:
            additionalparams['fq'].append(self.get_formatted_time_clause(start_time, end_time))

        self._merge_kwargs(additionalparams, **kwargs)

        return self.do_query_all(
            *(search, None, None, False, None),
            **additionalparams)

    def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, ds, search_time, **kwargs):
        """Return all tiles in the box whose [min,max] time range contains ``search_time``."""
        search = 'dataset_s:%s' % ds

        additionalparams = {
            'fq': [
                "geo:[%s,%s TO %s,%s]" % (min_lat, min_lon, max_lat, max_lon),
                "tile_count_i:[1 TO *]",
                self._get_time_contains_clause(search_time)
            ]
        }

        self._merge_kwargs(additionalparams, **kwargs)

        return self.do_query_all(*(search, None, None, False, None), **additionalparams)

    def find_all_tiles_in_polygon_at_time(self, bounding_polygon, ds, search_time, **kwargs):
        """Return all tiles in the polygon's bounding box whose time range contains ``search_time``."""
        search = 'dataset_s:%s' % ds

        min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds

        additionalparams = {
            'fq': [
                "geo:[%s,%s TO %s,%s]" % (min_lat, min_lon, max_lat, max_lon),
                "tile_count_i:[1 TO *]",
                self._get_time_contains_clause(search_time)
            ]
        }

        self._merge_kwargs(additionalparams, **kwargs)

        return self.do_query_all(*(search, None, None, False, None), **additionalparams)

    def find_all_tiles_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, ds, time, **kwargs):
        """Return tiles fully *within* the box at ``time``; fl adds product(tile_avg_val_d, tile_count_i)."""
        search = 'dataset_s:%s' % ds

        additionalparams = {
            'fq': [
                "geo:\"Within(ENVELOPE(%s,%s,%s,%s))\"" % (min_lon, max_lon, max_lat, min_lat),
                "tile_count_i:[1 TO *]",
                self._get_time_contains_clause(time)
            ]
        }

        self._merge_kwargs(additionalparams, **kwargs)

        return self.do_query_all(*(search, "product(tile_avg_val_d, tile_count_i),*", None, False, None),
                                 **additionalparams)

    def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, ds, time, **kwargs):
        """Return tiles that intersect the box's boundary (but are not fully inside it) at ``time``."""
        search = 'dataset_s:%s' % ds

        additionalparams = {
            'fq': [
                "geo:\"Intersects(MultiLineString((%s %s, %s %s),(%s %s, %s %s),(%s %s, %s %s),(%s %s, %s %s)))\"" % (
                    min_lon, max_lat, max_lon, max_lat,
                    min_lon, max_lat, min_lon, min_lat,
                    max_lon, max_lat, max_lon, min_lat,
                    min_lon, min_lat, max_lon, min_lat),
                "-geo:\"Within(ENVELOPE(%s,%s,%s,%s))\"" % (min_lon, max_lon, max_lat, min_lat),
                "tile_count_i:[1 TO *]",
                self._get_time_contains_clause(time)
            ]
        }

        self._merge_kwargs(additionalparams, **kwargs)

        return self.do_query_all(*(search, None, None, False, None), **additionalparams)

    def find_tiles_along_line(self, start_point, end_point, ds=None, start_time=0, end_time=-1, **kwargs):
        """Return all tiles intersecting the line from ``start_point`` to ``end_point``, sorted by time asc."""
        if ds is None:
            ds = '*'

        search = 'dataset_s:%s' % ds

        additionalparams = {
            'fq': [
                "geo:\"Intersects(LineString(%s %s, %s %s))\"" % (
                    start_point.x, start_point.y, end_point.x, end_point.y),
                "tile_count_i:[1 TO *]"
            ]
        }

        if 0 <= start_time <= end_time:
            additionalparams['fq'].append(self.get_formatted_time_clause(start_time, end_time))

        min_elevation = kwargs.get('min_elevation', None)
        max_elevation = kwargs.get('max_elevation', None)

        elevation_clause = self.get_elevation_clause(min_elevation, max_elevation)
        additionalparams['fq'].append(elevation_clause)

        self._merge_kwargs(additionalparams, **kwargs)

        return self.do_query_all(
            *(search, None, None, False, 'tile_min_time_dt asc, tile_max_time_dt asc'),
            **additionalparams)

    def find_all_tiles_by_metadata(self, metadata, ds, start_time=0, end_time=-1, **kwargs):
        """
        Get a list of tile metadata that matches the specified metadata, start_time, end_time.
        :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"]
        :param ds: The dataset name to search
        :param start_time: The start time to search for tiles
        :param end_time: The end time to search for tiles
        :return: A list of tile metadata
        """
        search = 'dataset_s:%s' % ds

        additionalparams = {
            'fq': metadata
        }

        if 0 <= start_time <= end_time:
            additionalparams['fq'].append(self.get_formatted_time_clause(start_time, end_time))

        self._merge_kwargs(additionalparams, **kwargs)

        return self.do_query_all(
            *(search, None, None, False, None),
            **additionalparams)

    def get_formatted_time_clause(self, start_time, end_time):
        """
        Build an fq clause matching tiles whose time range overlaps
        [start_time, end_time] (both epoch seconds, UTC).
        """
        search_start_s = datetime.utcfromtimestamp(start_time).strftime(SOLR_FORMAT)
        search_end_s = datetime.utcfromtimestamp(end_time).strftime(SOLR_FORMAT)

        time_clause = "(" \
                      "tile_min_time_dt:[%s TO %s] " \
                      "OR tile_max_time_dt:[%s TO %s] " \
                      "OR (tile_min_time_dt:[* TO %s] AND tile_max_time_dt:[%s TO *])" \
                      ")" % (
                          search_start_s, search_end_s,
                          search_start_s, search_end_s,
                          search_start_s, search_end_s
                      )
        return time_clause

    def _get_time_contains_clause(self, search_time):
        # Helper: fq clause matching tiles whose [min,max] time range contains
        # search_time (epoch seconds, UTC).
        the_time = datetime.utcfromtimestamp(search_time).strftime(SOLR_FORMAT)
        return "(" \
               "tile_min_time_dt:[* TO %s] " \
               "AND tile_max_time_dt:[%s TO *] " \
               ")" % (the_time, the_time)

    def get_elevation_clause(self, min_elevation, max_elevation):
        """
        Build an fq clause selecting tiles by elevation range. With no bounds
        given, select tiles with no elevation field or a zero elevation range.
        """
        if min_elevation is not None and max_elevation is not None:
            if min_elevation == max_elevation:
                elevation_clause = f"(tile_min_elevation_d:[* TO {min_elevation}] AND tile_max_elevation_d:[{max_elevation} TO *])"
            else:
                elevation_clause = (f"(tile_min_elevation_d:[{min_elevation} TO {max_elevation}] OR "
                                    f"tile_max_elevation_d:[{min_elevation} TO {max_elevation}] OR "
                                    f"(tile_min_elevation_d:[* TO {min_elevation}] AND tile_max_elevation_d:[{max_elevation} TO *]))")
        elif min_elevation is not None:
            elevation_clause = (f"(tile_min_elevation_d:[{min_elevation} TO *] AND "
                                f"tile_max_elevation_d:[{min_elevation} TO *])")
        elif max_elevation is not None:
            elevation_clause = (f"(tile_min_elevation_d:[* TO {max_elevation}] AND "
                                f"tile_max_elevation_d:[* TO {max_elevation}])")
        else:
            elevation_clause = ("((*:* -tile_min_elevation_d:[* TO *]) OR "
                                "(tile_min_elevation_d:[0 TO 0] OR tile_max_elevation_d:[0 TO 0]))")

        return elevation_clause

    def get_tile_count(self, ds, bounding_polygon=None, start_time=0, end_time=-1, metadata=None, **kwargs):
        """
        Return number of tiles that match search criteria.
        :param ds: The dataset name to search
        :param bounding_polygon: The polygon to search for tiles
        :param start_time: The start time to search for tiles
        :param end_time: The end time to search for tiles
        :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"]
        :return: number of tiles that match search criteria
        """
        search = 'dataset_s:%s' % ds

        additionalparams = {
            'fq': [
                "tile_count_i:[1 TO *]"
            ],
            'rows': 0
        }

        if bounding_polygon:
            min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds
            additionalparams['fq'].append("geo:[%s,%s TO %s,%s]" % (min_lat, min_lon, max_lat, max_lon))

        if 0 <= start_time <= end_time:
            additionalparams['fq'].append(self.get_formatted_time_clause(start_time, end_time))

        if metadata:
            additionalparams['fq'].extend(metadata)

        self._merge_kwargs(additionalparams, **kwargs)

        results, start, found = self.do_query(*(search, None, None, True, None), **additionalparams)

        return found

    def do_query(self, *args, **params):
        """Run one Solr query; return ``(docs, start_offset, total_hits)``."""
        response = self.do_query_raw(*args, **params)
        return response.docs, response.raw_response['response']['start'], response.hits

    def do_query_raw(self, *args, **params):
        """
        Run one Solr query and return the raw pysolr response.
        ``args`` is the positional tuple (search, fl, _, _, sort); explicit
        ``params`` keys win over the positional fl/sort.
        """
        if 'fl' not in params and args[1]:
            params['fl'] = args[1]

        if 'sort' not in params and args[4]:
            params['sort'] = args[4]

        # If dataset_s is specified as the search term,
        # add the _route_ parameter to limit the search to the correct shard
        if 'dataset_s:' in args[0]:
            ds = args[0].split(':')[-1]
            params['shard_keys'] = ds + '!'

        with SOLR_CON_LOCK:
            response = self.solrcon.search(args[0], **params)

        return response

    def do_query_all(self, *args, **params):
        """
        Page through Solr results and return all docs, capped at the optional
        ``params['limit']``.
        """
        results = []

        response = self.do_query_raw(*args, **params)
        results.extend(response.docs)

        limit = min(params.get('limit', float('inf')), response.hits)

        while len(results) < limit:
            params['start'] = len(results)
            response = self.do_query_raw(*args, **params)
            results.extend(response.docs)

        # BUGFIX: when 'limit' is not a multiple of the page size the final
        # page overshoots; trim so exactly 'limit' docs are returned instead
        # of tripping the assertion below.
        if len(results) > limit:
            del results[limit:]

        assert len(results) == limit

        return results

    def convert_iso_to_datetime(self, date):
        """Parse a Solr ISO timestamp (``%Y-%m-%dT%H:%M:%SZ``) into a UTC-aware datetime."""
        return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC)

    def convert_iso_to_timestamp(self, date):
        """Convert a Solr ISO timestamp into epoch seconds."""
        return (self.convert_iso_to_datetime(date) - EPOCH).total_seconds()

    def ping(self):
        """Best-effort health check; return the parsed ping response or None on any failure."""
        solrAdminPing = '%s/solr/%s/admin/ping' % (self.solrUrl, self.solrCore)
        try:
            r = requests.get(solrAdminPing, params={'wt': 'json'})
            results = json.loads(r.text)
            return results
        # BUGFIX: was a bare 'except:', which also swallowed KeyboardInterrupt
        # and SystemExit. Catch only network errors and malformed JSON
        # (json.JSONDecodeError subclasses ValueError).
        except (requests.exceptions.RequestException, ValueError):
            return None

    @staticmethod
    def _merge_kwargs(additionalparams, **kwargs):
        # Only Solr-specific kwargs are parsed
        # And the special 'limit'
        try:
            additionalparams['limit'] = kwargs['limit']
        except KeyError:
            pass

        try:
            additionalparams['_route_'] = kwargs['_route_']
        except KeyError:
            pass

        try:
            additionalparams['rows'] = kwargs['rows']
        except KeyError:
            pass

        try:
            additionalparams['start'] = kwargs['start']
        except KeyError:
            pass

        try:
            # BUGFIX: a scalar fq used to be passed through list(), which
            # splits a string filter query into single characters. Wrap
            # scalars in a one-element list, consistent with 'fl' and 'sort'.
            kwfq = kwargs['fq'] if isinstance(kwargs['fq'], list) else [kwargs['fq']]
        except KeyError:
            kwfq = []

        try:
            additionalparams['fq'].extend(kwfq)
        except KeyError:
            additionalparams['fq'] = kwfq

        try:
            kwfl = kwargs['fl'] if isinstance(kwargs['fl'], list) else [kwargs['fl']]
        except KeyError:
            kwfl = []

        try:
            additionalparams['fl'].extend(kwfl)
        except KeyError:
            additionalparams['fl'] = kwfl

        try:
            s = kwargs['sort'] if isinstance(kwargs['sort'], list) else [kwargs['sort']]
        except KeyError:
            s = None

        # BUGFIX: guard the extend — extending an existing 'sort' with None
        # raised TypeError rather than being skipped.
        if s is not None:
            try:
                additionalparams['sort'].extend(s)
            except KeyError:
                additionalparams['sort'] = s