analysis/webservice/algorithms_spark/MatchupDoms.py (633 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import logging import threading from shapely.geometry import Polygon from datetime import datetime from itertools import chain from math import cos, radians from dataclasses import dataclass import numpy as np import pyproj import requests from pytz import timezone, UTC from scipy import spatial from shapely import wkt from shapely.geometry import Point from shapely.geometry import box from shapely.geos import WKTReadingError from webservice.NexusHandler import nexus_handler from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler from webservice.algorithms.doms import config as edge_endpoints from webservice.algorithms.doms import values as doms_values from webservice.algorithms.doms.BaseDomsHandler import DomsQueryResults from webservice.algorithms.doms.ResultsStorage import ResultsStorage from webservice.webmodel import NexusProcessingException EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' def iso_time_to_epoch(str_time): return (datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%SZ").replace( tzinfo=UTC) - EPOCH).total_seconds() @nexus_handler class MatchupDoms(NexusCalcSparkHandler): name = "MatchupDoms" path = "/match_spark_doms" description = "Match measurements between two or more datasets" params = { "primary": { "name": "Primary Dataset", "type": "string", "description": "The Primary dataset used to find matches for. Required" }, "secondary": { "name": "Match-Up Datasets", "type": "comma-delimited string", "description": "The Dataset(s) being searched for measurements that match the Primary. Required" }, "parameter": { "name": "Match-Up Parameter", "type": "string", "description": "The parameter of interest used for the match up. One of 'sst', 'sss', 'wind'. Optional" }, "startTime": { "name": "Start Time", "type": "string", "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required" }, "endTime": { "name": "End Time", "type": "string", "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required" }, "b": { "name": "Bounding box", "type": "comma-delimited float", "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, " "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required" }, "depthMin": { "name": "Minimum Depth", "type": "float", "description": "Minimum depth of measurements. Must be less than depthMax. Optional. Default: no limit" }, "depthMax": { "name": "Maximum Depth", "type": "float", "description": "Maximum depth of measurements. Must be greater than depthMin. Optional. Default: no limit" }, "tt": { "name": "Time Tolerance", "type": "long", "description": "Tolerance in time (seconds) when comparing two measurements. Optional. Default: 86400" }, "rt": { "name": "Radius Tolerance", "type": "float", "description": "Tolerance in radius (meters) when comparing two measurements. Optional. Default: 1000" }, "platforms": { "name": "Platforms", "type": "comma-delimited integer", "description": "Platforms to include for matchup consideration. Required" }, "matchOnce": { "name": "Match Once", "type": "boolean", "description": "Optional True/False flag used to determine if more than one match per primary point is returned. " + "If true, only the nearest point will be returned for each primary point. " + "If false, all points within the tolerances will be returned for each primary point. Default: False" }, "resultSizeLimit": { "name": "Result Size Limit", "type": "int", "description": "Optional integer value that limits the number of results returned from the matchup. " "If the number of primary matches is greater than this limit, the service will respond with " "(HTTP 202: Accepted) and an empty response body. A value of 0 means return all results. " "Default: 500" } } singleton = True def __init__(self, algorithm_config=None, sc=None, tile_service_factory=None, config=None): NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc, tile_service_factory=tile_service_factory) self.log = logging.getLogger(__name__) self.tile_service_factory = tile_service_factory self.config = config def parse_arguments(self, request): # Parse input arguments self.log.debug("Parsing arguments") try: bounding_polygon = request.get_bounding_polygon() except: raise NexusProcessingException( reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude", code=400) primary_ds_name = request.get_argument('primary', None) if primary_ds_name is None: raise NexusProcessingException(reason="'primary' argument is required", code=400) secondary_ds_names = request.get_argument('secondary', None) if secondary_ds_names is None: raise NexusProcessingException(reason="'secondary' argument is required", code=400) parameter_s = request.get_argument('parameter') if parameter_s and parameter_s not in ['sst', 'sss', 'wind']: raise NexusProcessingException( reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400) try: start_time = request.get_start_datetime() except: raise NexusProcessingException( reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", code=400) try: end_time = request.get_end_datetime() except: raise NexusProcessingException( reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", code=400) if start_time > end_time: raise NexusProcessingException( reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % ( request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)), code=400) depth_min = request.get_decimal_arg('depthMin', default=None) depth_max = request.get_decimal_arg('depthMax', default=None) if depth_min is not None and depth_max is not None and depth_min >= depth_max: raise NexusProcessingException( reason="Depth Min should be less than Depth Max", code=400) time_tolerance = request.get_int_arg('tt', default=86400) radius_tolerance = request.get_decimal_arg('rt', default=1000.0) platforms = request.get_argument('platforms', None) if platforms is None: raise NexusProcessingException(reason="'platforms' argument is required", code=400) try: p_validation = platforms.split(',') p_validation = [int(p) for p in p_validation] del p_validation except: raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400) match_once = request.get_boolean_arg("matchOnce", default=False) result_size_limit = request.get_int_arg("resultSizeLimit", default=500) start_seconds_from_epoch = int((start_time - EPOCH).total_seconds()) end_seconds_from_epoch = int((end_time - EPOCH).total_seconds()) return bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \ start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \ depth_min, depth_max, time_tolerance, radius_tolerance, \ platforms, match_once, result_size_limit def calc(self, request, **args): start = datetime.utcnow() # TODO Assuming Satellite primary bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \ start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \ depth_min, depth_max, time_tolerance, radius_tolerance, \ platforms, match_once, result_size_limit = self.parse_arguments(request) with ResultsStorage(self.config) as resultsStorage: execution_id = str(resultsStorage.insertExecution(None, start, None, None)) self.log.debug("Querying for tiles in search domain") # Get tile ids in box tile_ids = [tile.tile_id for tile in self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name, start_seconds_from_epoch, end_seconds_from_epoch, fetch_data=False, fl='id', sort=['tile_min_time_dt asc', 'tile_min_lon asc', 'tile_min_lat asc'], rows=5000)] self.log.info('Found %s tile_ids', len(tile_ids)) # Call spark_matchup self.log.debug("Calling Spark Driver") try: spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name, secondary_ds_names, parameter_s, depth_min, depth_max, time_tolerance, radius_tolerance, platforms, match_once, self.tile_service_factory, sc=self._sc) except Exception as e: self.log.exception(e) raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500) end = datetime.utcnow() self.log.debug("Building and saving results") args = { "primary": primary_ds_name, "matchup": secondary_ds_names, "startTime": start_time, "endTime": end_time, "bbox": request.get_argument('b'), "timeTolerance": time_tolerance, "radiusTolerance": float(radius_tolerance), "platforms": platforms, "parameter": parameter_s } if depth_min is not None: args["depthMin"] = float(depth_min) if depth_max is not None: args["depthMax"] = float(depth_max) total_keys = len(list(spark_result.keys())) total_values = sum(len(v) for v in spark_result.values()) details = { "timeToComplete": int((end - start).total_seconds()), "numSecondaryMatched": total_values, "numPrimaryMatched": total_keys } matches = MatchupDoms.convert_to_matches(spark_result) def do_result_insert(): with ResultsStorage(self.config) as storage: storage.insertResults(results=matches, params=args, stats=details, startTime=start, completeTime=end, userEmail="", execution_id=execution_id) threading.Thread(target=do_result_insert).start() if 0 < result_size_limit < len(matches): result = DomsQueryResults(results=None, args=args, details=details, bounds=None, count=None, computeOptions=None, executionId=execution_id, status_code=202) else: result = DomsQueryResults(results=matches, args=args, details=details, bounds=None, count=None, computeOptions=None, executionId=execution_id) return result @classmethod def convert_to_matches(cls, spark_result): matches = [] for primary_domspoint, matched_domspoints in spark_result.items(): p_matched = [cls.domspoint_to_dict(p_match, 'secondary') for p_match in matched_domspoints] primary = cls.domspoint_to_dict(primary_domspoint, 'primary') primary['matches'] = list(p_matched) matches.append(primary) return matches @staticmethod def domspoint_to_dict(domspoint, data_key_name='data'): doms_dict = { "platform": doms_values.getPlatformById(domspoint.platform), "device": doms_values.getDeviceById(domspoint.device), "lon": str(domspoint.longitude), "lat": str(domspoint.latitude), "point": "Point(%s %s)" % (domspoint.longitude, domspoint.latitude), "time": datetime.strptime(domspoint.time, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC), "fileurl": domspoint.file_url, "id": domspoint.data_id, "source": domspoint.source, data_key_name: [data_point.__dict__ for data_point in domspoint.data] } return doms_dict @dataclass class DataPoint: """ Represents a single point of data. This is used to construct the output of the matchup algorithm. :attribute variable_name: The name of the NetCDF variable. :attribute cf_variable_name: The CF standard_name of the NetCDF variable. This will be None if the standard_name does not exist in the source data file. :attribute variable_value: value at some point for the given variable. """ variable_name: str = None cf_variable_name: str = None variable_value: float = None class DomsPoint(object): def __init__(self, longitude=None, latitude=None, time=None, depth=None, data_id=None): self.time = time self.longitude = longitude self.latitude = latitude self.depth = depth self.data_id = data_id self.data = None self.source = None self.depth = None self.platform = None self.device = None self.file_url = None def __repr__(self): return str(self.__dict__) @staticmethod def _variables_to_device(variables): """ Given a list of science variables, attempt to determine what the correct device is. This method will only be used for satellite measurements, so the only options are 'scatterometers' or 'radiometers' :param variables: List of variable names :return: device id integer """ for variable in variables: if 'wind' in variable.variable_name.lower(): # scatterometers return 6 # Assume radiometers return 5 @staticmethod def from_nexus_point(nexus_point, tile=None): point = DomsPoint() point.data_id = "%s[%s]" % (tile.tile_id, nexus_point.index) if tile.is_multi: data_vals = nexus_point.data_vals else: data_vals = [nexus_point.data_vals] data = [] for data_val, variable in zip(data_vals, tile.variables): if data_val: data.append(DataPoint( variable_name=variable.variable_name, variable_value=data_val, cf_variable_name=variable.standard_name )) point.data = data try: point.wind_v = tile.meta_data['wind_v'][tuple(nexus_point.index)].item() except (KeyError, IndexError): pass try: point.wind_direction = tile.meta_data['wind_dir'][tuple(nexus_point.index)].item() except (KeyError, IndexError): pass try: point.wind_speed = tile.meta_data['wind_speed'][tuple(nexus_point.index)].item() except (KeyError, IndexError): pass point.longitude = nexus_point.longitude.item() point.latitude = nexus_point.latitude.item() point.time = datetime.utcfromtimestamp(nexus_point.time).strftime('%Y-%m-%dT%H:%M:%SZ') try: point.depth = nexus_point.depth except KeyError: # No depth associated with this measurement pass point.sst_depth = 0 point.source = tile.dataset point.file_url = tile.granule point.platform = 9 point.device = DomsPoint._variables_to_device(tile.variables) return point @staticmethod def from_edge_point(edge_point): point = DomsPoint() try: x, y = wkt.loads(edge_point['point']).coords[0] except WKTReadingError: try: x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0] except ValueError: y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0] point.longitude = x point.latitude = y point.time = edge_point['time'] point.source = edge_point.get('source') point.platform = edge_point.get('platform') point.device = edge_point.get('device') point.file_url = edge_point.get('fileurl') data_fields = [ 'eastward_wind', 'northward_wind', 'wind_direction', 'wind_speed', 'sea_water_temperature', 'sea_water_temperature_depth', 'sea_water_salinity', 'sea_water_salinity_depth', ] data = [] # This is for in-situ secondary points for name in data_fields: val = edge_point.get(name) if val: data.append(DataPoint( variable_name=name, variable_value=val )) # This is for satellite secondary points if 'variables' in edge_point: data.extend([DataPoint( variable_name=variable.variable_name, variable_value=var_value, cf_variable_name=variable.standard_name ) for var_value, variable in zip( edge_point['var_values'], edge_point['variables'] ) if var_value]) point.data = data try: point.data_id = str(edge_point['id']) except KeyError: point.data_id = "%s:%s:%s" % (point.time, point.longitude, point.latitude) return point from threading import Lock DRIVER_LOCK = Lock() def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_names, parameter, depth_min, depth_max, time_tolerance, radius_tolerance, platforms, match_once, tile_service_factory, sc=None): from functools import partial with DRIVER_LOCK: # Broadcast parameters primary_b = sc.broadcast(primary_ds_name) secondary_b = sc.broadcast(secondary_ds_names) depth_min_b = sc.broadcast(float(depth_min) if depth_min is not None else None) depth_max_b = sc.broadcast(float(depth_max) if depth_max is not None else None) tt_b = sc.broadcast(time_tolerance) rt_b = sc.broadcast(float(radius_tolerance)) platforms_b = sc.broadcast(platforms) bounding_wkt_b = sc.broadcast(bounding_wkt) parameter_b = sc.broadcast(parameter) # Parallelize list of tile ids rdd = sc.parallelize(tile_ids, determine_parllelism(len(tile_ids))) # Map Partitions ( list(tile_id) ) rdd_filtered = rdd.mapPartitions( partial( match_satellite_to_insitu, primary_b=primary_b, secondary_b=secondary_b, parameter_b=parameter_b, tt_b=tt_b, rt_b=rt_b, platforms_b=platforms_b, bounding_wkt_b=bounding_wkt_b, depth_min_b=depth_min_b, depth_max_b=depth_max_b, tile_service_factory=tile_service_factory ), preservesPartitioning=True ).filter( lambda p_m_tuple: abs( iso_time_to_epoch(p_m_tuple[0].time) - iso_time_to_epoch(p_m_tuple[1].time) ) <= time_tolerance ) if match_once: # Only the 'nearest' point for each primary should be returned. Add an extra map/reduce which calculates # the distance and finds the minimum # Method used for calculating the distance between 2 DomsPoints from pyproj import Geod def dist(primary, matchup): wgs84_geod = Geod(ellps='WGS84') lat1, lon1 = (primary.latitude, primary.longitude) lat2, lon2 = (matchup.latitude, matchup.longitude) az12, az21, distance = wgs84_geod.inv(lon1, lat1, lon2, lat2) return distance rdd_filtered = rdd_filtered \ .map(lambda primary_matchup: tuple([primary_matchup[0], tuple([primary_matchup[1], dist(primary_matchup[0], primary_matchup[1])])])) \ .reduceByKey(lambda match_1, match_2: match_1 if match_1[1] < match_2[1] else match_2) \ .mapValues(lambda x: [x[0]]) else: rdd_filtered = rdd_filtered \ .combineByKey(lambda value: [value], # Create 1 element list lambda value_list, value: value_list + [value], # Add 1 element to list lambda value_list_a, value_list_b: value_list_a + value_list_b) # Add two lists together result_as_map = rdd_filtered.collectAsMap() return result_as_map def determine_parllelism(num_tiles): """ Try to stay at a maximum of 140 tiles per partition; But don't go over 128 partitions. Also, don't go below the default of 8 """ num_partitions = max(min(num_tiles / 140, 128), 8) return num_partitions def add_meters_to_lon_lat(lon, lat, meters): """ Uses a simple approximation of 1 degree latitude = 111,111 meters 1 degree longitude = 111,111 meters * cosine(latitude) :param lon: longitude to add meters to :param lat: latitude to add meters to :param meters: meters to add to the longitude and latitude values :return: (longitude, latitude) increased by given meters """ longitude = lon + ((meters / 111111) * cos(radians(lat))) latitude = lat + (meters / 111111) return longitude, latitude def tile_to_edge_points(tile): indices = tile.get_indices() edge_points = [] for idx in indices: if tile.is_multi: data = [var_data[tuple(idx)] for var_data in tile.data] else: data = [tile.data[tuple(idx)]] edge_point = { 'point': f'Point({tile.longitudes[idx[2]]} {tile.latitudes[idx[1]]})', 'time': datetime.utcfromtimestamp(tile.times[idx[0]]).strftime('%Y-%m-%dT%H:%M:%SZ'), 'source': tile.dataset, 'platform': None, 'device': None, 'fileurl': tile.granule, 'variables': tile.variables, 'var_values': data } edge_points.append(edge_point) return edge_points def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_b, rt_b, platforms_b, bounding_wkt_b, depth_min_b, depth_max_b, tile_service_factory): the_time = datetime.now() tile_ids = list(tile_ids) if len(tile_ids) == 0: return [] tile_service = tile_service_factory() # Determine the spatial temporal extents of this partition of tiles tiles_bbox = tile_service.get_bounding_box(tile_ids) tiles_min_time = tile_service.get_min_time(tile_ids) tiles_max_time = tile_service.get_max_time(tile_ids) # Increase spatial extents by the radius tolerance matchup_min_lon, matchup_min_lat = add_meters_to_lon_lat(tiles_bbox.bounds[0], tiles_bbox.bounds[1], -1 * rt_b.value) matchup_max_lon, matchup_max_lat = add_meters_to_lon_lat(tiles_bbox.bounds[2], tiles_bbox.bounds[3], rt_b.value) # Don't go outside of the search domain search_min_x, search_min_y, search_max_x, search_max_y = wkt.loads(bounding_wkt_b.value).bounds matchup_min_lon = max(matchup_min_lon, search_min_x) matchup_min_lat = max(matchup_min_lat, search_min_y) matchup_max_lon = min(matchup_max_lon, search_max_x) matchup_max_lat = min(matchup_max_lat, search_max_y) # Find the centroid of the matchup bounding box and initialize the projections matchup_center = box(matchup_min_lon, matchup_min_lat, matchup_max_lon, matchup_max_lat).centroid.coords[0] aeqd_proj = pyproj.Proj(proj='aeqd', lon_0=matchup_center[0], lat_0=matchup_center[1]) # Increase temporal extents by the time tolerance matchup_min_time = tiles_min_time - tt_b.value matchup_max_time = tiles_max_time + tt_b.value print("%s Time to determine spatial-temporal extents for partition %s to %s" % ( str(datetime.now() - the_time), tile_ids[0], tile_ids[-1])) # Query edge for all points within the spatial-temporal extents of this partition is_insitu_dataset = edge_endpoints.getEndpointByName(secondary_b.value) if is_insitu_dataset: the_time = datetime.now() edge_session = requests.Session() edge_results = [] with edge_session: for insitudata_name in secondary_b.value.split(','): bbox = ','.join( [str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon), str(matchup_max_lat)]) edge_response = query_edge(insitudata_name, parameter_b.value, matchup_min_time, matchup_max_time, bbox, platforms_b.value, depth_min_b.value, depth_max_b.value, session=edge_session) if edge_response['totalResults'] == 0: continue r = edge_response['results'] for p in r: p['source'] = insitudata_name edge_results.extend(r) print("%s Time to call edge for partition %s to %s" % (str(datetime.now() - the_time), tile_ids[0], tile_ids[-1])) if len(edge_results) == 0: return [] # Convert edge points to utm the_time = datetime.now() matchup_points = np.ndarray((len(edge_results), 2), dtype=np.float32) for n, edge_point in enumerate(edge_results): try: x, y = wkt.loads(edge_point['point']).coords[0] except WKTReadingError: try: x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0] except ValueError: y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0] matchup_points[n][0], matchup_points[n][1] = aeqd_proj(x, y) else: # Query nexus (cassandra? solr?) to find matching points. bbox = ','.join( [str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon), str(matchup_max_lat)]) west, south, east, north = [float(b) for b in bbox.split(",")] polygon = Polygon( [(west, south), (east, south), (east, north), (west, north), (west, south)]) matchup_tiles = tile_service.find_tiles_in_polygon( bounding_polygon=polygon, ds=secondary_b.value, start_time=matchup_min_time, end_time=matchup_max_time, fetch_data=True, sort=['tile_min_time_dt asc', 'tile_min_lon asc', 'tile_min_lat asc'], rows=5000 ) # Convert Tile IDS to tiles and convert to UTM lat/lon projection. matchup_points = [] for tile in matchup_tiles: valid_indices = tile.get_indices() primary_points = np.array([aeqd_proj( tile.longitudes[aslice[2]], tile.latitudes[aslice[1]] ) for aslice in valid_indices]) matchup_points.extend(primary_points) # Convert tiles to 'edge points' which match the format of in-situ edge points. edge_results = [] for matchup_tile in matchup_tiles: edge_results.extend(tile_to_edge_points(matchup_tile)) matchup_points = np.array(matchup_points) print("%s Time to convert match points for partition %s to %s" % ( str(datetime.now() - the_time), tile_ids[0], tile_ids[-1])) # Build kdtree from matchup points the_time = datetime.now() m_tree = spatial.cKDTree(matchup_points, leafsize=30) print("%s Time to build matchup tree" % (str(datetime.now() - the_time))) # The actual matching happens in the generator. This is so that we only load 1 tile into memory at a time match_generators = [match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, bounding_wkt_b.value, parameter_b.value, rt_b.value, aeqd_proj) for tile_id in tile_ids] return chain(*match_generators) def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, search_domain_bounding_wkt, search_parameter, radius_tolerance, aeqd_proj): from nexustiles.model.nexusmodel import NexusPoint from webservice.algorithms_spark.MatchupDoms import DomsPoint # Must import DomsPoint or Spark complains # Load tile try: the_time = datetime.now() tile = tile_service.mask_tiles_to_polygon(wkt.loads(search_domain_bounding_wkt), tile_service.find_tile_by_id(tile_id))[0] print("%s Time to load tile %s" % (str(datetime.now() - the_time), tile_id)) except IndexError: # This should only happen if all measurements in a tile become masked after applying the bounding polygon print('Tile is empty after masking spatially. Skipping this tile.') return # Convert valid tile lat,lon tuples to UTM tuples the_time = datetime.now() # Get list of indices of valid values valid_indices = tile.get_indices() primary_points = np.array( [aeqd_proj(tile.longitudes[aslice[2]], tile.latitudes[aslice[1]]) for aslice in valid_indices]) print("%s Time to convert primary points for tile %s" % (str(datetime.now() - the_time), tile_id)) a_time = datetime.now() p_tree = spatial.cKDTree(primary_points, leafsize=30) print("%s Time to build primary tree" % (str(datetime.now() - a_time))) a_time = datetime.now() matched_indexes = p_tree.query_ball_tree(m_tree, radius_tolerance) print("%s Time to query primary tree for tile %s" % (str(datetime.now() - a_time), tile_id)) for i, point_matches in enumerate(matched_indexes): if len(point_matches) > 0: if tile.is_multi: data_vals = [tile_data[tuple(valid_indices[i])] for tile_data in tile.data] else: data_vals = tile.data[tuple(valid_indices[i])] p_nexus_point = NexusPoint( latitude=tile.latitudes[valid_indices[i][1]], longitude=tile.longitudes[valid_indices[i][2]], depth=None, time=tile.times[valid_indices[i][0]], index=valid_indices[i], data_vals=data_vals ) p_doms_point = DomsPoint.from_nexus_point(p_nexus_point, tile=tile) for m_point_index in point_matches: m_doms_point = DomsPoint.from_edge_point(edge_results[m_point_index]) yield p_doms_point, m_doms_point def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, itemsPerPage=1000, startIndex=0, stats=True, session=None): try: startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ') except TypeError: # Assume we were passed a properly formatted string pass try: endTime = datetime.utcfromtimestamp(endTime).strftime('%Y-%m-%dT%H:%M:%SZ') except TypeError: # Assume we were passed a properly formatted string pass try: platform = platform.split(',') except AttributeError: # Assume we were passed a list pass params = {"startTime": startTime, "endTime": endTime, "bbox": bbox, "minDepth": depth_min, "maxDepth": depth_max, "platform": platform, "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()} if variable is not None: params["variable"] = variable dataset_url = edge_endpoints.getEndpointByName(dataset)['url'] if session is not None: edge_request = session.get(dataset_url, params=params) else: edge_request = requests.get(dataset_url, params=params) edge_request.raise_for_status() edge_response = json.loads(edge_request.text) # Get all edge results next_page_url = edge_response.get('next', None) while next_page_url is not None: if session is not None: edge_page_request = session.get(next_page_url) else: edge_page_request = requests.get(next_page_url) edge_page_request.raise_for_status() edge_page_response = json.loads(edge_page_request.text) edge_response['results'].extend(edge_page_response['results']) next_page_url = edge_page_response.get('next', None) return edge_response