in analysis/webservice/algorithms_spark/TimeSeriesSpark.py [0:0]
def calc_average_on_day(tile_service_factory, metrics_callback, normalize_dates, min_elevation, max_elevation, tile_in_spark):
    """Compute per-timestamp, area-weighted statistics for one Spark partition.

    Parameters
    ----------
    tile_service_factory : callable
        Zero-argument factory returning a tile service. Called on the worker
        so the service itself is never serialized to executors.
    metrics_callback : callable
        Metrics hook; forwarded to the tile query and invoked with
        ``calculation=<seconds>`` once the statistics pass completes.
    normalize_dates : bool
        When True, each reported timestamp is first passed through
        ``utils.normalize_date``.
    min_elevation, max_elevation
        Elevation bounds forwarded to the tile query.
    tile_in_spark : tuple
        ``(bounding_polygon, dataset, timestamps, fill)`` describing this
        partition's spatial/temporal slice.

    Returns
    -------
    list
        A single-element list wrapping the list of per-timestamp stats dicts
        (``min``/``max``/``mean``/``cnt``/``std``/``time``/``iso_time``), or
        ``[]`` when there are no timestamps.
    """
    # NOTE(review): shapely.wkt is not referenced below; presumably needed for
    # worker-side deserialization of the bounding polygon — confirm before removing.
    import shapely.wkt
    from datetime import datetime, timezone

    ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'

    (bounding_polygon, dataset, timestamps, fill) = tile_in_spark
    if not timestamps:
        return []

    tile_service = tile_service_factory()
    logger.info(f'{max_elevation=} {min_elevation=}')

    # One bulk query covering the partition's full time span; results are
    # bucketed by timestamp below.
    ds1_nexus_tiles = \
        tile_service.get_tiles_bounded_by_box(bounding_polygon.bounds[1],
                                              bounding_polygon.bounds[3],
                                              bounding_polygon.bounds[0],
                                              bounding_polygon.bounds[2],
                                              dataset,
                                              timestamps[0],
                                              timestamps[-1],
                                              rows=5000,
                                              min_elevation=min_elevation,
                                              max_elevation=max_elevation,
                                              metrics_callback=metrics_callback,
                                              distinct=True)

    calculation_start = datetime.now()

    # Bucket tile indices by timestamp. The membership guard protects against
    # tiles whose time falls inside [timestamps[0], timestamps[-1]] but is not
    # one of the requested timestamps (the previous code raised KeyError then).
    tile_dict = {timeinseconds: [] for timeinseconds in timestamps}
    for i, tile in enumerate(ds1_nexus_tiles):
        tile_time = tile.times[0]
        if tile_time in tile_dict:
            tile_dict[tile_time].append(i)

    stats_arr = []
    for timeinseconds in timestamps:
        cur_tile_list = tile_dict[timeinseconds]
        if not cur_tile_list:
            continue

        # Buckets already group strictly by timestamp, so no per-element time
        # filter is needed when concatenating data/mask arrays.
        tile_data_agg = np.ma.array(
            data=np.hstack([ds1_nexus_tiles[i].data.data.flatten()
                            for i in cur_tile_list]),
            mask=np.hstack([ds1_nexus_tiles[i].data.mask.flatten()
                            for i in cur_tile_list]))
        # One latitude value per grid point, aligned with the flattened data,
        # used below as the cos-latitude weighting for the mean.
        lats_agg = np.hstack([np.repeat(ds1_nexus_tiles[i].latitudes,
                                        len(ds1_nexus_tiles[i].longitudes))
                              for i in cur_tile_list])

        if (len(tile_data_agg) == 0) or tile_data_agg.mask.all():
            continue

        data_min = np.ma.min(tile_data_agg)
        data_max = np.ma.max(tile_data_agg)
        # Area-weighted mean: each sample weighted by cos(latitude).
        daily_mean = np.ma.average(tile_data_agg,
                                   weights=np.cos(np.radians(lats_agg))).item()
        data_count = np.ma.count(tile_data_agg)
        data_std = np.ma.std(tile_data_agg)

        if normalize_dates:
            timeinseconds = utils.normalize_date(timeinseconds)

        # datetime.utcfromtimestamp is deprecated (Python 3.12+); the tz-aware
        # equivalent below yields the identical '+0000'-suffixed ISO string.
        stats_arr.append({
            'min': data_min,
            'max': data_max,
            'mean': daily_mean,
            'cnt': data_count,
            'std': data_std,
            'time': int(timeinseconds),
            'iso_time': datetime.fromtimestamp(int(timeinseconds),
                                               tz=timezone.utc).strftime(ISO_8601)
        })

    calculation_time = (datetime.now() - calculation_start).total_seconds()
    metrics_callback(calculation=calculation_time)
    return [stats_arr]