def calc_average_on_day()

in analysis/webservice/algorithms_spark/TimeSeriesSpark.py [0:0]


def calc_average_on_day(tile_service_factory, metrics_callback, normalize_dates, tile_in_spark):
    """Spark-worker function: compute per-timestamp, area-weighted statistics.

    Fetches all tiles for ``dataset`` inside ``bounding_polygon`` over the
    timestamp range, groups them by tile time, and produces one stats dict
    (min/max/mean/cnt/std/time/iso_time) per timestamp that has unmasked data.
    The mean is cosine-of-latitude weighted to account for grid-cell area.

    :param tile_service_factory: zero-arg callable returning a tile service
        exposing ``get_tiles_bounded_by_box``.
    :param metrics_callback: callable accepting keyword metrics
        (forwarded to the tile service; also called with ``calculation=`` time).
    :param normalize_dates: when True, timestamps in the output are run
        through ``utils.normalize_date``.
    :param tile_in_spark: tuple ``(bounding_polygon, dataset, timestamps, fill)``;
        ``fill`` is carried for interface compatibility but unused here.
    :return: ``[stats_arr]`` — a single-element list wrapping the list of
        per-timestamp stats dicts, or ``[]`` when there are no timestamps.
    """
    # stdlib only: pytz was needlessly imported just to tag UTC, and the
    # unused `import shapely.wkt` has been dropped.
    from datetime import datetime, timezone
    ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'

    (bounding_polygon, dataset, timestamps, fill) = tile_in_spark
    if not timestamps:
        return []
    tile_service = tile_service_factory()

    # .bounds is (min_lon, min_lat, max_lon, max_lat); the service expects
    # (min_lat, max_lat, min_lon, max_lon) — hence the index shuffle.
    ds1_nexus_tiles = \
        tile_service.get_tiles_bounded_by_box(bounding_polygon.bounds[1],
                                              bounding_polygon.bounds[3],
                                              bounding_polygon.bounds[0],
                                              bounding_polygon.bounds[2],
                                              dataset,
                                              timestamps[0],
                                              timestamps[-1],
                                              rows=5000,
                                              metrics_callback=metrics_callback)

    calculation_start = datetime.now()

    # Group tile indices by the tile's (single) timestamp. A tile whose time
    # is not in `timestamps` raises KeyError, same as the original contract.
    tile_dict = {timeinseconds: [] for timeinseconds in timestamps}
    for i, tile in enumerate(ds1_nexus_tiles):
        tile_dict[tile.times[0]].append(i)

    stats_arr = []
    for timeinseconds in timestamps:
        cur_tile_list = tile_dict[timeinseconds]
        if not cur_tile_list:
            continue

        # Every index in cur_tile_list already satisfies
        # times[0] == timeinseconds (that is how tile_dict was built),
        # so no re-filtering inside the comprehensions is needed.
        tile_data_agg = \
            np.ma.array(data=np.hstack([ds1_nexus_tiles[i].data.data.flatten()
                                        for i in cur_tile_list]),
                        mask=np.hstack([ds1_nexus_tiles[i].data.mask.flatten()
                                        for i in cur_tile_list]))
        # One latitude value per grid point (repeated across longitudes),
        # used below as the cos-latitude area weight.
        lats_agg = np.hstack([np.repeat(ds1_nexus_tiles[i].latitudes,
                                        len(ds1_nexus_tiles[i].longitudes))
                              for i in cur_tile_list])
        if len(tile_data_agg) == 0 or tile_data_agg.mask.all():
            # Nothing usable for this timestamp — skip it entirely.
            continue

        data_min = np.ma.min(tile_data_agg)
        data_max = np.ma.max(tile_data_agg)
        daily_mean = \
            np.ma.average(tile_data_agg,
                          weights=np.cos(np.radians(lats_agg))).item()
        data_count = np.ma.count(tile_data_agg)
        data_std = np.ma.std(tile_data_agg)

        # Return stats by day; optionally snap the timestamp to a
        # normalized date (project helper).
        if normalize_dates:
            timeinseconds = utils.normalize_date(timeinseconds)

        stat = {
            'min': data_min,
            'max': data_max,
            'mean': daily_mean,
            'cnt': data_count,
            'std': data_std,
            'time': int(timeinseconds),
            # fromtimestamp(..., tz=timezone.utc) replaces the deprecated
            # utcfromtimestamp + pytz .replace(); output string is identical.
            'iso_time': datetime.fromtimestamp(int(timeinseconds), tz=timezone.utc).strftime(ISO_8601)
        }
        stats_arr.append(stat)

    calculation_time = (datetime.now() - calculation_start).total_seconds()
    metrics_callback(calculation=calculation_time)

    return [stats_arr]