analysis/webservice/algorithms/DailyDifferenceAverage.py (170 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import traceback
from datetime import datetime, timedelta
from multiprocessing.dummy import Pool, Manager
import numpy as np
import pytz
from nexustiles.nexustiles import NexusTileService, NexusTileServiceException
from shapely.geometry import box
from webservice.NexusHandler import nexus_handler
from webservice.algorithms.NexusCalcHandler import NexusCalcHandler
from webservice.webmodel import NexusResults, NexusProcessingException
SENTINEL = 'STOP'
@nexus_handler
class DailyDifferenceAverageImpl(NexusCalcHandler):
name = "Daily Difference Average"
path = "/dailydifferenceaverage"
description = "Subtracts data in box in Dataset 1 from Dataset 2, then averages the difference per day."
params = {
"ds1": {
"name": "Dataset 1",
"type": "string",
"description": "The first Dataset shortname to use in calculation"
},
"ds2": {
"name": "Dataset 2",
"type": "string",
"description": "The second Dataset shortname to use in calculation"
},
"minLon": {
"name": "Minimum Longitude",
"type": "float",
"description": "Minimum (Western) bounding box Longitude"
},
"minLat": {
"name": "Minimum Latitude",
"type": "float",
"description": "Minimum (Southern) bounding box Latitude"
},
"maxLon": {
"name": "Maximum Longitude",
"type": "float",
"description": "Maximum (Eastern) bounding box Longitude"
},
"maxLat": {
"name": "Maximum Latitude",
"type": "float",
"description": "Maximum (Northern) bounding box Latitude"
},
"startTime": {
"name": "Start Time",
"type": "long integer",
"description": "Starting time in milliseconds since midnight Jan. 1st, 1970 UTC"
},
"endTime": {
"name": "End Time",
"type": "long integer",
"description": "Ending time in milliseconds since midnight Jan. 1st, 1970 UTC"
}
}
singleton = True
def calc(self, request, **args):
min_lat, max_lat, min_lon, max_lon = request.get_min_lat(), request.get_max_lat(), request.get_min_lon(), request.get_max_lon()
dataset1 = request.get_argument("ds1", None)
dataset2 = request.get_argument("ds2", None)
start_time = request.get_start_time()
end_time = request.get_end_time()
simple = request.get_argument("simple", None) is not None
averagebyday = self.get_daily_difference_average_for_box(min_lat, max_lat, min_lon, max_lon, dataset1, dataset2,
start_time, end_time)
averagebyday = sorted(averagebyday, key=lambda dayavg: dayavg[0])
if simple:
import matplotlib.pyplot as plt
from matplotlib.dates import date2num
times = [date2num(self.date_from_ms(dayavg[0])) for dayavg in averagebyday]
means = [dayavg[1] for dayavg in averagebyday]
plt.plot_date(times, means, ls='solid')
plt.xlabel('Date')
plt.xticks(rotation=70)
plt.ylabel('Difference from 5-Day mean (\u00B0C)')
plt.title('Sea Surface Temperature (SST) Anomalies')
plt.grid(True)
plt.tight_layout()
plt.savefig("test.png")
return averagebyday, None, None
else:
result = NexusResults(
results=[[{'time': dayms, 'mean': avg, 'ds': 0}] for dayms, avg in averagebyday],
stats={},
meta=self.get_meta())
result.extendMeta(min_lat, max_lat, min_lon, max_lon, "", start_time, end_time)
result.meta()['label'] = 'Difference from 5-Day mean (\u00B0C)'
return result
def date_from_ms(self, dayms):
base_datetime = datetime(1970, 1, 1)
delta = timedelta(0, 0, 0, dayms)
return base_datetime + delta
def get_meta(self):
meta = {
"title": "Sea Surface Temperature (SST) Anomalies",
"description": "SST anomolies are departures from the 5-day pixel mean",
"units": '\u00B0C',
}
return meta
def get_daily_difference_average_for_box(self, min_lat, max_lat, min_lon, max_lon, dataset1, dataset2,
start_time,
end_time):
daysinrange = self._get_tile_service().find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset1,
start_time, end_time)
maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses"))
if maxprocesses == 1:
calculator = DailyDifferenceAverageCalculator()
averagebyday = []
for dayinseconds in daysinrange:
result = calculator.calc_average_diff_on_day(min_lat, max_lat, min_lon, max_lon, dataset1, dataset2,
dayinseconds)
averagebyday.append((result[0], result[1]))
else:
# Create a task to calc average difference for each day
manager = Manager()
work_queue = manager.Queue()
done_queue = manager.Queue()
for dayinseconds in daysinrange:
work_queue.put(
('calc_average_diff_on_day', min_lat, max_lat, min_lon, max_lon, dataset1, dataset2, dayinseconds))
[work_queue.put(SENTINEL) for _ in range(0, maxprocesses)]
# Start new processes to handle the work
pool = Pool(maxprocesses)
[pool.apply_async(pool_worker, (work_queue, done_queue)) for _ in range(0, maxprocesses)]
pool.close()
# Collect the results as [(day (in ms), average difference for that day)]
averagebyday = []
for i in range(0, len(daysinrange)):
result = done_queue.get()
if result[0] == 'error':
print(result[1], file=sys.stderr)
raise NexusProcessingException(reason="Error calculating average by day.")
rdata = result
averagebyday.append((rdata[0], rdata[1]))
pool.terminate()
manager.shutdown()
return averagebyday
class DailyDifferenceAverageCalculator(object):
def __init__(self):
self.__tile_service = NexusTileService()
def calc_average_diff_on_day(self, min_lat, max_lat, min_lon, max_lon, dataset1, dataset2, timeinseconds):
day_of_year = datetime.fromtimestamp(timeinseconds, pytz.utc).timetuple().tm_yday
ds1_nexus_tiles = self.__tile_service.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon,
dataset1,
timeinseconds)
# Initialize list of differences
differences = []
# For each ds1tile
for ds1_tile in ds1_nexus_tiles:
# Get tile for ds2 using bbox from ds1_tile and day ms
try:
ds2_tile = self.__tile_service.find_tile_by_polygon_and_most_recent_day_of_year(
box(ds1_tile.bbox.min_lon, ds1_tile.bbox.min_lat, ds1_tile.bbox.max_lon, ds1_tile.bbox.max_lat),
dataset2, day_of_year)[0]
# Subtract ds2 tile from ds1 tile
diff = np.subtract(ds1_tile.data, ds2_tile.data)
except NexusTileServiceException:
# This happens when there is data in ds1tile but all NaNs in ds2tile because the
# Solr query being used filters out results where stats_count = 0.
# Technically, this should never happen if ds2 is a climatology generated in part from ds1
# and it is probably a data error
# For now, just treat ds2 as an array of all masked data (which essentially discards the ds1 data)
ds2_tile = np.ma.masked_all(ds1_tile.data.shape)
diff = np.subtract(ds1_tile.data, ds2_tile)
# Put results in list of differences
differences.append(np.ma.array(diff).ravel())
# Average List of differences
diffaverage = np.ma.mean(differences).item()
# Return Average by day
return int(timeinseconds), diffaverage
def pool_worker(work_queue, done_queue):
try:
calculator = DailyDifferenceAverageCalculator()
for work in iter(work_queue.get, SENTINEL):
scifunction = work[0]
args = work[1:]
result = calculator.__getattribute__(scifunction)(*args)
done_queue.put(result)
except Exception as e:
e_str = traceback.format_exc(e)
done_queue.put(('error', e_str))