def spark_matchup_driver()

in analysis/webservice/algorithms_spark/Matchup.py [0:0]


def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_names, parameter, depth_min, depth_max,
                         time_tolerance, radius_tolerance, platforms, match_once, tile_service_factory, prioritize_distance=True, sc=None):
    from functools import partial

    with DRIVER_LOCK:
        # Broadcast parameters
        primary_b = sc.broadcast(primary_ds_name)
        secondary_b = sc.broadcast(secondary_ds_names)
        depth_min_b = sc.broadcast(float(depth_min) if depth_min is not None else None)
        depth_max_b = sc.broadcast(float(depth_max) if depth_max is not None else None)
        tt_b = sc.broadcast(time_tolerance)
        rt_b = sc.broadcast(float(radius_tolerance))
        platforms_b = sc.broadcast(platforms)
        bounding_wkt_b = sc.broadcast(bounding_wkt)
        parameter_b = sc.broadcast(parameter)

        # Parallelize list of tile ids
        rdd = sc.parallelize(tile_ids, determine_parallelism(len(tile_ids)))

    # Map Partitions ( list(tile_id) )
    rdd_filtered = rdd.mapPartitions(
        partial(
            match_satellite_to_insitu,
            primary_b=primary_b,
            secondary_b=secondary_b,
            parameter_b=parameter_b,
            tt_b=tt_b,
            rt_b=rt_b,
            platforms_b=platforms_b,
            bounding_wkt_b=bounding_wkt_b,
            depth_min_b=depth_min_b,
            depth_max_b=depth_max_b,
            tile_service_factory=tile_service_factory
        ),
        preservesPartitioning=True
    ).filter(
        lambda p_m_tuple: abs(
            iso_time_to_epoch(p_m_tuple[0].time) - iso_time_to_epoch(p_m_tuple[1].time)
        ) <= time_tolerance
    )

    if match_once:
        # Only the 'nearest' point for each primary should be returned. Add an extra map/reduce which calculates
        # the distance and finds the minimum

        # Method used for calculating the distance between 2 DomsPoints
        from pyproj import Geod

        def dist(primary, matchup, prioritize_distance):
            wgs84_geod = Geod(ellps='WGS84')
            lat1, lon1 = (primary.latitude, primary.longitude)
            lat2, lon2 = (matchup.latitude, matchup.longitude)
            az12, az21, distance = wgs84_geod.inv(lon1, lat1, lon2, lat2)
            if prioritize_distance:
                return distance, time_dist(primary, matchup)
            return time_dist(primary, matchup), distance

        def time_dist(primary, matchup):
            primary_time = iso_time_to_epoch(primary.time)
            matchup_time = iso_time_to_epoch(matchup.time)
            return abs(primary_time - matchup_time)

        def filter_closest(matches):
            """
            Filter given matches. Find the closest match to the primary
            point and only keep other matches that match the same
            time/space as that point.

            :param matches: List of match tuples. Each tuple has the following format:
                1. The secondary match
                2. Tuple of form (space_dist, time_dist)
            """
            closest_point = min(matches, key=lambda match: match[1])[0]
            matches = list(filter(
                lambda match: match.latitude == closest_point.latitude and
                              match.longitude == closest_point.longitude and
                              match.time == closest_point.time, map(
                    lambda match: match[0], matches
                )
            ))
            return matches

        rdd_filtered = rdd_filtered.map(
            lambda primary_matchup: tuple(
                [primary_matchup[0], tuple([primary_matchup[1], dist(
                    primary_matchup[0],
                    primary_matchup[1],
                    prioritize_distance
                )])]
            )).combineByKey(
                lambda value: [value],
                lambda value_list, value: value_list + [value],
                lambda value_list_a, value_list_b: value_list_a + value_list_b
            ).mapValues(lambda matches: filter_closest(matches))
    else:
        rdd_filtered = rdd_filtered \
            .combineByKey(lambda value: [value],  # Create 1 element list
                          lambda value_list, value: value_list + [value],  # Add 1 element to list
                          lambda value_list_a, value_list_b: value_list_a + value_list_b)  # Add two lists together

    result_as_map = rdd_filtered.collectAsMap()

    return result_as_map