# analysis/webservice/algorithms_spark/HofMoellerSpark.py
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
from io import BytesIO
from datetime import datetime
from functools import partial
import matplotlib.pyplot as plt
import mpld3
import numpy as np
import shapely.geometry
from matplotlib import cm
from matplotlib.ticker import FuncFormatter
from pytz import timezone
from webservice.NexusHandler import nexus_handler
from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
from webservice.algorithms_spark import utils
from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
ISO_8601 = '%Y-%m-%dT%H:%M:%SZ'
SENTINEL = 'STOP'
LATITUDE = 0
LONGITUDE = 1
class HofMoellerCalculator(object):
@staticmethod
def hofmoeller_stats(tile_service_factory, metrics_callback, tile_in_spark):
(latlon, tile_id, index,
min_lat, max_lat, min_lon, max_lon, dataset) = tile_in_spark
tile_service = tile_service_factory()
try:
# Load the dataset tile
tile = tile_service.find_tile_by_id(tile_id, metrics_callback=metrics_callback, ds=dataset)[0]
calculation_start = datetime.now()
# Mask it to the search domain
tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat,
min_lon, max_lon, [tile])[0]
        except IndexError:
            # No tile with this id, or the tile is masked entirely out of the
            # search domain; contribute no statistics
            return []
t = np.ma.min(tile.times)
stats = []
points = list(tile.nexus_point_generator())
        if latlon == LATITUDE:
# Latitude-Time Map (Average over longitudes)
data = sorted(points, key=lambda p: p.latitude)
points_by_coord = itertools.groupby(data, key=lambda p: p.latitude)
else:
# Longitude-Time Map (Average over latitudes)
data = sorted(points, key=lambda p: p.longitude)
points_by_coord = itertools.groupby(data, key=lambda p: p.longitude)
for coord, points_at_coord in points_by_coord:
values_at_coord = np.array([[p.data_vals,
np.cos(np.radians(p.latitude))]
for p in points_at_coord], dtype='object')
vals = np.nan_to_num(values_at_coord[:, 0])
weights = values_at_coord[:, 1]
coord_cnt = len(values_at_coord)
            if latlon == LATITUDE:
# Latitude-Time Map (Average over longitudes)
# In this case there is no weighting by cos(lat)
weighted_sum = np.sum(vals).item()
sum_of_weights = coord_cnt
else:
# Longitude-Time Map (Average over latitudes)
# In this case we need to weight by cos(lat)
weighted_sum = np.dot(vals, weights)
sum_of_weights = np.sum(weights).item()
stats.append(((t, float(coord)), (t, index, float(coord),
coord_cnt,
weighted_sum,
sum_of_weights,
np.max(vals).item(),
np.min(vals).item(),
np.var(vals).item())))
calculation_duration = (datetime.now() - calculation_start).total_seconds()
metrics_callback(calculation=calculation_duration)
return stats
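    # Worked example of the per-coordinate statistics above (a sketch with
    # made-up numbers): for a longitude-time map, three values [1.0, 2.0, 3.0]
    # observed at one longitude at latitudes [0, 60, 60] degrees get weights
    # [cos(0), cos(60 deg), cos(60 deg)] = [1.0, 0.5, 0.5], so
    # weighted_sum = 1.0*1.0 + 2.0*0.5 + 3.0*0.5 = 3.5 and
    # sum_of_weights = 2.0, giving a weighted mean of 3.5 / 2.0 = 1.75.
    # For a latitude-time map every point in a group shares one latitude, so
    # the plain mean (sum / count) is used instead.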
class BaseHoffMoellerSparkHandlerImpl(NexusCalcSparkHandler):
params = {
"ds": {
"name": "Dataset",
"type": "comma-delimited string",
"description": "The dataset(s) Used to generate the plot. Required"
},
"startTime": {
"name": "Start Time",
"type": "string",
"description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
},
"endTime": {
"name": "End Time",
"type": "string",
"description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
},
"b": {
"name": "Bounding box",
"type": "comma-delimited float",
"description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
"Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
},
"spark": {
"name": "Spark Configuration",
"type": "comma-delimited value",
"description": "Configuration used to launch in the Spark cluster. Value should be 3 elements separated by "
"commas. 1) Spark Master 2) Number of Spark Executors 3) Number of Spark Partitions. Only "
"Number of Spark Partitions is used by this function. Optional (Default: local,1,1)"
}
}
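    # Illustrative request only (a sketch): the dataset name below is a
    # placeholder, not a real catalog entry. Handlers registered from this
    # module expose the parameters above over HTTP, e.g.:
    #
    #   /latitudeTimeHofMoellerSpark?ds=MY_SST_DATASET
    #       &startTime=2017-01-01T00:00:00Z&endTime=2017-12-31T23:59:59Z
    #       &b=-150,20,-110,50&spark=local,1,8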
def parse_arguments(self, request):
# Parse input arguments
self.log.debug("Parsing arguments")
try:
ds = request.get_dataset()
            if isinstance(ds, (list, tuple)):
ds = next(iter(ds))
        except Exception:
raise NexusProcessingException(
reason="'ds' argument is required. Must be a string",
code=400)
        # Do not allow time series on climatology datasets
        if 'CLIM' in ds:
            raise NexusProcessingException(
                reason="Cannot compute a Hovmoeller plot on a climatology dataset", code=400)
try:
bounding_polygon = request.get_bounding_polygon()
request.get_min_lon = lambda: bounding_polygon.bounds[0]
request.get_min_lat = lambda: bounding_polygon.bounds[1]
request.get_max_lon = lambda: bounding_polygon.bounds[2]
request.get_max_lat = lambda: bounding_polygon.bounds[3]
        except Exception:
try:
west, south, east, north = request.get_min_lon(), request.get_min_lat(), \
request.get_max_lon(), request.get_max_lat()
bounding_polygon = shapely.geometry.Polygon(
[(west, south), (east, south), (east, north), (west, north), (west, south)])
            except Exception:
raise NexusProcessingException(
reason="'b' argument is required. Must be comma-delimited float formatted as "
"Minimum (Western) Longitude, Minimum (Southern) Latitude, "
"Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
code=400)
try:
start_time = request.get_start_datetime()
        except Exception:
raise NexusProcessingException(
reason="'startTime' argument is required. Can be int value seconds from epoch or "
"string format YYYY-MM-DDTHH:mm:ssZ",
code=400)
try:
end_time = request.get_end_datetime()
        except Exception:
raise NexusProcessingException(
reason="'endTime' argument is required. Can be int value seconds from epoch or "
"string format YYYY-MM-DDTHH:mm:ssZ",
code=400)
if start_time > end_time:
raise NexusProcessingException(
reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
code=400)
start_seconds_from_epoch = int((start_time - EPOCH).total_seconds())
end_seconds_from_epoch = int((end_time - EPOCH).total_seconds())
normalize_dates = request.get_normalize_dates()
min_elevation, max_elevation = request.get_elevation_args()
        # Compare explicitly against None so that an elevation of 0 is not
        # treated as "unset"
        if min_elevation is not None and max_elevation is not None and min_elevation > max_elevation:
raise NexusProcessingException(
reason='Min elevation must be less than or equal to max elevation',
code=400
)
return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, normalize_dates, min_elevation, max_elevation
def applyDeseasonToHofMoellerByField(self, results, pivot="lats", field="mean", append=True):
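        """
        Remove the mean seasonal cycle from one statistic (``field``) in a
        monthly Hovmoeller result list: for each of the 12 calendar-month
        buckets (result indices a, a+12, a+24, ...), average the per-time-step
        means and store each entry's anomaly under a new
        ``"<field>Seasonal"`` key. Series of 12 or fewer time steps are
        returned unchanged. Note the index-modulo-12 bucketing assumes
        ``results`` holds consecutive monthly time steps; the ``append``
        argument is currently unused.
        """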
        if len(results) <= 12:
            return results
for a in range(0, 12):
values = []
for b in range(a, len(results), 12):
values.append(np.average([l[field] for l in results[b][pivot]]))
avg = np.average(values)
for b in range(a, len(results), 12):
for l in results[b][pivot]:
l["%sSeasonal" % field] = l[field] - avg
return results
def applyDeseasonToHofMoeller(self, results, pivot="lats", append=True):
results = self.applyDeseasonToHofMoellerByField(results, pivot, field="mean", append=append)
results = self.applyDeseasonToHofMoellerByField(results, pivot, field="min", append=append)
results = self.applyDeseasonToHofMoellerByField(results, pivot, field="max", append=append)
return results
def determine_parallelism(num_tiles):
    """
    Try to stay at a maximum of 1500 tiles per partition, but don't go over
    128 partitions. Also, don't go below the default of 8.
    """
    num_partitions = max(min(num_tiles // 1500, 128), 8)
    return num_partitions
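# Illustrative values for the clamping above (a sketch):
#   determine_parallelism(1000)   ->   8  (floor of 0 raised to the minimum)
#   determine_parallelism(30000)  ->  20  (one partition per ~1500 tiles)
#   determine_parallelism(300000) -> 128  (capped at the maximum)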
def parallel_variance(avg_a, count_a, var_a, avg_b, count_b, var_b):
# Thanks Wikipedia https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
delta = avg_b - avg_a
m_a = var_a * (count_a - 1)
m_b = var_b * (count_b - 1)
M2 = m_a + m_b + delta ** 2 * count_a * count_b / (count_a + count_b)
return M2 / (count_a + count_b - 1)
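# Sanity check for the combination above (a sketch; values are illustrative):
# group A = [1, 2, 3] has mean 2.0, count 3, sample variance 1.0, and
# group B = [3, 4, 5] has mean 4.0, count 3, sample variance 1.0, so
#   parallel_variance(2.0, 3, 1.0, 4.0, 3, 1.0) == 2.0
# which matches np.var([1, 2, 3, 3, 4, 5], ddof=1). Note the formula treats
# its inputs as *sample* variances (it rebuilds M2 as var * (count - 1)).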
def hof_tuple_time(t):
return t[0]
def hof_tuple_combine(t1, t2):
return (t1[0], # Time
t1[1], # Sequence (index)
t1[2], # Coordinate on axis (latitude or longitude)
t1[3] + t2[3], # Number of values
t1[4] + t2[4], # Sum of values (weighted for lon-time maps)
t1[5] + t2[5], # Sum of weights (= # of values for lat-time maps)
max(t1[6], t2[6]), # Maximum value
min(t1[7], t2[7]), # Minimum value
parallel_variance(t1[4] / t1[5], t1[3], t1[8],
t2[4] / t2[5], t2[3], t2[8])) # Variance
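# Example of combining two stats tuples (a sketch with made-up numbers), both
# for time 0 and latitude 45.0 on a latitude-time map (weights == counts):
#   t1 = (0, 0, 45.0, 3, 6.0, 3, 3.0, 1.0, 1.0)   # values [1, 2, 3]
#   t2 = (0, 1, 45.0, 3, 12.0, 3, 5.0, 3.0, 1.0)  # values [3, 4, 5]
#   hof_tuple_combine(t1, t2) == (0, 0, 45.0, 6, 18.0, 6, 5.0, 1.0, 2.0)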
def hof_tuple_to_dict(t, avg_var_name):
return {avg_var_name: t[2],
'cnt': t[3],
'mean': t[4] / t[5],
'std': np.sqrt(t[8]),
'max': t[6],
'min': t[7]}
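# Continuing the sketch above, the combined tuple becomes one entry of the
# 'lats' (or 'lons') collection in the final result:
#   hof_tuple_to_dict((0, 0, 45.0, 6, 18.0, 6, 5.0, 1.0, 2.0), 'latitude')
#   == {'latitude': 45.0, 'cnt': 6, 'mean': 3.0, 'std': 1.4142135623730951,
#       'max': 5.0, 'min': 1.0}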
def spark_driver(sc, latlon, tile_service_factory, nexus_tiles_spark, metrics_callback, normalize_dates):
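    """
    Compute Hovmoeller statistics in two reductions: (1) per-tile stats keyed
    by (time, coordinate) are combined across tiles with hof_tuple_combine,
    then (2) the combined tuples are re-keyed by time alone and merged into
    one dict per time step.
    """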
    # Parallelize list of tile ids
    rdd = sc.parallelize(nexus_tiles_spark, determine_parallelism(len(nexus_tiles_spark)))
    if latlon == LATITUDE:
# Latitude-Time Map (Average over longitudes)
avg_var_name = 'latitude'
avg_var_name_collection = 'lats'
else:
# Longitude-Time Map (Average over latitudes)
avg_var_name = 'longitude'
avg_var_name_collection = 'lons'
# Create a set of key-value pairs where the key is (time, lat|lon) and
# the value is a tuple of intermediate statistics for the specified
# coordinate within a single NEXUS tile.
metrics_callback(partitions=rdd.getNumPartitions())
results = rdd.flatMap(partial(HofMoellerCalculator.hofmoeller_stats, tile_service_factory, metrics_callback))
# Combine tuples across tiles with input key = (time, lat|lon)
# Output a key value pair with key = (time)
reduce_start = datetime.now()
    results = results.combineByKey(
        lambda val: (hof_tuple_time(val), val),
        lambda x, val: (hof_tuple_time(x), hof_tuple_combine(x[1], val)),
        lambda x, y: (hof_tuple_time(x), hof_tuple_combine(x[1], y[1])))
# The functions create_combiner, merge_value, and merge_combiner are arguments to RDD.combineByKey()
def create_combiner(val):
time_in_seconds = val[0]
if normalize_dates:
time_in_seconds = utils.normalize_date(val[0])
return {
'sequence': val[1],
'time': time_in_seconds,
'iso_time': datetime.utcfromtimestamp(time_in_seconds).strftime(ISO_8601),
avg_var_name_collection: [hof_tuple_to_dict(val, avg_var_name)]
}
def merge_value(x, val):
return {
'sequence': x['sequence'],
'time': x['time'],
'iso_time': x['iso_time'],
avg_var_name_collection: (x[avg_var_name_collection] + [hof_tuple_to_dict(val, avg_var_name)])
}
def merge_combiner(x, y):
return {
'sequence': x['sequence'],
'time': x['time'],
'iso_time': x['iso_time'],
avg_var_name_collection: (x[avg_var_name_collection] + y[avg_var_name_collection])
}
# Convert the tuples to dictionary entries and combine coordinates
# with the same time stamp. Here we have input key = (time)
results = results.values().combineByKey(create_combiner, merge_value, merge_combiner).values().collect()
reduce_duration = (datetime.now() - reduce_start).total_seconds()
metrics_callback(reduce=reduce_duration)
return results
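# Each element of the list returned above has this shape (a sketch; the
# 'lats' key becomes 'lons' for longitude-time maps):
#   {'sequence': 0,
#    'time': 1483228800,
#    'iso_time': '2017-01-01T00:00:00Z',
#    'lats': [{'latitude': 45.0, 'cnt': 6, 'mean': 3.0, 'std': 1.41,
#              'max': 5.0, 'min': 1.0}, ...]}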
@nexus_handler
class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
name = "Latitude/Time HofMoeller Spark"
path = "/latitudeTimeHofMoellerSpark"
description = "Computes a latitude/time HofMoeller plot given an arbitrary geographical area and time range"
params = BaseHoffMoellerSparkHandlerImpl.params
singleton = True
def __init__(self, **kwargs):
self._latlon = 0 # 0 for latitude-time map, 1 for longitude-time map
BaseHoffMoellerSparkHandlerImpl.__init__(self, **kwargs)
def calc(self, compute_options, **args):
ds, bbox, start_time, end_time, normalize_dates, min_elevation, max_elevation = self.parse_arguments(compute_options)
metrics_record = self._create_metrics_record()
calculation_start = datetime.now()
min_lon, min_lat, max_lon, max_lat = bbox.bounds
nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon, tile.dataset) for x, tile in
enumerate(self._get_tile_service().find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
ds, start_time, end_time, min_elevation=min_elevation,
max_elevation=max_elevation,
metrics_callback=metrics_record.record_metrics,
fetch_data=False))]
        print("Got {} tiles".format(len(nexus_tiles_spark)))
if len(nexus_tiles_spark) == 0:
raise NoDataException(reason="No data found for selected timeframe")
results = spark_driver(self._sc,
self._latlon,
self._tile_service_factory,
nexus_tiles_spark,
metrics_record.record_metrics,
normalize_dates)
        results = [result for result in results if result]  # drop empty results
results = sorted(results, key=lambda entry: entry['time'])
for i in range(len(results)):
results[i]['lats'] = sorted(results[i]['lats'],
key=lambda entry: entry['latitude'])
# Deseason disabled. See SDAP-148
# results = self.applyDeseasonToHofMoeller(results)
result = HoffMoellerResults(results=results, compute_options=None, type=HoffMoellerResults.LATITUDE,
minLat=min_lat, maxLat=max_lat, minLon=min_lon,
maxLon=max_lon, ds=ds, startTime=start_time, endTime=end_time)
duration = (datetime.now() - calculation_start).total_seconds()
metrics_record.record_metrics(actual_time=duration)
metrics_record.print_metrics(self.log)
return result
@nexus_handler
class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
name = "Longitude/Time HofMoeller Spark"
path = "/longitudeTimeHofMoellerSpark"
description = "Computes a longitude/time HofMoeller plot given an arbitrary geographical area and time range"
params = BaseHoffMoellerSparkHandlerImpl.params
singleton = True
def __init__(self, **kwargs):
self._latlon = 1 # 0 for latitude-time map; 1 for longitude-time map
BaseHoffMoellerSparkHandlerImpl.__init__(self, **kwargs)
def calc(self, compute_options, **args):
ds, bbox, start_time, end_time, normalize_dates, min_elevation, max_elevation = self.parse_arguments(compute_options)
metrics_record = self._create_metrics_record()
calculation_start = datetime.now()
min_lon, min_lat, max_lon, max_lat = bbox.bounds
nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon, tile.dataset) for x, tile in
enumerate(self._get_tile_service().find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
ds, start_time, end_time, min_elevation=min_elevation,
max_elevation=max_elevation,
metrics_callback=metrics_record.record_metrics,
fetch_data=False))]
        print("Got {} tiles".format(len(nexus_tiles_spark)))
if len(nexus_tiles_spark) == 0:
raise NoDataException(reason="No data found for selected timeframe")
results = spark_driver(self._sc,
self._latlon,
self._tile_service_factory,
nexus_tiles_spark,
metrics_record.record_metrics,
normalize_dates)
        results = [result for result in results if result]  # drop empty results
results = sorted(results, key=lambda entry: entry["time"])
for i in range(len(results)):
results[i]['lons'] = sorted(results[i]['lons'],
key=lambda entry: entry['longitude'])
# Deseason disabled. See SDAP-148
# results = self.applyDeseasonToHofMoeller(results, pivot="lons")
result = HoffMoellerResults(results=results, compute_options=None, type=HoffMoellerResults.LONGITUDE,
minLat=min_lat, maxLat=max_lat, minLon=min_lon,
maxLon=max_lon, ds=ds, startTime=start_time, endTime=end_time)
duration = (datetime.now() - calculation_start).total_seconds()
metrics_record.record_metrics(actual_time=duration)
metrics_record.print_metrics(self.log)
return result
class HoffMoellerResults(NexusResults):
LATITUDE = 0
LONGITUDE = 1
def __init__(self, results=None, meta=None, stats=None, **kwargs):
NexusResults.__init__(self, results=results, meta=meta, stats=stats, computeOptions=None, **kwargs)
self.__type = kwargs['type']
def createHoffmueller(self, data, coordSeries, timeSeries, coordName, title, interpolate='nearest'):
cmap = cm.coolwarm
# ls = LightSource(315, 45)
# rgb = ls.shade(data, cmap)
fig, ax = plt.subplots()
fig.set_size_inches(11.0, 8.5)
cax = ax.imshow(data, interpolation=interpolate, cmap=cmap)
def yFormatter(y, pos):
if y < len(coordSeries):
return "%s $^\circ$" % (int(coordSeries[int(y)] * 100.0) / 100.)
else:
return ""
def xFormatter(x, pos):
if x < len(timeSeries):
return timeSeries[int(x)].strftime('%b %Y')
else:
return ""
ax.xaxis.set_major_formatter(FuncFormatter(xFormatter))
ax.yaxis.set_major_formatter(FuncFormatter(yFormatter))
ax.set_title(title)
ax.set_ylabel(coordName)
ax.set_xlabel('Date')
fig.colorbar(cax)
fig.autofmt_xdate()
labels = ['point {0}'.format(i + 1) for i in range(len(data))]
# plugins.connect(fig, plugins.MousePosition(fontsize=14))
        tooltip = mpld3.plugins.PointLabelTooltip(cax, labels=labels)
        # PNG is binary output, so a bytes buffer is required here (a text
        # StringIO would raise a TypeError under Python 3)
        sio = BytesIO()
        plt.savefig(sio, format='png')
        return sio.getvalue()
def createLongitudeHoffmueller(self, res, meta):
lonSeries = [m['longitude'] for m in res[0]['lons']]
        # 'time' is stored as integer seconds since the epoch (see spark_driver)
        timeSeries = [datetime.fromtimestamp(m['time']) for m in res]
data = np.zeros((len(lonSeries), len(timeSeries)))
        compute_options = self.computeOptions()
        plotSeries = compute_options.get_plot_series(default="avg") if compute_options is not None else None
        if plotSeries is None:
            plotSeries = "avg"
        for t in range(0, len(timeSeries)):
            timeSet = res[t]
            for l in range(0, len(lonSeries)):
                lonSet = timeSet['lons'][l]
                value = lonSet[plotSeries]
                data[len(lonSeries) - l - 1][t] = value
title = meta['title']
source = meta['source']
dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
return self.createHoffmueller(data, lonSeries, timeSeries, "Longitude",
"%s\n%s\n%s" % (title, source, dateRange), interpolate='nearest')
def createLatitudeHoffmueller(self, res, meta):
latSeries = [m['latitude'] for m in res[0]['lats']]
        # 'time' is stored as integer seconds since the epoch (see spark_driver)
        timeSeries = [datetime.fromtimestamp(m['time']) for m in res]
data = np.zeros((len(latSeries), len(timeSeries)))
        compute_options = self.computeOptions()
        plotSeries = compute_options.get_plot_series(default="avg") if compute_options is not None else None
        if plotSeries is None:
            plotSeries = "avg"
for t in range(0, len(timeSeries)):
timeSet = res[t]
for l in range(0, len(latSeries)):
latSet = timeSet['lats'][l]
value = latSet[plotSeries]
data[len(latSeries) - l - 1][t] = value
title = meta['title']
source = meta['source']
dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
return self.createHoffmueller(data, latSeries, timeSeries, "Latitude",
title="%s\n%s\n%s" % (title, source, dateRange), interpolate='nearest')
def toImage(self):
res = self.results()
meta = self.meta()
if self.__type == HoffMoellerResults.LATITUDE:
return self.createLatitudeHoffmueller(res, meta)
elif self.__type == HoffMoellerResults.LONGITUDE:
return self.createLongitudeHoffmueller(res, meta)
else:
raise Exception("Unsupported HoffMoeller Plot Type")