data-access/nexustiles/nexustiles.py (638 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import configparser import json import logging import sys import threading from datetime import datetime from functools import reduce, wraps from time import sleep from typing import Dict, Union import numpy as np import numpy.ma as ma import pkg_resources import pysolr from pytz import timezone, UTC from shapely.geometry import box from webservice.webmodel import DatasetNotFoundException, NexusProcessingException from webservice.NexusHandler import nexus_initializer from yarl import URL from .AbstractTileService import AbstractTileService from .backends.nexusproto.backend import NexusprotoTileService from .backends.zarr.backend import ZarrBackend from .model.nexusmodel import Tile, BBox, TileStats, TileVariable from .exception import NexusTileServiceException from requests.structures import CaseInsensitiveDict EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout) logger = logging.getLogger("nexus-tile-svc") def tile_data(default_fetch=True): def tile_data_decorator(func): @wraps(func) def fetch_data_for_func(*args, **kwargs): metadatastore_start = datetime.now() metadatastore_docs = func(*args, **kwargs) metadatastore_duration = (datetime.now() - metadatastore_start).total_seconds() # Try to determine source dataset to route calls to proper backend guessed_dataset = None if 'ds' in kwargs: guessed_dataset = kwargs['ds'] elif 'dataset' in kwargs: guessed_dataset = kwargs['dataset'] else: for arg in args: if isinstance(arg, str) and arg in NexusTileService.backends: guessed_dataset = arg break tiles = NexusTileService._get_backend(guessed_dataset)._metadata_store_docs_to_tiles(*metadatastore_docs) cassandra_duration = 0 if ('fetch_data' in kwargs and kwargs['fetch_data']) or ('fetch_data' not in kwargs and default_fetch): if len(tiles) > 0: cassandra_start = datetime.now() NexusTileService._get_backend(guessed_dataset).fetch_data_for_tiles(*tiles) cassandra_duration += (datetime.now() - cassandra_start).total_seconds() if 'metrics_callback' in kwargs and kwargs['metrics_callback'] is not None: try: kwargs['metrics_callback'](cassandra=cassandra_duration, metadatastore=metadatastore_duration, num_tiles=len(tiles)) except Exception as e: logger.error("Metrics callback '{}'raised an exception. Will continue anyway. " + "The exception was: {}".format(kwargs['metrics_callback'], e)) return tiles return fetch_data_for_func return tile_data_decorator def catch_not_implemented(func): def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except NotImplementedError: raise NexusTileServiceException('Action unsupported by backend') return wrapper SOLR_LOCK = threading.Lock() thread_local = threading.local() @nexus_initializer class NTSInitializer: def __init__(self): self._log = logger.getChild('init') def init(self, config): self._log.info('*** RUNNING NTS INITIALIZATION ***') NexusTileService(config) class NexusTileService: backends: Dict[Union[None, str], Dict[str, Union[AbstractTileService, bool]]] = {} ds_config = None DS_LOCK = threading.Lock() __update_thread = None @staticmethod def __update_datasets_loop(): while True: with NexusTileService.DS_LOCK: NexusTileService._update_datasets() sleep(3600) def __init__(self, config=None): self._config = configparser.RawConfigParser() self._config.read(NexusTileService._get_config_files('config/datasets.ini')) self._alg_config = config if not NexusTileService.backends: NexusTileService.ds_config = configparser.RawConfigParser() NexusTileService.ds_config.read(NexusTileService._get_config_files('config/datasets.ini')) default_backend = {"backend": NexusprotoTileService(False, False, config), 'up': True} NexusTileService.backends[None] = default_backend NexusTileService.backends['__nexusproto__'] = default_backend if config: self.override_config(config) if not NexusTileService.__update_thread: NexusTileService.__update_thread = threading.Thread( target=NexusTileService.__update_datasets_loop, name='dataset_update', daemon=True ) logger.info('Starting dataset refresh thread') NexusTileService.__update_thread.start() @staticmethod def is_update_thread_alive() -> bool: return NexusTileService.__update_thread is not None and NexusTileService.__update_thread.is_alive() @staticmethod def _get_backend(dataset_s) -> AbstractTileService: if dataset_s is not None: dataset_s = dataset_s with NexusTileService.DS_LOCK: if dataset_s not in NexusTileService.backends: logger.warning(f'Dataset {dataset_s} not currently loaded. Checking to see if it was recently' f'added') NexusTileService._update_datasets() if dataset_s not in NexusTileService.backends: raise DatasetNotFoundException(reason=f'Dataset {dataset_s} is not currently loaded/ingested') b = NexusTileService.backends[dataset_s] return b['backend'] @staticmethod def _get_datasets_store(): solr_url = NexusTileService.ds_config.get("solr", "host") solr_core = NexusTileService.ds_config.get("solr", "core") solr_kwargs = {} if NexusTileService.ds_config.has_option("solr", "time_out"): solr_kwargs["timeout"] = NexusTileService.ds_config.get("solr", "time_out") with SOLR_LOCK: solrcon = getattr(thread_local, 'solrcon', None) if solrcon is None: solr_url = '%s/solr/%s' % (solr_url, solr_core) solrcon = pysolr.Solr(solr_url, **solr_kwargs) thread_local.solrcon = solrcon solrcon = solrcon return solrcon @staticmethod def _update_datasets(): update_logger = logging.getLogger("nexus-tile-svc.backends") solrcon = NexusTileService._get_datasets_store() update_logger.info('Executing Solr query to check for new datasets') present_datasets = {None, '__nexusproto__'} next_cursor_mark = '*' added_datasets = 0 while True: response = solrcon.search('*:*', cursorMark=next_cursor_mark, sort='id asc') try: response_cursor_mark = response.nextCursorMark except AttributeError: break if response_cursor_mark == next_cursor_mark: break else: next_cursor_mark = response_cursor_mark for dataset in response.docs: d_id = dataset['dataset_s'] store_type = dataset.get('store_type_s', 'nexusproto') present_datasets.add(d_id) if d_id in NexusTileService.backends: continue added_datasets += 1 if store_type == 'nexus_proto' or store_type == 'nexusproto': update_logger.info(f"Detected new nexusproto dataset {d_id}, using default nexusproto backend") NexusTileService.backends[d_id] = NexusTileService.backends[None] elif store_type == 'zarr': update_logger.info(f"Detected new zarr dataset {d_id}, opening new zarr backend") ds_config = json.loads(dataset['config'][0]) try: NexusTileService.backends[d_id] = { 'backend': ZarrBackend(dataset_name=dataset['dataset_s'], **ds_config), 'up': True } except NexusTileServiceException: added_datasets -= 1 else: update_logger.warning(f'Unsupported backend {store_type} for dataset {d_id}') added_datasets -= 1 removed_datasets = set(NexusTileService.backends.keys()).difference(present_datasets) if len(removed_datasets) > 0: update_logger.info(f'{len(removed_datasets)} old datasets marked for removal') for dataset in removed_datasets: update_logger.info(f"Removing dataset {dataset}") del NexusTileService.backends[dataset] update_logger.info(f'Finished dataset update: {added_datasets} added, {len(removed_datasets)} removed, ' f'{len(NexusTileService.backends) - 2} total') # Update cfg (ie, creds) of dataset @staticmethod def user_ds_update(name, config): solr = NexusTileService._get_datasets_store() docs = solr.search(f'dataset_s:{name}').docs if len(docs) != 1: raise ValueError(f'Given name must match exactly one existing dataset; matched {len(docs)}') ds = docs[0] if 'source_s' not in ds or ds['source_s'] == 'collection_config': raise ValueError('Provided dataset is source_s in collection config and cannot be deleted') config_dict = json.loads(ds['config'][0]) config_dict['config'] = config solr.delete(id=ds['id']) solr.add([{ 'id': name, 'dataset_s': name, 'latest_update_l': int(datetime.now().timestamp()), 'store_type_s': ds['store_type_s'], 'config': json.dumps(config_dict), 'source_s': 'user_added' }]) solr.commit() logger.info(f'Updated dataset {name} in Solr. Updating backends') with NexusTileService.DS_LOCK: NexusTileService._update_datasets() return {'success': True} # Add dataset + backend @staticmethod def user_ds_add(name, path, config, type='zarr'): solr = NexusTileService._get_datasets_store() docs = solr.search(f'dataset_s:{name}').docs if len(docs) > 0: raise ValueError(f'Dataset {name} already exists') config_dict = { 'path': path, 'config': config } solr.add([{ 'id': name, 'dataset_s': name, 'latest_update_l': int(datetime.now().timestamp()), 'store_type_s': type, 'config': json.dumps(config_dict), 'source_s': 'user_added' }]) solr.commit() logger.info(f'Added dataset {name} to Solr. Updating backends') with NexusTileService.DS_LOCK: NexusTileService._update_datasets() return {'success': True} # Delete dataset backend (error if it's a hardcoded one) @staticmethod def user_ds_delete(name): solr = NexusTileService._get_datasets_store() docs = solr.search(f'dataset_s:{name}').docs if len(docs) != 1: raise ValueError(f'Given name must match exactly one existing dataset; matched {len(docs)}') ds = docs[0] if 'source_s' not in ds or ds['source_s'] == 'collection_config': raise ValueError('Provided dataset is source_s in collection config and cannot be deleted') solr.delete(id=ds['id']) solr.commit() logger.info(f'Removed dataset {name} from Solr. Updating backends') with NexusTileService.DS_LOCK: NexusTileService._update_datasets() return {'success': True} def override_config(self, config): for section in config.sections(): if self._config.has_section(section): # only override preexisting section, ignores the other for option in config.options(section): if config.get(section, option) is not None: self._config.set(section, option, config.get(section, option)) if NexusTileService.ds_config.has_section(section): # only override preexisting section, ignores the other for option in config.options(section): if config.get(section, option) is not None: NexusTileService.ds_config.set(section, option, config.get(section, option)) def get_dataseries_list(self, simple=False): datasets = [] for backend in set([b['backend'] for b in NexusTileService.backends.values() if b['up']]): datasets.extend(backend.get_dataseries_list(simple)) return datasets def heartbeat(self) -> Dict[str, bool]: heartbeats = {'nexusproto': NexusTileService.backends[None]['backend'].heartbeat()} for backend_name in NexusTileService.backends: if backend_name in [None, '__nexusproto__']: continue backend = NexusTileService.backends[backend_name]['backend'] if backend == NexusTileService.backends[None]['backend']: continue heartbeats[backend_name] = backend.heartbeat() return heartbeats @tile_data() @catch_not_implemented def find_tile_by_id(self, tile_id, **kwargs): tile = URL(tile_id) if tile.scheme == 'nts': return NexusTileService._get_backend(tile.path).find_tile_by_id(tile_id) else: return NexusTileService._get_backend('__nexusproto__').find_tile_by_id(tile_id) @tile_data() @catch_not_implemented def find_tiles_by_id(self, tile_ids, ds=None, **kwargs): if ds is None: return [self.find_tile_by_id(tid, **kwargs, fetch_data=False) for tid in tile_ids] return NexusTileService._get_backend(ds).find_tiles_by_id(tile_ids, ds=ds, **kwargs) @catch_not_implemented def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time, metrics_callback=None, **kwargs): return NexusTileService._get_backend(dataset).find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time, metrics_callback, **kwargs) @tile_data() @catch_not_implemented def find_tile_by_polygon_and_most_recent_day_of_year(self, bounding_polygon, ds, day_of_year, **kwargs): return NexusTileService._get_backend(ds).find_tile_by_polygon_and_most_recent_day_of_year( bounding_polygon, ds, day_of_year, **kwargs ) @tile_data() @catch_not_implemented def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): return NexusTileService._get_backend(dataset).find_all_tiles_in_box_at_time( min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs ) @tile_data() @catch_not_implemented def find_all_tiles_in_polygon_at_time(self, bounding_polygon, dataset, time, **kwargs): return NexusTileService._get_backend(dataset).find_all_tiles_in_polygon_at_time( bounding_polygon, dataset, time, **kwargs ) @tile_data() @catch_not_implemented def find_tiles_in_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs): # Find tiles that fall in the given box in the Solr index if type(start_time) is datetime: start_time = (start_time - EPOCH).total_seconds() if type(end_time) is datetime: end_time = (end_time - EPOCH).total_seconds() return NexusTileService._get_backend(ds).find_tiles_in_box( min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs ) @tile_data() @catch_not_implemented def find_tiles_in_polygon(self, bounding_polygon, ds=None, start_time=0, end_time=-1, **kwargs): return NexusTileService._get_backend(ds).find_tiles_in_polygon( bounding_polygon, ds, start_time, end_time, **kwargs ) @tile_data() @catch_not_implemented def find_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, **kwargs): return NexusTileService._get_backend(ds).find_tiles_by_metadata( metadata, ds, start_time, end_time, **kwargs ) def get_tiles_by_metadata(self, metadata, ds=None, start_time=0, end_time=-1, **kwargs): """ Return list of tiles that matches the specified metadata, start_time, end_time with tile data outside of time range properly masked out. :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"] :param ds: The dataset name to search :param start_time: The start time to search for tiles :param end_time: The end time to search for tiles :return: A list of tiles """ tiles = self.find_tiles_by_metadata(metadata, ds, start_time, end_time, **kwargs) tiles = self.mask_tiles_to_time_range(start_time, end_time, tiles) return tiles @tile_data() @catch_not_implemented def find_tiles_by_exact_bounds(self, bounds, ds, start_time, end_time, **kwargs): """ The method will return tiles with the exact given bounds within the time range. It differs from find_tiles_in_polygon in that only tiles with exactly the given bounds will be returned as opposed to doing a polygon intersection with the given bounds. :param bounds: (minx, miny, maxx, maxy) bounds to search for :param ds: Dataset name to search :param start_time: Start time to search (seconds since epoch) :param end_time: End time to search (seconds since epoch) :param kwargs: fetch_data: True/False = whether or not to retrieve tile data :return: """ return NexusTileService._get_backend(ds).find_tiles_by_exact_bounds( bounds, ds, start_time, end_time, **kwargs ) @tile_data() @catch_not_implemented def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): return NexusTileService._get_backend(dataset).find_all_boundary_tiles_at_time( min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs ) @tile_data() @catch_not_implemented def find_tiles_along_line(self, start_point, end_point, ds=None, start_time=0, end_time=-1, **kwargs): # Find tiles that fall in the given box in the Solr index if type(start_time) is datetime: start_time = (start_time - EPOCH).total_seconds() if type(end_time) is datetime: end_time = (end_time - EPOCH).total_seconds() return NexusTileService._get_backend(ds).find_tiles_along_line(start_point, end_point, ds, start_time, end_time, **kwargs) def get_tiles_bounded_by_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs): tiles = self.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs) tiles = self.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, tiles) if 0 <= start_time <= end_time: tiles = self.mask_tiles_to_time_range(start_time, end_time, tiles) if 'min_elevation' in kwargs or 'max_elevation' in kwargs: tiles = self.mask_tiles_to_elevation( kwargs.get('min_elevation'), kwargs.get('max_elevation'), tiles ) return tiles def get_tiles_bounded_by_polygon(self, polygon, ds=None, start_time=0, end_time=-1, **kwargs): tiles = self.find_tiles_in_polygon(polygon, ds, start_time, end_time, **kwargs) tiles = self.mask_tiles_to_polygon(polygon, tiles) if 0 <= start_time <= end_time: tiles = self.mask_tiles_to_time_range(start_time, end_time, tiles) if 'min_elevation' in kwargs or 'max_elevation' in kwargs: tiles = self.mask_tiles_to_elevation( kwargs.get('min_elevation'), kwargs.get('max_elevation'), tiles ) return tiles @catch_not_implemented def get_min_max_time_by_granule(self, ds, granule_name): return NexusTileService._get_backend(ds).get_min_max_time_by_granule( ds, granule_name ) @catch_not_implemented def get_dataset_overall_stats(self, ds): return NexusTileService._get_backend(ds).get_dataset_overall_stats(ds) def get_tiles_bounded_by_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): tiles = self.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs) tiles = self.mask_tiles_to_bbox_and_time(min_lat, max_lat, min_lon, max_lon, time, time, tiles) if 'min_elevation' in kwargs or 'max_elevation' in kwargs: tiles = self.mask_tiles_to_elevation( kwargs.get('min_elevation'), kwargs.get('max_elevation'), tiles ) return tiles def get_tiles_bounded_by_polygon_at_time(self, polygon, dataset, time, **kwargs): tiles = self.find_all_tiles_in_polygon_at_time(polygon, dataset, time, **kwargs) tiles = self.mask_tiles_to_polygon_and_time(polygon, time, time, tiles) if 'min_elevation' in kwargs or 'max_elevation' in kwargs: tiles = self.mask_tiles_to_elevation( kwargs.get('min_elevation'), kwargs.get('max_elevation'), tiles ) return tiles def get_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): tiles = self.find_all_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs) tiles = self.mask_tiles_to_bbox_and_time(min_lat, max_lat, min_lon, max_lon, time, time, tiles) if 'min_elevation' in kwargs or 'max_elevation' in kwargs: tiles = self.mask_tiles_to_elevation( kwargs.get('min_elevation'), kwargs.get('max_elevation'), tiles ) return tiles @catch_not_implemented def get_stats_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): return NexusTileService.get_stats_within_box_at_time( min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs ) def get_bounding_box(self, tile_ids, ds=None): """ Retrieve a bounding box that encompasses all of the tiles represented by the given tile ids. :param tile_ids: List of tile ids :return: shapely.geometry.Polygon that represents the smallest bounding box that encompasses all of the tiles """ return NexusTileService._get_backend(ds).get_bounding_box(tile_ids) def get_min_time(self, tile_ids, ds=None): """ Get the minimum tile date from the list of tile ids :param tile_ids: List of tile ids :param ds: Filter by a specific dataset. Defaults to None (queries all datasets) :return: long time in seconds since epoch """ return NexusTileService._get_backend(ds).get_min_time(tile_ids, ds) def get_max_time(self, tile_ids, ds=None): """ Get the maximum tile date from the list of tile ids :param tile_ids: List of tile ids :param ds: Filter by a specific dataset. Defaults to None (queries all datasets) :return: long time in seconds since epoch """ return int(NexusTileService._get_backend(ds).get_max_time(tile_ids)) def get_distinct_bounding_boxes_in_polygon(self, bounding_polygon, ds, start_time, end_time): """ Get a list of distinct tile bounding boxes from all tiles within the given polygon and time range. :param bounding_polygon: The bounding polygon of tiles to search for :param ds: The dataset name to search :param start_time: The start time to search for tiles :param end_time: The end time to search for tiles :return: A list of distinct bounding boxes (as shapely polygons) for tiles in the search polygon """ bounds = self._metadatastore.find_distinct_bounding_boxes_in_polygon(bounding_polygon, ds, start_time, end_time) return [box(*b) for b in bounds] def get_tile_count(self, ds, bounding_polygon=None, start_time=0, end_time=-1, metadata=None, **kwargs): """ Return number of tiles that match search criteria. :param ds: The dataset name to search :param bounding_polygon: The polygon to search for tiles :param start_time: The start time to search for tiles :param end_time: The end time to search for tiles :param metadata: List of metadata values to search for tiles e.g ["river_id_i:1", "granule_s:granule_name"] :return: number of tiles that match search criteria """ return self._metadatastore.get_tile_count(ds, bounding_polygon, start_time, end_time, metadata, **kwargs) def mask_tiles_to_bbox(self, min_lat, max_lat, min_lon, max_lon, tiles): for tile in tiles: tile.latitudes = ma.masked_outside(tile.latitudes, min_lat, max_lat) tile.longitudes = ma.masked_outside(tile.longitudes, min_lon, max_lon) # Or together the masks of the individual arrays to create the new mask data_mask = ma.getmaskarray(tile.times)[:, np.newaxis, np.newaxis] \ | ma.getmaskarray(tile.latitudes)[np.newaxis, :, np.newaxis] \ | ma.getmaskarray(tile.longitudes)[np.newaxis, np.newaxis, :] # If this is multi-var, need to mask each variable separately. if tile.is_multi: # Combine space/time mask with existing mask on data data_mask = reduce(np.logical_or, [tile.data[0].mask, data_mask]) num_vars = len(tile.data) multi_data_mask = np.repeat(data_mask[np.newaxis, ...], num_vars, axis=0) tile.data = ma.masked_where(multi_data_mask, tile.data) else: tile.data = ma.masked_where(data_mask, tile.data) tiles[:] = [tile for tile in tiles if not tile.data.mask.all()] return tiles def mask_tiles_to_bbox_and_time(self, min_lat, max_lat, min_lon, max_lon, start_time, end_time, tiles): for tile in tiles: tile.times = ma.masked_outside(tile.times, start_time, end_time) tile.latitudes = ma.masked_outside(tile.latitudes, min_lat, max_lat) tile.longitudes = ma.masked_outside(tile.longitudes, min_lon, max_lon) # Or together the masks of the individual arrays to create the new mask data_mask = ma.getmaskarray(tile.times)[:, np.newaxis, np.newaxis] \ | ma.getmaskarray(tile.latitudes)[np.newaxis, :, np.newaxis] \ | ma.getmaskarray(tile.longitudes)[np.newaxis, np.newaxis, :] tile.data = ma.masked_where(data_mask, tile.data) tiles[:] = [tile for tile in tiles if not tile.data.mask.all()] return tiles def mask_tiles_to_polygon(self, bounding_polygon, tiles): min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds return self.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, tiles) def mask_tiles_to_polygon_and_time(self, bounding_polygon, start_time, end_time, tiles): min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds return self.mask_tiles_to_bbox_and_time(min_lat, max_lat, min_lon, max_lon, start_time, end_time, tiles) def mask_tiles_to_time_range(self, start_time, end_time, tiles): """ Masks data in tiles to specified time range. :param start_time: The start time to search for tiles :param end_time: The end time to search for tiles :param tiles: List of tiles :return: A list tiles with data masked to specified time range """ if 0 <= start_time <= end_time: for tile in tiles: tile.times = ma.masked_outside(tile.times, start_time, end_time) # Or together the masks of the individual arrays to create the new mask data_mask = ma.getmaskarray(tile.times)[:, np.newaxis, np.newaxis] \ | ma.getmaskarray(tile.latitudes)[np.newaxis, :, np.newaxis] \ | ma.getmaskarray(tile.longitudes)[np.newaxis, np.newaxis, :] # If this is multi-var, need to mask each variable separately. if tile.is_multi: # Combine space/time mask with existing mask on data data_mask = reduce(np.logical_or, [tile.data[0].mask, data_mask]) num_vars = len(tile.data) multi_data_mask = np.repeat(data_mask[np.newaxis, ...], num_vars, axis=0) tile.data = ma.masked_where(multi_data_mask, tile.data) else: tile.data = ma.masked_where(data_mask, tile.data) tiles[:] = [tile for tile in tiles if not tile.data.mask.all()] return tiles def mask_tiles_to_elevation(self, min_e, max_e, tiles): """ Masks data in tiles to specified time range. :param start_time: The start time to search for tiles :param end_time: The end time to search for tiles :param tiles: List of tiles :return: A list tiles with data masked to specified time range """ if min_e is None: min_e = -float('inf') if max_e is None: max_e = float('inf') for tile in tiles: if tile.elevation is None: continue tile.elevation = ma.masked_outside(tile.elevation, min_e, max_e) data_mask = ma.getmaskarray(tile.elevation) # If this is multi-var, need to mask each variable separately. if tile.is_multi: # Combine space/time mask with existing mask on data data_mask = reduce(np.logical_or, [tile.data[0].mask, data_mask]) num_vars = len(tile.data) multi_data_mask = np.repeat(data_mask[np.newaxis, ...], num_vars, axis=0) tile.data = ma.masked_where(multi_data_mask, tile.data) else: tile.data = ma.masked_where(data_mask, tile.data) tiles[:] = [tile for tile in tiles if not tile.data.mask.all()] return tiles def mask_tiles_to_elevation(self, min_e, max_e, tiles): """ Masks data in tiles to specified time range. :param start_time: The start time to search for tiles :param end_time: The end time to search for tiles :param tiles: List of tiles :return: A list tiles with data masked to specified time range """ if min_e is None: min_e = -float('inf') if max_e is None: max_e = float('inf') for tile in tiles: if tile.elevation is None: continue tile.elevation = ma.masked_outside(tile.elevation, min_e, max_e) data_mask = ma.getmaskarray(tile.elevation) # If this is multi-var, need to mask each variable separately. if tile.is_multi: # Combine space/time mask with existing mask on data data_mask = reduce(np.logical_or, [tile.data[0].mask, data_mask]) num_vars = len(tile.data) multi_data_mask = np.repeat(data_mask[np.newaxis, ...], num_vars, axis=0) multi_data_mask = np.broadcast_arrays(multi_data_mask, tile.data)[0] tile.data = ma.masked_where(multi_data_mask, tile.data) else: data_mask = np.broadcast_arrays(data_mask, tile.data)[0] tile.data = ma.masked_where(data_mask, tile.data) tiles[:] = [tile for tile in tiles if not tile.data.mask.all()] return tiles def fetch_data_for_tiles(self, *tiles): dataset = tiles[0].dataset return NexusTileService._get_backend(dataset).fetch_data_for_tiles(*tiles) def _metadata_store_docs_to_tiles(self, *store_docs): tiles = [] for store_doc in store_docs: tile = Tile() try: tile.tile_id = store_doc['id'] except KeyError: pass try: min_lat = store_doc['tile_min_lat'] min_lon = store_doc['tile_min_lon'] max_lat = store_doc['tile_max_lat'] max_lon = store_doc['tile_max_lon'] if isinstance(min_lat, list): min_lat = min_lat[0] if isinstance(min_lon, list): min_lon = min_lon[0] if isinstance(max_lat, list): max_lat = max_lat[0] if isinstance(max_lon, list): max_lon = max_lon[0] tile.bbox = BBox(min_lat, max_lat, min_lon, max_lon) except KeyError: pass try: tile.dataset = store_doc['dataset_s'] except KeyError: pass try: tile.dataset_id = store_doc['dataset_id_s'] except KeyError: pass try: tile.granule = store_doc['granule_s'] except KeyError: pass try: tile.min_time = datetime.strptime(store_doc['tile_min_time_dt'], "%Y-%m-%dT%H:%M:%SZ").replace( tzinfo=UTC) except KeyError: pass try: tile.max_time = datetime.strptime(store_doc['tile_max_time_dt'], "%Y-%m-%dT%H:%M:%SZ").replace( tzinfo=UTC) except KeyError: pass try: tile.section_spec = store_doc['sectionSpec_s'] except KeyError: pass try: tile.tile_stats = TileStats( store_doc['tile_min_val_d'], store_doc['tile_max_val_d'], store_doc['tile_avg_val_d'], store_doc['tile_count_i'] ) except KeyError: pass try: # Ensure backwards compatibility by working with old # tile_var_name_s and tile_standard_name_s fields to # will be overwritten if tile_var_name_ss is present # as well. if '[' in store_doc['tile_var_name_s']: var_names = json.loads(store_doc['tile_var_name_s']) else: var_names = [store_doc['tile_var_name_s']] standard_name = store_doc.get( 'tile_standard_name_s', json.dumps([None] * len(var_names)) ) if '[' in standard_name: standard_names = json.loads(standard_name) else: standard_names = [standard_name] tile.variables = [] for var_name, standard_name in zip(var_names, standard_names): tile.variables.append(TileVariable( variable_name=var_name, standard_name=standard_name )) except KeyError: pass if 'tile_var_name_ss' in store_doc: tile.variables = [] for var_name in store_doc['tile_var_name_ss']: standard_name_key = f'{var_name}.tile_standard_name_s' standard_name = store_doc.get(standard_name_key) tile.variables.append(TileVariable( variable_name=var_name, standard_name=standard_name )) tiles.append(tile) return tiles @staticmethod def _get_config_files(filename): log = logging.getLogger(__name__) candidates = [] extensions = ['.default', ''] for extension in extensions: try: candidate = pkg_resources.resource_filename(__name__, filename + extension) log.info('use config file {}'.format(filename + extension)) candidates.append(candidate) except KeyError as ke: log.warning('configuration file {} not found'.format(filename + extension)) return candidates