in analysis/webservice/algorithms_spark/Matchup.py [0:0]
def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_names, parameter, depth_min, depth_max,
                         time_tolerance, radius_tolerance, platforms, match_once, tile_service_factory,
                         prioritize_distance=True, sc=None):
    """
    Run the matchup job on Spark and collect the results on the driver.

    The tile ids are distributed across the cluster. Each partition is matched
    with ``match_satellite_to_insitu``. Pairs whose time difference exceeds
    ``time_tolerance`` are dropped, and the remaining secondary points are
    grouped by their primary point.

    :param tile_ids: Ids of the primary tiles to process (parallelized into an RDD).
    :param bounding_wkt: WKT geometry of the search area, broadcast to the workers.
    :param primary_ds_name: Name of the primary dataset, broadcast to the workers.
    :param secondary_ds_names: Secondary dataset name(s), broadcast to the workers.
    :param parameter: Parameter to match on, broadcast to the workers.
    :param depth_min: Minimum depth, or None. Converted to float when given.
    :param depth_max: Maximum depth, or None. Converted to float when given.
    :param time_tolerance: Maximum allowed absolute difference between the primary
        and secondary times. It is compared against differences of
        ``iso_time_to_epoch`` values (presumably seconds - confirm).
    :param radius_tolerance: Search radius. Converted to float, so it must not be None.
    :param platforms: Platform filter, broadcast to the workers.
    :param match_once: If True, keep only the closest secondary match(es) per primary
        point. Ties at the same lat/lon/time are all kept.
    :param tile_service_factory: Factory handed to the workers to obtain a tile service.
    :param prioritize_distance: Only used with ``match_once``. If True, candidates are
        ranked by (spatial distance, time distance); otherwise by
        (time distance, spatial distance).
    :param sc: SparkContext used for broadcasts and the job. Required in practice:
        ``sc.broadcast`` is called unconditionally.
    :return: dict mapping each primary point to a list of matched secondary points.
    """
    from functools import partial

    # combineByKey accumulators. Spark allows mergeValue/mergeCombiners to mutate and
    # return their first argument (PySpark's own groupByKey does exactly this). This
    # avoids the quadratic copying of ``value_list + [value]`` when a primary point
    # has many matches.
    def create_combiner(value):
        return [value]

    def merge_value(value_list, value):
        value_list.append(value)
        return value_list

    def merge_combiners(value_list_a, value_list_b):
        value_list_a.extend(value_list_b)
        return value_list_a

    # NOTE(review): the lock is held for the whole job (including collectAsMap),
    # presumably to serialise use of the shared SparkContext - confirm.
    with DRIVER_LOCK:
        # Broadcast parameters
        primary_b = sc.broadcast(primary_ds_name)
        secondary_b = sc.broadcast(secondary_ds_names)
        depth_min_b = sc.broadcast(float(depth_min) if depth_min is not None else None)
        depth_max_b = sc.broadcast(float(depth_max) if depth_max is not None else None)
        tt_b = sc.broadcast(time_tolerance)
        rt_b = sc.broadcast(float(radius_tolerance))
        platforms_b = sc.broadcast(platforms)
        bounding_wkt_b = sc.broadcast(bounding_wkt)
        parameter_b = sc.broadcast(parameter)

        # Parallelize list of tile ids
        rdd = sc.parallelize(tile_ids, determine_parallelism(len(tile_ids)))

        # Map Partitions ( list(tile_id) ) -> (primary, secondary) pairs,
        # then drop pairs that fall outside the time tolerance.
        rdd_filtered = rdd.mapPartitions(
            partial(
                match_satellite_to_insitu,
                primary_b=primary_b,
                secondary_b=secondary_b,
                parameter_b=parameter_b,
                tt_b=tt_b,
                rt_b=rt_b,
                platforms_b=platforms_b,
                bounding_wkt_b=bounding_wkt_b,
                depth_min_b=depth_min_b,
                depth_max_b=depth_max_b,
                tile_service_factory=tile_service_factory
            ),
            preservesPartitioning=True
        ).filter(
            lambda p_m_tuple: abs(
                iso_time_to_epoch(p_m_tuple[0].time) - iso_time_to_epoch(p_m_tuple[1].time)
            ) <= time_tolerance
        )

        if match_once:
            # Only the 'nearest' point for each primary should be returned. Add an extra
            # map/reduce step that computes the distance and keeps the minimum.
            from pyproj import Geod

            def time_dist(primary, matchup):
                """Absolute difference between the two points' epoch times."""
                primary_time = iso_time_to_epoch(primary.time)
                matchup_time = iso_time_to_epoch(matchup.time)
                return abs(primary_time - matchup_time)

            def dist(primary, matchup, prioritize_distance):
                """
                Return a sortable (space, time) or (time, space) distance tuple.

                The spatial distance is the WGS84 geodesic distance from ``Geod.inv``.
                """
                wgs84_geod = Geod(ellps='WGS84')
                _, _, distance = wgs84_geod.inv(
                    primary.longitude, primary.latitude, matchup.longitude, matchup.latitude
                )
                if prioritize_distance:
                    return distance, time_dist(primary, matchup)
                return time_dist(primary, matchup), distance

            def filter_closest(matches):
                """
                Keep only the secondary points co-located with the closest match.

                :param matches: Non-empty list of (secondary_point, distance_tuple) pairs.
                :return: Secondary points whose latitude, longitude and time equal
                    those of the match with the smallest distance tuple.
                """
                closest_point = min(matches, key=lambda match: match[1])[0]
                return [
                    match for match, _ in matches
                    if match.latitude == closest_point.latitude
                    and match.longitude == closest_point.longitude
                    and match.time == closest_point.time
                ]

            rdd_filtered = rdd_filtered.map(
                lambda primary_matchup: (
                    primary_matchup[0],
                    (
                        primary_matchup[1],
                        dist(primary_matchup[0], primary_matchup[1], prioritize_distance),
                    ),
                )
            ).combineByKey(
                create_combiner, merge_value, merge_combiners
            ).mapValues(filter_closest)
        else:
            # Group every secondary match under its primary point.
            rdd_filtered = rdd_filtered.combineByKey(create_combiner, merge_value, merge_combiners)

        result_as_map = rdd_filtered.collectAsMap()
    return result_as_map