in analysis/webservice/algorithms_spark/Matchup.py [0:0]
def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_b, rt_b, platforms_b,
bounding_wkt_b, depth_min_b, depth_max_b, tile_service_factory):
the_time = datetime.now()
tile_ids = list(tile_ids)
if len(tile_ids) == 0:
return []
tile_service = tile_service_factory()
# Determine the spatial temporal extents of this partition of tiles
tiles_bbox = tile_service.get_bounding_box(tile_ids, ds=primary_b.value)
tiles_min_time = tile_service.get_min_time(tile_ids, ds=primary_b.value)
tiles_max_time = tile_service.get_max_time(tile_ids, ds=primary_b.value)
# Increase spatial extents by the radius tolerance
matchup_min_lon, matchup_min_lat = add_meters_to_lon_lat(tiles_bbox.bounds[0], tiles_bbox.bounds[1],
-1 * rt_b.value)
matchup_max_lon, matchup_max_lat = add_meters_to_lon_lat(tiles_bbox.bounds[2], tiles_bbox.bounds[3], rt_b.value)
# Don't go outside of the search domain
search_min_x, search_min_y, search_max_x, search_max_y = wkt.loads(bounding_wkt_b.value).bounds
matchup_min_lon = max(matchup_min_lon, search_min_x)
matchup_min_lat = max(matchup_min_lat, search_min_y)
matchup_max_lon = min(matchup_max_lon, search_max_x)
matchup_max_lat = min(matchup_max_lat, search_max_y)
# Find the centroid of the matchup bounding box and initialize the projections
matchup_center = box(matchup_min_lon, matchup_min_lat, matchup_max_lon, matchup_max_lat).centroid.coords[0]
aeqd_proj = pyproj.Proj(proj='aeqd', lon_0=matchup_center[0], lat_0=matchup_center[1])
# Increase temporal extents by the time tolerance
matchup_min_time = tiles_min_time - tt_b.value
matchup_max_time = tiles_max_time + tt_b.value
print("%s Time to determine spatial-temporal extents for partition %s to %s" % (
str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]))
# Query edge for all points within the spatial-temporal extents of this partition
is_insitu_dataset = edge_endpoints.get_provider_name(secondary_b.value) is not None
if is_insitu_dataset:
the_time = datetime.now()
edge_session = requests.Session()
edge_results = []
with edge_session:
for insitudata_name in secondary_b.value.split(','):
bbox = ','.join(
[str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon), str(matchup_max_lat)])
edge_response = query_edge(insitudata_name, parameter_b.value, matchup_min_time, matchup_max_time, bbox,
platforms_b.value, depth_min_b.value, depth_max_b.value, session=edge_session)
if edge_response['total'] == 0:
continue
r = edge_response['results']
for p in r:
p['source'] = insitudata_name
edge_results.extend(r)
print("%s Time to call edge for partition %s to %s" % (str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]))
if len(edge_results) == 0:
return []
# Convert edge points to utm
the_time = datetime.now()
matchup_points = np.ndarray((len(edge_results), 2), dtype=np.float32)
for n, edge_point in enumerate(edge_results):
x, y = edge_point['longitude'], edge_point['latitude']
matchup_points[n][0], matchup_points[n][1] = aeqd_proj(x, y)
else:
# Query nexus (cassandra? solr?) to find matching points.
bbox = ','.join(
[str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon),
str(matchup_max_lat)])
west, south, east, north = [float(b) for b in bbox.split(",")]
polygon = Polygon(
[(west, south), (east, south), (east, north), (west, north), (west, south)])
# Find tile IDS from spatial/temporal bounds of partition
matchup_tiles = tile_service.find_tiles_in_polygon(
bounding_polygon=polygon,
ds=secondary_b.value,
start_time=matchup_min_time,
end_time=matchup_max_time,
fl='id',
fetch_data=False,
sort=['tile_min_time_dt asc', 'tile_min_lon asc', 'tile_min_lat asc'],
rows=5000
)
# Convert Tile IDS to tiles and convert to UTM lat/lon projection.
matchup_points = []
edge_results = []
for tile in matchup_tiles:
# Retrieve tile data and convert to lat/lon projection
tiles = tile_service.find_tile_by_id(tile.tile_id, fetch_data=True, ds=secondary_b.value)
tile = tiles[0]
valid_indices = tile.get_indices()
primary_points = np.array([aeqd_proj(
tile.longitudes[aslice[2]],
tile.latitudes[aslice[1]]
) for aslice in valid_indices])
matchup_points.extend(primary_points)
edge_results.extend(tile_to_edge_points(tile))
if len(matchup_points) <= 0:
return []
matchup_points = np.array(matchup_points)
print("%s Time to convert match points for partition %s to %s" % (
str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]))
# Build kdtree from matchup points
the_time = datetime.now()
m_tree = spatial.cKDTree(matchup_points, leafsize=30)
print("%s Time to build matchup tree" % (str(datetime.now() - the_time)))
# The actual matching happens in the generator. This is so that we only load 1 tile into memory at a time
match_generators = [match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, bounding_wkt_b.value,
parameter_b.value, rt_b.value, aeqd_proj, primary_b.value)
for tile_id in tile_ids]
return chain(*match_generators)