in analysis/webservice/algorithms_spark/MaximaMinimaSpark.py [0:0]
def calc(self, compute_options, **args):
    """
    Compute per-pixel maxima/minima statistics for a dataset over a bounding
    box and time range, distributing the work over Spark.

    Every non-empty tile is paired with each of up to 72 time sub-ranges, the
    pairs are mapped with ``self._map`` and the partial results are merged per
    key with element-wise max/min and a summed count. The merged tiles are
    then written into one global (lat x lon) grid.

    :param compute_options: StatsComputeOptions
    :param args: dict (not used by this method)
    :return: NexusResults whose ``results`` is a 2-D list (lat rows, lon
             columns) of dicts with keys 'maxima', 'minima',
             'absolute_maxima', 'absolute_minima', 'cnt', 'lat' and 'lon'
    :raises NoDataException: if no tiles or no days are found for the
             request, or if every tile found turns out to be empty
    """
    ds, bbox, start_time, end_time, nparts_requested = self.parse_arguments(compute_options)

    # bbox.bounds is indexed as (min_lon, min_lat, max_lon, max_lat).
    bounds = bbox.bounds
    min_lon, min_lat, max_lon, max_lat = bounds[0], bounds[1], bounds[2], bounds[3]

    self._setQueryParams(ds,
                         (float(min_lat),
                          float(max_lat),
                          float(min_lon),
                          float(max_lon)),
                         start_time,
                         end_time)

    # for single timestamp, get bounds of all tiles
    nexus_tiles = self._find_global_tile_set()
    # len() is used rather than truthiness: the container type returned by the
    # helper is not visible here.
    if len(nexus_tiles) == 0:
        raise NoDataException(reason="No data found for selected timeframe")
    self.log.debug('Found {0} tiles'.format(len(nexus_tiles)))

    daysinrange = self._tile_service.find_days_in_range_asc(min_lat,
                                                            max_lat,
                                                            min_lon,
                                                            max_lon,
                                                            ds,
                                                            start_time,
                                                            end_time)
    ndays = len(daysinrange)
    if ndays == 0:
        raise NoDataException(reason="No data found for selected timeframe")
    self.log.debug('Found {0} days in range'.format(ndays))
    for i, d in enumerate(daysinrange):
        self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))

    self.log.debug('Using Native resolution: lat_res={0}, lon_res={1}'.format(self._latRes, self._lonRes))
    self.log.debug('nlats={0}, nlons={1}'.format(self._nlats, self._nlons))
    self.log.debug('center lat range = {0} to {1}'.format(self._minLatCent,
                                                          self._maxLatCent))
    self.log.debug('center lon range = {0} to {1}'.format(self._minLonCent,
                                                          self._maxLonCent))

    # Build one (bounds, start, end, dataset) row per tile for the Spark map
    # function, skipping empty tiles (their bounds come back as None).
    tile_rows = []
    for t in nexus_tiles:
        tile_bounds = self._find_tile_bounds(t)
        if tile_bounds is not None:
            tile_rows.append((tile_bounds, self._startTime, self._endTime, self._ds))
    if not tile_rows:
        # Previously this case failed further down with an IndexError.
        raise NoDataException(reason="No non-empty tiles found for selected bounds and timeframe")
    n_tiles = len(tile_rows)

    # The rows mix a bounds object with scalars and a string, so letting NumPy
    # infer the array from the nested list is ragged-array creation (an error
    # on NumPy >= 1.24). Fill an explicit object array cell by cell instead so
    # each bounds object stays a single element.
    tile_array = np.empty((n_tiles, 4), dtype=object)
    for i, row in enumerate(tile_rows):
        for j, value in enumerate(row):
            tile_array[i, j] = value

    # Expand Spark map tuple array by duplicating each entry N times,
    # where N is the number of ways we want the time dimension carved up.
    # Set the time boundaries for each of the Spark map tuples so that
    # every Nth element in the array gets the same time bounds.
    max_time_parts = 72
    num_time_parts = min(max_time_parts, ndays)
    # num_time_parts <= ndays, so every chunk has at least one day and
    # chunk[[0, -1]] (first and last day of the chunk) is always valid.
    day_chunks = np.array_split(np.array(daysinrange), num_time_parts)
    chunk_ranges = np.array([chunk[[0, -1]] for chunk in day_chunks])
    spark_part_time_ranges = np.tile(chunk_ranges, (n_tiles, 1))
    nexus_tiles_spark = np.repeat(tile_array, num_time_parts, axis=0)
    nexus_tiles_spark[:, 1:3] = spark_part_time_ranges

    # Launch Spark computations
    spark_nparts = self._spark_nparts(nparts_requested)
    self.log.info('Using {} partitions'.format(spark_nparts))

    rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
    max_min_part = rdd.map(partial(self._map, self._tile_service_factory))

    def merge_stats(x, y):
        # Merge two partial results for the same key.
        return (np.maximum(x[0], y[0]),  # Max
                np.minimum(x[1], y[1]),  # Min
                np.maximum(x[2], y[2]),  # Absolute Max
                np.minimum(x[3], y[3]),  # Absolute Min
                (x[4] + y[4]))  # Count

    # The same merge serves both "add a value to a combiner" and "merge two
    # combiners" because the combiner is simply the first value seen.
    max_min_count = max_min_part.combineByKey(lambda val: val,
                                              merge_stats,
                                              merge_stats)

    # NOTE(review): presumably copied to a local so the closure sent to the
    # workers captures only this value and not self - confirm.
    fill = self._fill

    def to_pixel_stats(keyed_stats):
        # Turn (bounds, (max, min, abs_max, abs_min, cnt)) into
        # (bounds, 2-D list of per-pixel dicts); pixels with a zero count get
        # the fill value for every statistic.
        tile_bounds = keyed_stats[0]
        stats = keyed_stats[1]
        max_tile, min_tile = stats[0], stats[1]
        abs_max_tile, abs_min_tile = stats[2], stats[3]
        cnt_tile = stats[4]
        rows = []
        for y in range(max_tile.shape[0]):
            row = []
            for x in range(max_tile.shape[1]):
                has_data = cnt_tile[y, x] > 0
                row.append({'maxima': max_tile[y, x] if has_data else fill,
                            'minima': min_tile[y, x] if has_data else fill,
                            'absolute_maxima': abs_max_tile[y, x] if has_data else fill,
                            'absolute_minima': abs_min_tile[y, x] if has_data else fill,
                            'cnt': cnt_tile[y, x]})
            rows.append(row)
        return (tile_bounds, rows)

    avg_tiles = max_min_count.map(to_pixel_stats).collect()

    # Combine subset results to produce global map.
    #
    # The tiles below are NOT Nexus objects. They are tuples
    # with the per-pixel statistics and lat-lon bounding box.
    a = np.zeros((self._nlats, self._nlons, 4), dtype=np.float64, order='C')
    n = np.zeros((self._nlats, self._nlons), dtype=np.uint32, order='C')
    for tile in avg_tiles:
        if tile is None:
            continue
        ((tile_min_lat, tile_max_lat, tile_min_lon, tile_max_lon),
         tile_stats) = tile
        # Stack the four statistics along a trailing axis so the tile can be
        # written into the global array with one slice assignment.
        tile_data = np.ma.array(
            [[(px['maxima'], px['minima'], px['absolute_maxima'], px['absolute_minima'])
              for px in row]
             for row in tile_stats])
        tile_cnt = np.array([[px['cnt'] for px in row] for row in tile_stats])
        # A tile is worth writing when at least one pixel has a non-zero count.
        # This is tested on the 2-D count array directly; the former code put
        # a 2-D mask on the 3-D data array, which scrambled the mask layout
        # (the write decision itself is the same).
        has_valid_pixel = tile_cnt.astype(bool).any()
        y0 = self._lat2ind(tile_min_lat)
        y1 = y0 + tile_data.shape[0] - 1
        x0 = self._lon2ind(tile_min_lon)
        x1 = x0 + tile_data.shape[1] - 1
        if has_valid_pixel:
            self.log.debug(
                'writing tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format(tile_min_lat,
                                                                                          tile_max_lat,
                                                                                          tile_min_lon,
                                                                                          tile_max_lon, y0,
                                                                                          y1, x0, x1))
            a[y0:y1 + 1, x0:x1 + 1] = tile_data
            n[y0:y1 + 1, x0:x1 + 1] = tile_cnt
        else:
            self.log.debug(
                'All pixels masked in tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format(
                    tile_min_lat, tile_max_lat,
                    tile_min_lon, tile_max_lon,
                    y0, y1, x0, x1))

    # Create dict for JSON response
    results = [[{'maxima': a[y, x, 0], 'minima': a[y, x, 1], 'absolute_maxima': a[y, x, 2], 'absolute_minima': a[y, x, 3], 'cnt': int(n[y, x]),
                 'lat': self._ind2lat(y), 'lon': self._ind2lon(x)}
                for x in range(a.shape[1])] for y in range(a.shape[0])]

    return NexusResults(results=results, meta={}, stats=None,
                        computeOptions=None, minLat=min_lat,
                        maxLat=max_lat, minLon=min_lon,
                        maxLon=max_lon, ds=ds, startTime=start_time,
                        endTime=end_time)