in analysis/webservice/algorithms_spark/CorrMapSpark.py [0:0]
def calc(self, computeOptions, **args):
self._setQueryParams(computeOptions.get_dataset(),
(float(computeOptions.get_min_lat()),
float(computeOptions.get_max_lat()),
float(computeOptions.get_min_lon()),
float(computeOptions.get_max_lon())),
computeOptions.get_start_time(),
computeOptions.get_end_time())
nparts_requested = computeOptions.get_nparts()
self.log.debug('ds = {0}'.format(self._ds))
if len(self._ds) != 2:
raise NexusProcessingException(
reason="Requires two datasets for comparison. Specify request parameter ds=Dataset_1,Dataset_2",
code=400)
if any('CLIM' in ds for ds in self._ds):
raise NexusProcessingException(reason="Cannot compute correlation on a climatology", code=400)
nexus_tiles = self._find_global_tile_set()
if len(nexus_tiles) == 0:
raise NoDataException(reason="No data found for selected timeframe")
self.log.debug('Found {0} tiles'.format(len(nexus_tiles)))
self.log.debug('Using Native resolution: lat_res={0}, lon_res={1}'.format(self._latRes, self._lonRes))
self.log.debug('nlats={0}, nlons={1}'.format(self._nlats, self._nlons))
daysinrange = self._get_tile_service().find_days_in_range_asc(self._minLat,
self._maxLat,
self._minLon,
self._maxLon,
self._ds[0],
self._startTime,
self._endTime)
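# Note: the day list is derived from the first dataset (self._ds[0]) only.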
ndays = len(daysinrange)
if ndays == 0:
raise NoDataException(reason="No data found for selected timeframe")
self.log.debug('Found {0} days in range'.format(ndays))
for i, d in enumerate(daysinrange):
self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
# Build one work-unit row per tile to pass to the Spark map function:
# [bounds, start_time, end_time, datasets].
nexus_tiles_spark = np.array([[self._find_tile_bounds(t),
self._startTime, self._endTime,
self._ds] for t in nexus_tiles], dtype='object')
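# A row might look like (hypothetical values):
#   [(-90.0, 90.0, -180.0, 180.0), 1420070400, 1422748800,
#    ['DATASET_A', 'DATASET_B']]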
# Remove empty tiles (their bounds are set to None). nexus_tiles_spark
# is a NumPy object array, so rows must be dropped with np.delete;
# del on an array element raises ValueError.
bad_tile_inds = np.where([t[0] is None for t in nexus_tiles_spark])[0]
nexus_tiles_spark = np.delete(nexus_tiles_spark, bad_tile_inds, axis=0)
# Expand Spark map tuple array by duplicating each entry N times,
# where N is the number of ways we want the time dimension carved up.
# Set the time boundaries for each of the Spark map tuples so that
# every Nth element in the array gets the same time bounds.
max_time_parts = 72  # cap on the number of time slices per tile
num_time_parts = min(max_time_parts, ndays)
spark_part_time_ranges = np.tile(
np.array([a[[0, -1]] for a in np.array_split(np.array(daysinrange), num_time_parts)]),
(len(nexus_tiles_spark), 1))
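# A minimal sketch of the split (hypothetical 10-day range, 3 parts):
#   >>> [a[[0, -1]] for a in np.array_split(np.arange(10), 3)]
#   [array([0, 3]), array([4, 6]), array([7, 9])]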
nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0)
nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
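# E.g. (hypothetical sizes): with 2 tiles and num_time_parts = 3,
# np.tile stacks the (3, 2) array of (start, end) day pairs to shape
# (6, 2), while np.repeat expands the 2 tile rows to 6 (each repeated
# 3 times), so row i of nexus_tiles_spark gets the time bounds in
# spark_part_time_ranges[i].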
# Launch Spark computations
spark_nparts = self._spark_nparts(nparts_requested)
self.log.info('Using {} partitions'.format(spark_nparts))
rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
sum_tiles_part = rdd.map(partial(self._map, self._tile_service_factory))
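# Each Spark task returns (bounds, (sum_x, sum_y, sum_xx, sum_yy,
# sum_xy, n)) partial sums for its tile/time slice; combineByKey below
# adds the six accumulators across slices that share the same bounds.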
# print "sum_tiles_part = ",sum_tiles_part.collect()
def _add_sums(x, y):
    # Element-wise sum of the six accumulator arrays:
    # (sum_x, sum_y, sum_xx, sum_yy, sum_xy, n).
    return (x[0] + y[0], x[1] + y[1], x[2] + y[2],
            x[3] + y[3], x[4] + y[4], x[5] + y[5])

sum_tiles = sum_tiles_part.combineByKey(lambda val: val,
                                        _add_sums,
                                        _add_sums)
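# Sketch of the merge on two hypothetical partial-sum tuples:
#   >>> _add_sums((1, 2, 3, 4, 5, 6), (10, 20, 30, 40, 50, 60))
#   (11, 22, 33, 44, 55, 66)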
# Convert the N (pixel-wise count) array for each tile to be a
# NumPy masked array. That is the last array in the tuple of
# intermediate summation arrays. Set mask to True if count is 0.
def _mask_counts(keyed_sums):
    # Unpack (bounds, (sum_x, sum_y, sum_xx, sum_yy, sum_xy, n)) and
    # mask out the pixels whose count n is zero.
    bounds, (sum_x, sum_y, sum_xx, sum_yy, sum_xy, n) = keyed_sums
    n = np.ma.array(n, mask=~(n.astype(bool)))
    return bounds, (sum_x, sum_y, sum_xx, sum_yy, sum_xy, n)

sum_tiles = sum_tiles.map(_mask_counts)
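# Sketch: for n = [[2, 0], [1, 3]], ~(n.astype(bool)) is
# [[False, True], [False, False]], so only the n == 0 pixel is masked.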
# For each pixel in each tile compute an array of Pearson
# correlation coefficients. The map function is called once
# per tile. The result of this map operation is a list of 3-tuples of
# (bounds, r, n) for each tile (r=Pearson correlation coefficient
# and n=number of input values that went into each pixel with
# any masked values not included).
def _pearson(keyed_sums):
    # Pixel-wise Pearson correlation coefficient from the accumulated
    # sums:
    #   r = (sum_xy - sum_x*sum_y/n)
    #       / sqrt((sum_xx - sum_x**2/n) * (sum_yy - sum_y**2/n))
    bounds, (sum_x, sum_y, sum_xx, sum_yy, sum_xy, n) = keyed_sums
    r = np.ma.array((sum_xy - sum_x * sum_y / n) /
                    np.sqrt((sum_xx - sum_x * sum_x / n) *
                            (sum_yy - sum_y * sum_y / n)),
                    mask=~(n.astype(bool)))
    return bounds, r, n

corr_tiles = sum_tiles.map(_pearson).collect()
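# This is the single-pass form of Pearson's r: dividing numerator and
# denominator by n recovers cov(x, y) / (std(x) * std(y)).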
r = np.zeros((self._nlats, self._nlons), dtype=np.float64, order='C')
n = np.zeros((self._nlats, self._nlons), dtype=np.uint32, order='C')
# The tiles below are NOT Nexus objects. They are tuples
# with the following for each correlation map subset:
# (1) lat-lon bounding box, (2) array of correlation r values,
# and (3) array of count n values.
for tile in corr_tiles:
((tile_min_lat, tile_max_lat, tile_min_lon, tile_max_lon),
tile_data, tile_cnt) = tile
y0 = self._lat2ind(tile_min_lat)
y1 = self._lat2ind(tile_max_lat)
x0 = self._lon2ind(tile_min_lon)
x1 = self._lon2ind(tile_max_lon)
self.log.debug(
'writing tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format(tile_min_lat, tile_max_lat,
tile_min_lon, tile_max_lon,
y0, y1, x0, x1))
r[y0:y1 + 1, x0:x1 + 1] = tile_data
n[y0:y1 + 1, x0:x1 + 1] = tile_cnt
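# _lat2ind/_lon2ind translate tile bounds into row/column offsets of
# the global grid, so each tile's r and n arrays land in their own
# [y0:y1 + 1, x0:x1 + 1] window.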
# Optionally store the global map in a NetCDF file (currently disabled):
# self._create_nc_file(r, 'corrmap.nc', 'r')
# Build the nested (lat x lon) list of per-pixel dicts for the JSON response
results = [[{'r': r[y, x], 'cnt': int(n[y, x]),
'lat': self._ind2lat(y), 'lon': self._ind2lon(x)}
for x in range(r.shape[1])] for y in range(r.shape[0])]
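# results[y][x] corresponds to grid cell (lat index y, lon index x).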
return CorrelationResults(results)