def match_satellite_to_insitu()

in analysis/webservice/algorithms_spark/Matchup.py [0:0]


def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_b, rt_b, platforms_b,
                              bounding_wkt_b, depth_min_b, depth_max_b, tile_service_factory):
    the_time = datetime.now()
    tile_ids = list(tile_ids)
    if len(tile_ids) == 0:
        return []

    tile_service = tile_service_factory()

    # Determine the spatial temporal extents of this partition of tiles
    tiles_bbox = tile_service.get_bounding_box(tile_ids, ds=primary_b.value)
    tiles_min_time = tile_service.get_min_time(tile_ids, ds=primary_b.value)
    tiles_max_time = tile_service.get_max_time(tile_ids, ds=primary_b.value)

    # Increase spatial extents by the radius tolerance
    matchup_min_lon, matchup_min_lat = add_meters_to_lon_lat(tiles_bbox.bounds[0], tiles_bbox.bounds[1],
                                                             -1 * rt_b.value)
    matchup_max_lon, matchup_max_lat = add_meters_to_lon_lat(tiles_bbox.bounds[2], tiles_bbox.bounds[3], rt_b.value)

    # Don't go outside of the search domain
    search_min_x, search_min_y, search_max_x, search_max_y = wkt.loads(bounding_wkt_b.value).bounds
    matchup_min_lon = max(matchup_min_lon, search_min_x)
    matchup_min_lat = max(matchup_min_lat, search_min_y)
    matchup_max_lon = min(matchup_max_lon, search_max_x)
    matchup_max_lat = min(matchup_max_lat, search_max_y)

    # Find the centroid of the matchup bounding box and initialize the projections
    matchup_center = box(matchup_min_lon, matchup_min_lat, matchup_max_lon, matchup_max_lat).centroid.coords[0]
    aeqd_proj = pyproj.Proj(proj='aeqd', lon_0=matchup_center[0], lat_0=matchup_center[1])

    # Increase temporal extents by the time tolerance
    matchup_min_time = tiles_min_time - tt_b.value
    matchup_max_time = tiles_max_time + tt_b.value
    print("%s Time to determine spatial-temporal extents for partition %s to %s" % (
        str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]))

    # Query edge for all points within the spatial-temporal extents of this partition
    is_insitu_dataset = edge_endpoints.get_provider_name(secondary_b.value) is not None

    if is_insitu_dataset:
        the_time = datetime.now()
        edge_session = requests.Session()
        edge_results = []
        with edge_session:
            for insitudata_name in secondary_b.value.split(','):
                bbox = ','.join(
                    [str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon), str(matchup_max_lat)])
                edge_response = query_edge(insitudata_name, parameter_b.value, matchup_min_time, matchup_max_time, bbox,
                                           platforms_b.value, depth_min_b.value, depth_max_b.value, session=edge_session)
                if edge_response['total'] == 0:
                    continue
                r = edge_response['results']
                for p in r:
                    p['source'] = insitudata_name
                edge_results.extend(r)
        print("%s Time to call edge for partition %s to %s" % (str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]))
        if len(edge_results) == 0:
            return []

        # Convert edge points to utm
        the_time = datetime.now()
        matchup_points = np.ndarray((len(edge_results), 2), dtype=np.float32)
        for n, edge_point in enumerate(edge_results):
            x, y = edge_point['longitude'], edge_point['latitude']
            matchup_points[n][0], matchup_points[n][1] = aeqd_proj(x, y)
    else:
        # Query nexus (cassandra? solr?) to find matching points.
        bbox = ','.join(
            [str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon),
             str(matchup_max_lat)])
        west, south, east, north = [float(b) for b in bbox.split(",")]
        polygon = Polygon(
            [(west, south), (east, south), (east, north), (west, north), (west, south)])

        # Find tile IDS from spatial/temporal bounds of partition
        matchup_tiles = tile_service.find_tiles_in_polygon(
            bounding_polygon=polygon,
            ds=secondary_b.value,
            start_time=matchup_min_time,
            end_time=matchup_max_time,
            fl='id',
            fetch_data=False,
            sort=['tile_min_time_dt asc', 'tile_min_lon asc', 'tile_min_lat asc'],
            rows=5000
        )

        # Convert Tile IDS to tiles and convert to UTM lat/lon projection.
        matchup_points = []
        edge_results = []
        for tile in matchup_tiles:
            # Retrieve tile data and convert to lat/lon projection
            tiles = tile_service.find_tile_by_id(tile.tile_id, fetch_data=True, ds=secondary_b.value)
            tile = tiles[0]

            valid_indices = tile.get_indices()

            primary_points = np.array([aeqd_proj(
                tile.longitudes[aslice[2]],
                tile.latitudes[aslice[1]]
            ) for aslice in valid_indices])
            matchup_points.extend(primary_points)
            edge_results.extend(tile_to_edge_points(tile))

        if len(matchup_points) <= 0:
            return []
        matchup_points = np.array(matchup_points)

    print("%s Time to convert match points for partition %s to %s" % (
        str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]))

    # Build kdtree from matchup points
    the_time = datetime.now()
    m_tree = spatial.cKDTree(matchup_points, leafsize=30)
    print("%s Time to build matchup tree" % (str(datetime.now() - the_time)))

    # The actual matching happens in the generator. This is so that we only load 1 tile into memory at a time
    match_generators = [match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, bounding_wkt_b.value,
                                                      parameter_b.value, rt_b.value, aeqd_proj, primary_b.value)
                        for tile_id in tile_ids]

    return chain(*match_generators)