# analysis/webservice/algorithms_spark/Matchup.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
import logging
import threading
from shapely.geometry import Polygon
from datetime import datetime
from itertools import chain
from math import cos, radians
from dataclasses import dataclass
import numpy as np
import pyproj
import requests
from pytz import timezone, UTC
from scipy import spatial
from shapely import wkt
from shapely.geometry import box
from webservice.NexusHandler import nexus_handler
from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
from webservice.algorithms.doms import config as edge_endpoints
from webservice.algorithms.doms import values as doms_values
from webservice.algorithms.doms.BaseDomsHandler import DomsQueryResults
from webservice.algorithms.doms.ResultsStorage import ResultsStorage
from webservice.algorithms.doms.insitu import query_insitu as query_edge
from webservice.algorithms.doms.insitu import query_insitu_schema
from webservice.webmodel import NexusProcessingException
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
class Schema:
    """Lazily fetch and cache the in-situ schema so the endpoint is only queried once."""

    def __init__(self):
        self.schema = None
def get(self):
if self.schema is None:
logging.info("No local schema; fetching")
self.schema = query_insitu_schema()
return self.schema
insitu_schema = Schema()
def iso_time_to_epoch(str_time):
return (datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%SZ").replace(
tzinfo=UTC) - EPOCH).total_seconds()
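# Illustrative check (hypothetical value): the conversion is just an offset from the Unix epoch, e.g.
#   iso_time_to_epoch('1970-01-02T00:00:00Z')  # -> 86400.0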
@nexus_handler
class Matchup(NexusCalcSparkHandler):
name = "Matchup"
path = "/match_spark"
description = "Match measurements between two or more datasets"
params = {
"primary": {
"name": "Primary Dataset",
"type": "string",
"description": "The Primary dataset used to find matches for. Required"
},
"secondary": {
"name": "Match-Up Datasets",
"type": "comma-delimited string",
"description": "The Dataset(s) being searched for measurements that match the Primary. Required"
},
"parameter": {
"name": "Match-Up Parameter",
"type": "string",
"description": "The parameter of interest used for the match up. Only used for satellite to insitu matchups. Optional"
},
"startTime": {
"name": "Start Time",
"type": "string",
"description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
},
"endTime": {
"name": "End Time",
"type": "string",
"description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
},
"b": {
"name": "Bounding box",
"type": "comma-delimited float",
"description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
"Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
},
"depthMin": {
"name": "Minimum Depth",
"type": "float",
"description": "Minimum depth of measurements. Must be less than depthMax. Optional. Default: no limit"
},
"depthMax": {
"name": "Maximum Depth",
"type": "float",
"description": "Maximum depth of measurements. Must be greater than depthMin. Optional. Default: no limit"
},
"tt": {
"name": "Time Tolerance",
"type": "long",
"description": "Tolerance in time (seconds) when comparing two measurements. Optional. Default: 86400"
},
"rt": {
"name": "Radius Tolerance",
"type": "float",
"description": "Tolerance in radius (meters) when comparing two measurements. Optional. Default: 1000"
},
"platforms": {
"name": "Platforms",
"type": "comma-delimited integer",
"description": "Platforms to include for matchup consideration. Required"
},
"matchOnce": {
"name": "Match Once",
"type": "boolean",
"description": "Optional True/False flag used to determine if more than one match per primary point is returned. "
+ "If true, only the nearest point will be returned for each primary point. "
+ "If false, all points within the tolerances will be returned for each primary point. Default: False"
},
"resultSizeLimit": {
"name": "Result Size Limit",
"type": "int",
"description": "Optional integer value that limits the number of results returned from the matchup. "
"If the number of primary matches is greater than this limit, the service will respond with "
"(HTTP 202: Accepted) and an empty response body. A value of 0 means return all results. "
"Default: 500"
},
"prioritizeDistance": {
"name": "Prioritize distance",
"type": "boolean",
"description": "If true, prioritize distance over time when computing matches. If false, prioritize time over "
"distance. This is only relevant if matchOnce=true, because otherwise all matches will be "
"included so long as they fit within the user-provided tolerances. Default is true."
}
}
singleton = True
def __init__(self, algorithm_config=None, sc=None, tile_service_factory=None, config=None):
NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc, tile_service_factory=tile_service_factory)
self.log = logging.getLogger(__name__)
self.tile_service_factory = tile_service_factory
self.config = config
def parse_arguments(self, request):
# Parse input arguments
self.log.debug("Parsing arguments")
try:
bounding_polygon = request.get_bounding_polygon()
        except Exception:
raise NexusProcessingException(
reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
code=400)
primary_ds_name = request.get_argument('primary', None)
if primary_ds_name is None:
raise NexusProcessingException(reason="'primary' argument is required", code=400)
secondary_ds_names = request.get_argument('secondary', None)
if secondary_ds_names is None:
raise NexusProcessingException(reason="'secondary' argument is required", code=400)
parameter_s = request.get_argument('parameter')
if parameter_s:
insitu_params = get_insitu_params(insitu_schema.get())
if parameter_s not in insitu_params:
raise NexusProcessingException(
reason=f"Parameter {parameter_s} not supported. Must be one of {insitu_params}", code=400)
try:
start_time = request.get_start_datetime()
        except Exception:
raise NexusProcessingException(
reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
code=400)
try:
end_time = request.get_end_datetime()
        except Exception:
raise NexusProcessingException(
reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
code=400)
if start_time > end_time:
raise NexusProcessingException(
reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
code=400)
depth_min = request.get_decimal_arg('depthMin', default=None)
depth_max = request.get_decimal_arg('depthMax', default=None)
if depth_min is not None and depth_max is not None and depth_min >= depth_max:
raise NexusProcessingException(
reason="Depth Min should be less than Depth Max", code=400)
time_tolerance = request.get_int_arg('tt', default=86400)
radius_tolerance = request.get_decimal_arg('rt', default=1000.0)
platforms = request.get_argument('platforms', None)
if platforms is None:
raise NexusProcessingException(reason="'platforms' argument is required", code=400)
match_once = request.get_boolean_arg("matchOnce", default=False)
result_size_limit = request.get_int_arg("resultSizeLimit", default=500)
start_seconds_from_epoch = int((start_time - EPOCH).total_seconds())
end_seconds_from_epoch = int((end_time - EPOCH).total_seconds())
prioritize_distance = request.get_boolean_arg("prioritizeDistance", default=True)
return bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \
start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
depth_min, depth_max, time_tolerance, radius_tolerance, \
platforms, match_once, result_size_limit, prioritize_distance
def calc(self, request, **args):
start = datetime.utcnow()
# TODO Assuming Satellite primary
bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \
start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
depth_min, depth_max, time_tolerance, radius_tolerance, \
platforms, match_once, result_size_limit, prioritize_distance = self.parse_arguments(request)
with ResultsStorage(self.config) as resultsStorage:
execution_id = str(resultsStorage.insertExecution(None, start, None, None))
self.log.debug("Querying for tiles in search domain")
# Get tile ids in box
tile_ids = [tile.tile_id for tile in
self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
start_seconds_from_epoch, end_seconds_from_epoch,
fetch_data=False, fl='id',
sort=['tile_min_time_dt asc', 'tile_min_lon asc',
'tile_min_lat asc'], rows=5000)]
self.log.info('Found %s tile_ids', len(tile_ids))
# Call spark_matchup
self.log.debug("Calling Spark Driver")
try:
spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name,
secondary_ds_names, parameter_s, depth_min, depth_max, time_tolerance,
radius_tolerance, platforms, match_once, self.tile_service_factory,
prioritize_distance, sc=self._sc)
except Exception as e:
self.log.exception(e)
raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500)
end = datetime.utcnow()
self.log.debug("Building and saving results")
args = {
"primary": primary_ds_name,
"matchup": secondary_ds_names,
"startTime": start_time,
"endTime": end_time,
"bbox": request.get_argument('b'),
"timeTolerance": time_tolerance,
"radiusTolerance": float(radius_tolerance),
"platforms": platforms,
"parameter": parameter_s
}
if depth_min is not None:
args["depthMin"] = float(depth_min)
if depth_max is not None:
args["depthMax"] = float(depth_max)
        total_keys = len(spark_result)
total_values = sum(len(v) for v in spark_result.values())
details = {
"timeToComplete": int((end - start).total_seconds()),
"numSecondaryMatched": total_values,
"numPrimaryMatched": total_keys
}
matches = Matchup.convert_to_matches(spark_result)
def do_result_insert():
with ResultsStorage(self.config) as storage:
storage.insertResults(results=matches, params=args, stats=details,
startTime=start, completeTime=end, userEmail="",
execution_id=execution_id)
threading.Thread(target=do_result_insert).start()
        # Return only the first "result_size_limit" results
        # A value of 0 means return everything
if result_size_limit > 0:
return_matches = matches[0:result_size_limit]
else:
return_matches = matches
result = DomsQueryResults(results=return_matches, args=args,
details=details, bounds=None,
count=len(matches), computeOptions=None,
executionId=execution_id)
return result
@classmethod
def convert_to_matches(cls, spark_result):
matches = []
for primary_domspoint, matched_domspoints in spark_result.items():
p_matched = [cls.domspoint_to_dict(p_match, 'secondary') for p_match in matched_domspoints]
primary = cls.domspoint_to_dict(primary_domspoint, 'primary')
primary['matches'] = list(p_matched)
matches.append(primary)
return matches
@staticmethod
def domspoint_to_dict(domspoint, data_key_name='data'):
doms_dict = {
"platform": doms_values.getPlatformById(domspoint.platform),
"device": doms_values.getDeviceById(domspoint.device),
"lon": str(domspoint.longitude),
"lat": str(domspoint.latitude),
"point": "Point(%s %s)" % (domspoint.longitude, domspoint.latitude),
"time": datetime.strptime(domspoint.time, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC),
"depth": domspoint.depth,
"fileurl": domspoint.file_url,
"id": domspoint.data_id,
"source": domspoint.source,
data_key_name: [data_point.__dict__ for data_point in domspoint.data]
}
return doms_dict
@dataclass
class DataPoint:
"""
Represents a single point of data. This is used to construct the
output of the matchup algorithm.
:attribute variable_name: The name of the NetCDF variable.
:attribute cf_variable_name: The CF standard_name of the
NetCDF variable. This will be None if the standard_name does not
exist in the source data file.
:attribute variable_value: value at some point for the given
variable.
:attribute variable_unit: Unit of the measurement. Will be None if
no unit is known.
"""
variable_name: str = None
cf_variable_name: str = None
variable_value: float = None
variable_unit: Optional[str] = None
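# Illustrative construction (hypothetical values, for reference only):
#   DataPoint(variable_name='sea_surface_temperature',
#             cf_variable_name='sea_surface_temperature',
#             variable_value=28.4,
#             variable_unit='degree_C')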
class DomsPoint(object):
def __init__(self, longitude=None, latitude=None, time=None, depth=None, data_id=None):
self.time = time
self.longitude = longitude
self.latitude = latitude
self.depth = depth
self.data_id = data_id
self.data = None
self.source = None
self.platform = None
self.device = None
self.file_url = None
self.__id = id(self)
def __repr__(self):
return str(self.__dict__)
def __eq__(self, other):
return isinstance(other, DomsPoint) and other.__id == self.__id
def __hash__(self):
return hash(self.data_id) if self.data_id else id(self)
@staticmethod
def _variables_to_device(variables):
"""
Given a list of science variables, attempt to determine what
the correct device is. This method will only be used for
satellite measurements, so the only options are 'scatterometers'
or 'radiometers'
:param variables: List of variable names
:return: device id integer
"""
for variable in variables:
if 'wind' in variable.variable_name.lower():
# scatterometers
return 6
# Assume radiometers
return 5
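    # Illustrative use (hypothetical variable names): a tile whose variables include a name
    # containing 'wind' (e.g. 'wind_speed') maps to device 6 (scatterometer); anything else
    # falls through to device 5 (radiometer assumed).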
@staticmethod
def from_nexus_point(nexus_point, tile=None):
point = DomsPoint()
point.data_id = "%s[%s]" % (tile.tile_id, nexus_point.index)
if tile.is_multi:
data_vals = nexus_point.data_vals
else:
data_vals = [nexus_point.data_vals]
data = []
for data_val, variable in zip(data_vals, tile.variables):
if data_val:
data.append(DataPoint(
variable_name=variable.variable_name,
variable_value=data_val,
cf_variable_name=variable.standard_name,
variable_unit=None
))
point.data = data
try:
point.wind_v = tile.meta_data['wind_v'][tuple(nexus_point.index)].item()
except (KeyError, IndexError):
pass
try:
point.wind_direction = tile.meta_data['wind_dir'][tuple(nexus_point.index)].item()
except (KeyError, IndexError):
pass
try:
point.wind_speed = tile.meta_data['wind_speed'][tuple(nexus_point.index)].item()
except (KeyError, IndexError):
pass
point.longitude = nexus_point.longitude.item()
point.latitude = nexus_point.latitude.item()
point.time = datetime.utcfromtimestamp(nexus_point.time).strftime('%Y-%m-%dT%H:%M:%SZ')
try:
point.depth = nexus_point.depth
except KeyError:
# No depth associated with this measurement
pass
point.sst_depth = 0
point.source = tile.dataset
point.file_url = tile.granule
point.platform = 9
point.device = DomsPoint._variables_to_device(tile.variables)
return point
@staticmethod
def from_edge_point(edge_point):
point = DomsPoint()
x, y = edge_point['longitude'], edge_point['latitude']
point.longitude = x
point.latitude = y
point.time = edge_point['time']
point.source = edge_point.get('source')
point.platform = edge_point.get('platform')
point.device = edge_point.get('device')
point.file_url = edge_point.get('fileurl')
point.depth = edge_point.get('depth')
def is_defined(key, d):
return key in d and d[key] is not None and d[key] != ''
if is_defined('id', point.platform):
point.platform = edge_point.get('platform')['id']
elif is_defined('code', point.platform):
point.platform = edge_point.get('platform')['code']
elif is_defined('type', point.platform):
point.platform = edge_point.get('platform')['type']
data_fields = [
'air_pressure',
'air_pressure_quality',
'air_temperature',
'air_temperature_quality',
'dew_point_temperature',
'dew_point_temperature_quality',
'downwelling_longwave_flux_in_air',
'downwelling_longwave_flux_in_air_quality',
'downwelling_longwave_radiance_in_air',
'downwelling_longwave_radiance_in_air_quality',
'downwelling_shortwave_flux_in_air',
'downwelling_shortwave_flux_in_air_quality',
'mass_concentration_of_chlorophyll_in_sea_water',
'mass_concentration_of_chlorophyll_in_sea_water_quality',
'rainfall_rate',
'rainfall_rate_quality',
'relative_humidity',
'relative_humidity_quality',
'sea_surface_salinity',
'sea_surface_salinity_quality',
'sea_surface_skin_temperature',
'sea_surface_skin_temperature_quality',
'sea_surface_subskin_temperature',
'sea_surface_subskin_temperature_quality',
'sea_surface_temperature',
'sea_surface_temperature_quality',
'sea_water_density',
'sea_water_density_quality',
'sea_water_electrical_conductivity',
'sea_water_electrical_conductivity_quality',
'sea_water_practical_salinity',
'sea_water_practical_salinity_quality',
'sea_water_salinity',
'sea_water_salinity_quality',
'sea_water_temperature',
'sea_water_temperature_quality',
'surface_downwelling_photosynthetic_photon_flux_in_air',
'surface_downwelling_photosynthetic_photon_flux_in_air_quality',
'wet_bulb_temperature',
'wet_bulb_temperature_quality',
'wind_speed',
'wind_speed_quality',
'wind_from_direction',
'wind_from_direction_quality',
'wind_to_direction',
'wind_to_direction_quality',
'eastward_wind',
'northward_wind',
'wind_component_quality'
]
data = []
# This is for in-situ secondary points
for name in data_fields:
val = edge_point.get(name)
if not val:
continue
unit = get_insitu_unit(name, insitu_schema.get())
data.append(DataPoint(
variable_name=name,
cf_variable_name=name,
variable_value=val,
variable_unit=unit
))
# This is for satellite secondary points
if 'variables' in edge_point:
data.extend([DataPoint(
variable_name=variable.variable_name,
variable_value=var_value,
cf_variable_name=variable.standard_name,
variable_unit=None
) for var_value, variable in zip(
edge_point['var_values'],
edge_point['variables']
) if var_value])
point.data = data
meta = edge_point.get('meta', None)
        # Append depth to data_id. Currently, our insitu data has the same id value for measurements taken at
        # different depths, which caused secondary insitu matches to be incorrectly filtered out of the NetCDF output.
if meta:
point.data_id = f'{meta}@{point.depth}'
else:
point.data_id = f'{point.time}:{point.longitude}:{point.latitude}@{point.depth}'
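        # Illustrative result (hypothetical values): a point without 'meta' would get a data_id like
        # '2018-10-01T00:00:00Z:-120.5:35.2@5.0', so measurements at the same time/place but
        # different depths stay distinct.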
return point
from threading import Lock
DRIVER_LOCK = Lock()
def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_names, parameter, depth_min, depth_max,
time_tolerance, radius_tolerance, platforms, match_once, tile_service_factory, prioritize_distance=True, sc=None):
from functools import partial
with DRIVER_LOCK:
# Broadcast parameters
primary_b = sc.broadcast(primary_ds_name)
secondary_b = sc.broadcast(secondary_ds_names)
depth_min_b = sc.broadcast(float(depth_min) if depth_min is not None else None)
depth_max_b = sc.broadcast(float(depth_max) if depth_max is not None else None)
tt_b = sc.broadcast(time_tolerance)
rt_b = sc.broadcast(float(radius_tolerance))
platforms_b = sc.broadcast(platforms)
bounding_wkt_b = sc.broadcast(bounding_wkt)
parameter_b = sc.broadcast(parameter)
# Parallelize list of tile ids
rdd = sc.parallelize(tile_ids, determine_parallelism(len(tile_ids)))
# Map Partitions ( list(tile_id) )
rdd_filtered = rdd.mapPartitions(
partial(
match_satellite_to_insitu,
primary_b=primary_b,
secondary_b=secondary_b,
parameter_b=parameter_b,
tt_b=tt_b,
rt_b=rt_b,
platforms_b=platforms_b,
bounding_wkt_b=bounding_wkt_b,
depth_min_b=depth_min_b,
depth_max_b=depth_max_b,
tile_service_factory=tile_service_factory
),
preservesPartitioning=True
).filter(
lambda p_m_tuple: abs(
iso_time_to_epoch(p_m_tuple[0].time) - iso_time_to_epoch(p_m_tuple[1].time)
) <= time_tolerance
)
if match_once:
# Only the 'nearest' point for each primary should be returned. Add an extra map/reduce which calculates
# the distance and finds the minimum
# Method used for calculating the distance between 2 DomsPoints
from pyproj import Geod
def dist(primary, matchup, prioritize_distance):
wgs84_geod = Geod(ellps='WGS84')
lat1, lon1 = (primary.latitude, primary.longitude)
lat2, lon2 = (matchup.latitude, matchup.longitude)
az12, az21, distance = wgs84_geod.inv(lon1, lat1, lon2, lat2)
if prioritize_distance:
return distance, time_dist(primary, matchup)
return time_dist(primary, matchup), distance
def time_dist(primary, matchup):
primary_time = iso_time_to_epoch(primary.time)
matchup_time = iso_time_to_epoch(matchup.time)
return abs(primary_time - matchup_time)
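        # Illustrative ordering (hypothetical values): with prioritize_distance=True a match
        # 800 m and 3600 s away yields (800.0, 3600.0), so the tuple comparison used in
        # filter_closest below ranks candidates by distance first and breaks ties by time.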
def filter_closest(matches):
"""
Filter given matches. Find the closest match to the primary
point and only keep other matches that match the same
time/space as that point.
:param matches: List of match tuples. Each tuple has the following format:
1. The secondary match
2. Tuple of form (space_dist, time_dist)
"""
closest_point = min(matches, key=lambda match: match[1])[0]
matches = list(filter(
lambda match: match.latitude == closest_point.latitude and
match.longitude == closest_point.longitude and
match.time == closest_point.time, map(
lambda match: match[0], matches
)
))
return matches
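        # Illustrative behaviour (hypothetical values): if the closest secondary sits at
        # lat 35.2, lon -120.5, time '2018-10-01T00:00:00Z', every other candidate sharing that
        # exact position and time is kept as well, so co-located observations are not dropped.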
rdd_filtered = rdd_filtered.map(
lambda primary_matchup: tuple(
[primary_matchup[0], tuple([primary_matchup[1], dist(
primary_matchup[0],
primary_matchup[1],
prioritize_distance
)])]
)).combineByKey(
lambda value: [value],
lambda value_list, value: value_list + [value],
lambda value_list_a, value_list_b: value_list_a + value_list_b
).mapValues(lambda matches: filter_closest(matches))
else:
rdd_filtered = rdd_filtered \
.combineByKey(lambda value: [value], # Create 1 element list
lambda value_list, value: value_list + [value], # Add 1 element to list
lambda value_list_a, value_list_b: value_list_a + value_list_b) # Add two lists together
result_as_map = rdd_filtered.collectAsMap()
return result_as_map
def determine_parallelism(num_tiles):
"""
Try to stay at a maximum of 140 tiles per partition; But don't go over 128 partitions.
Also, don't go below the default of 8
"""
num_partitions = max(min(num_tiles / 140, 128), 8)
return num_partitions
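# Illustrative values, using the integer division above:
#   determine_parallelism(500)    # -> 8   (raised to the minimum)
#   determine_parallelism(10000)  # -> 71  (10000 // 140)
#   determine_parallelism(50000)  # -> 128 (capped at the maximum)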
def add_meters_to_lon_lat(lon, lat, meters):
"""
Uses a simple approximation of
1 degree latitude = 111,111 meters
1 degree longitude = 111,111 meters * cosine(latitude)
:param lon: longitude to add meters to
:param lat: latitude to add meters to
:param meters: meters to add to the longitude and latitude values
:return: (longitude, latitude) increased by given meters
"""
longitude = lon + ((meters / 111111) * cos(radians(lat)))
latitude = lat + (meters / 111111)
return longitude, latitude
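# Illustrative check at the equator, where cos(lat) == 1:
#   add_meters_to_lon_lat(0.0, 0.0, 1000)  # -> (~0.009, ~0.009) degrees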
def get_insitu_params(insitu_schema):
"""
Get all possible insitu params from the CDMS insitu schema
"""
params = insitu_schema.get(
'definitions', {}).get('observation', {}).get('properties', {})
# Filter params so only variables with units are considered
params = list(map(
        lambda param: param[0], filter(lambda param: 'units' in param[1], params.items())))
return params
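# Illustrative schema fragment this expects (hypothetical field names and units):
#   {'definitions': {'observation': {'properties': {
#       'sea_water_temperature': {'units': 'kelvin'},   # has units -> included
#       'meta': {'type': 'string'}                      # no units  -> filtered out
#   }}}}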
def get_insitu_unit(variable_name, insitu_schema):
"""
Retrieve the units from the insitu api schema endpoint for the given variable.
If no units are available for this variable, return "None"
"""
properties = insitu_schema.get('definitions', {}).get('observation', {}).get('properties', {})
for observation_name, observation_value in properties.items():
if observation_name == variable_name:
return observation_value.get('units')
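# Under the same hypothetical schema fragment shown above, get_insitu_unit('sea_water_temperature', schema)
# would return 'kelvin'; a variable without a 'units' entry (or an unknown name) yields None.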
def tile_to_edge_points(tile):
indices = tile.get_indices()
edge_points = []
for idx in indices:
if tile.is_multi:
data = [var_data[tuple(idx)] for var_data in tile.data]
else:
data = [tile.data[tuple(idx)]]
edge_point = {
'latitude': tile.latitudes[idx[1]],
'longitude': tile.longitudes[idx[2]],
'time': datetime.utcfromtimestamp(tile.times[idx[0]]).strftime('%Y-%m-%dT%H:%M:%SZ'),
'source': tile.dataset,
'platform': 'orbiting satellite',
'device': None,
'fileurl': tile.granule,
'variables': tile.variables,
'var_values': data
}
edge_points.append(edge_point)
return edge_points
def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_b, rt_b, platforms_b,
bounding_wkt_b, depth_min_b, depth_max_b, tile_service_factory):
the_time = datetime.now()
tile_ids = list(tile_ids)
if len(tile_ids) == 0:
return []
tile_service = tile_service_factory()
# Determine the spatial temporal extents of this partition of tiles
tiles_bbox = tile_service.get_bounding_box(tile_ids)
tiles_min_time = tile_service.get_min_time(tile_ids)
tiles_max_time = tile_service.get_max_time(tile_ids)
# Increase spatial extents by the radius tolerance
matchup_min_lon, matchup_min_lat = add_meters_to_lon_lat(tiles_bbox.bounds[0], tiles_bbox.bounds[1],
-1 * rt_b.value)
matchup_max_lon, matchup_max_lat = add_meters_to_lon_lat(tiles_bbox.bounds[2], tiles_bbox.bounds[3], rt_b.value)
# Don't go outside of the search domain
search_min_x, search_min_y, search_max_x, search_max_y = wkt.loads(bounding_wkt_b.value).bounds
matchup_min_lon = max(matchup_min_lon, search_min_x)
matchup_min_lat = max(matchup_min_lat, search_min_y)
matchup_max_lon = min(matchup_max_lon, search_max_x)
matchup_max_lat = min(matchup_max_lat, search_max_y)
# Find the centroid of the matchup bounding box and initialize the projections
matchup_center = box(matchup_min_lon, matchup_min_lat, matchup_max_lon, matchup_max_lat).centroid.coords[0]
aeqd_proj = pyproj.Proj(proj='aeqd', lon_0=matchup_center[0], lat_0=matchup_center[1])
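    # The azimuthal equidistant projection preserves distances from its center, so Euclidean
    # distances between points projected near the matchup-box centroid are approximately in
    # meters, which is what the radius-tolerance KD-tree queries below rely on.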
# Increase temporal extents by the time tolerance
matchup_min_time = tiles_min_time - tt_b.value
matchup_max_time = tiles_max_time + tt_b.value
print("%s Time to determine spatial-temporal extents for partition %s to %s" % (
str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]))
# Query edge for all points within the spatial-temporal extents of this partition
is_insitu_dataset = edge_endpoints.get_provider_name(secondary_b.value) is not None
if is_insitu_dataset:
the_time = datetime.now()
edge_session = requests.Session()
edge_results = []
with edge_session:
for insitudata_name in secondary_b.value.split(','):
bbox = ','.join(
[str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon), str(matchup_max_lat)])
edge_response = query_edge(insitudata_name, parameter_b.value, matchup_min_time, matchup_max_time, bbox,
platforms_b.value, depth_min_b.value, depth_max_b.value, session=edge_session)
if edge_response['total'] == 0:
continue
r = edge_response['results']
for p in r:
p['source'] = insitudata_name
edge_results.extend(r)
print("%s Time to call edge for partition %s to %s" % (str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]))
if len(edge_results) == 0:
return []
        # Project edge points into the local azimuthal equidistant (AEQD) coordinate system
the_time = datetime.now()
matchup_points = np.ndarray((len(edge_results), 2), dtype=np.float32)
for n, edge_point in enumerate(edge_results):
x, y = edge_point['longitude'], edge_point['latitude']
matchup_points[n][0], matchup_points[n][1] = aeqd_proj(x, y)
else:
        # Query the Nexus tile service (Solr/Cassandra backed) for matching secondary tiles.
bbox = ','.join(
[str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon),
str(matchup_max_lat)])
west, south, east, north = [float(b) for b in bbox.split(",")]
polygon = Polygon(
[(west, south), (east, south), (east, north), (west, north), (west, south)])
# Find tile IDS from spatial/temporal bounds of partition
matchup_tiles = tile_service.find_tiles_in_polygon(
bounding_polygon=polygon,
ds=secondary_b.value,
start_time=matchup_min_time,
end_time=matchup_max_time,
fl='id',
fetch_data=False,
sort=['tile_min_time_dt asc', 'tile_min_lon asc', 'tile_min_lat asc'],
rows=5000
)
        # Fetch each matching tile and project its points into the local AEQD coordinate system.
matchup_points = []
edge_results = []
for tile in matchup_tiles:
            # Retrieve tile data and project its lat/lon points into AEQD coordinates
tiles = tile_service.find_tile_by_id(tile.tile_id, fetch_data=True)
tile = tiles[0]
valid_indices = tile.get_indices()
primary_points = np.array([aeqd_proj(
tile.longitudes[aslice[2]],
tile.latitudes[aslice[1]]
) for aslice in valid_indices])
matchup_points.extend(primary_points)
edge_results.extend(tile_to_edge_points(tile))
if len(matchup_points) <= 0:
return []
matchup_points = np.array(matchup_points)
print("%s Time to convert match points for partition %s to %s" % (
str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]))
# Build kdtree from matchup points
the_time = datetime.now()
m_tree = spatial.cKDTree(matchup_points, leafsize=30)
print("%s Time to build matchup tree" % (str(datetime.now() - the_time)))
# The actual matching happens in the generator. This is so that we only load 1 tile into memory at a time
match_generators = [match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, bounding_wkt_b.value,
parameter_b.value, rt_b.value, aeqd_proj) for tile_id
in tile_ids]
return chain(*match_generators)
def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, search_domain_bounding_wkt,
search_parameter, radius_tolerance, aeqd_proj):
from nexustiles.model.nexusmodel import NexusPoint
from webservice.algorithms_spark.Matchup import DomsPoint # Must import DomsPoint or Spark complains
# Load tile
try:
the_time = datetime.now()
tile = tile_service.mask_tiles_to_polygon(wkt.loads(search_domain_bounding_wkt),
tile_service.find_tile_by_id(tile_id))[0]
print("%s Time to load tile %s" % (str(datetime.now() - the_time), tile_id))
except IndexError:
# This should only happen if all measurements in a tile become masked after applying the bounding polygon
print('Tile is empty after masking spatially. Skipping this tile.')
return
    # Project valid tile lat/lon points into the local AEQD coordinate system
the_time = datetime.now()
# Get list of indices of valid values
valid_indices = tile.get_indices()
primary_points = np.array(
[aeqd_proj(tile.longitudes[aslice[2]], tile.latitudes[aslice[1]]) for
aslice in valid_indices])
print("%s Time to convert primary points for tile %s" % (str(datetime.now() - the_time), tile_id))
a_time = datetime.now()
p_tree = spatial.cKDTree(primary_points, leafsize=30)
print("%s Time to build primary tree" % (str(datetime.now() - a_time)))
a_time = datetime.now()
matched_indexes = p_tree.query_ball_tree(m_tree, radius_tolerance)
print("%s Time to query primary tree for tile %s" % (str(datetime.now() - a_time), tile_id))
for i, point_matches in enumerate(matched_indexes):
if len(point_matches) > 0:
if tile.is_multi:
data_vals = [tile_data[tuple(valid_indices[i])] for tile_data in tile.data]
else:
data_vals = tile.data[tuple(valid_indices[i])]
p_nexus_point = NexusPoint(
latitude=tile.latitudes[valid_indices[i][1]],
longitude=tile.longitudes[valid_indices[i][2]],
depth=None,
time=tile.times[valid_indices[i][0]],
index=valid_indices[i],
data_vals=data_vals
)
p_doms_point = DomsPoint.from_nexus_point(p_nexus_point, tile=tile)
for m_point_index in point_matches:
m_doms_point = DomsPoint.from_edge_point(edge_results[m_point_index])
yield p_doms_point, m_doms_point