analysis/webservice/algorithms/TimeAvgMap.py (214 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # distutils: include_dirs = /usr/local/lib/python2.7/site-packages/cassandra import sys import numpy as np from time import time from webservice.NexusHandler import DEFAULT_PARAMETERS_SPEC from webservice.algorithms.NexusCalcHandler import NexusCalcHandler from webservice.webmodel import NexusResults, NoDataException from netCDF4 import Dataset # from mpl_toolkits.basemap import Basemap # @nexus_handler class TimeAvgMapCalcHandlerImpl(NexusCalcHandler): name = "Time Average Map" path = "/timeAvgMap" description = "Computes a Latitude/Longitude Time Average plot given an arbitrary geographical area and time range" params = DEFAULT_PARAMETERS_SPEC singleton = True def _find_native_resolution(self): # Get a quick set of tiles (1 degree at center of box) at 1 time stamp midLat = (self._minLat + self._maxLat) / 2 midLon = (self._minLon + self._maxLon) / 2 ntiles = 0 t = self._endTime t_incr = 86400 while ntiles == 0: nexus_tiles = self._get_tile_service().get_tiles_bounded_by_box(midLat - 0.5, midLat + 0.5, midLon - 0.5, midLon + 0.5, ds=self._ds, start_time=t - t_incr, end_time=t) ntiles = len(nexus_tiles) print('find_native_res: got %d tiles' % len(nexus_tiles)) sys.stdout.flush() lat_res = 0. lon_res = 0. if ntiles > 0: for tile in nexus_tiles: if lat_res < 1e-10: lats = tile.latitudes.compressed() if (len(lats) > 1): lat_res = lats[1] - lats[0] if lon_res < 1e-10: lons = tile.longitudes.compressed() if (len(lons) > 1): lon_res = lons[1] - lons[0] if (lat_res >= 1e-10) and (lon_res >= 1e-10): break if (lat_res < 1e-10) or (lon_res < 1e-10): t -= t_incr self._latRes = lat_res self._lonRes = lon_res def _find_global_tile_set(self): ntiles = 0 t = self._endTime t_incr = 86400 while ntiles == 0: nexus_tiles = self._get_tile_service().get_tiles_bounded_by_box(self._minLat, self._maxLat, self._minLon, self._maxLon, ds=self._ds, start_time=t - t_incr, end_time=t) ntiles = len(nexus_tiles) print('find_global_tile_set got %d tiles' % ntiles) sys.stdout.flush() t -= t_incr return nexus_tiles def _prune_tiles(self, nexus_tiles): del_ind = np.where([np.all(tile.data.mask) for tile in nexus_tiles])[0] for i in np.flipud(del_ind): del nexus_tiles[i] # @staticmethod # def _map(tile_in): def _map(self, tile_in): print('Started tile %s' % tile_in.section_spec) print('tile lats = ', tile_in.latitudes) print('tile lons = ', tile_in.longitudes) print('tile = ', tile_in.data) sys.stdout.flush() lats = tile_in.latitudes lons = tile_in.longitudes if len(lats > 0) and len(lons > 0): min_lat = np.ma.min(lats) max_lat = np.ma.max(lats) min_lon = np.ma.min(lons) max_lon = np.ma.max(lons) good_inds_lat = np.where(lats.mask == False) good_inds_lon = np.where(lons.mask == False) min_y = np.min(good_inds_lat) max_y = np.max(good_inds_lat) min_x = np.min(good_inds_lon) max_x = np.max(good_inds_lon) tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1) days_at_a_time = 90 t_incr = 86400 * days_at_a_time avg_tile = np.ma.array(np.zeros(tile_inbounds_shape, dtype=np.float64)) cnt_tile = np.ma.array(np.zeros(tile_inbounds_shape, dtype=np.uint32)) t_start = self._startTime while t_start <= self._endTime: t_end = min(t_start + t_incr, self._endTime) t1 = time() print('nexus call start at time %f' % t1) sys.stdout.flush() nexus_tiles = self._get_tile_service().get_tiles_bounded_by_box(min_lat - self._latRes / 2, max_lat + self._latRes / 2, min_lon - self._lonRes / 2, max_lon + self._lonRes / 2, ds=self._ds, start_time=t_start, end_time=t_end) t2 = time() print('nexus call end at time %f' % t2) print('secs in nexus call: ', t2 - t1) sys.stdout.flush() self._prune_tiles(nexus_tiles) print('t %d to %d - Got %d tiles' % (t_start, t_end, len(nexus_tiles))) sys.stdout.flush() for tile in nexus_tiles: tile.data.data[:, :] = np.nan_to_num(tile.data.data) avg_tile.data[:, :] += tile.data[0, min_y:max_y + 1, min_x:max_x + 1] cnt_tile.data[:, :] += (~tile.data.mask[0, min_y:max_y + 1, min_x:max_x + 1]).astype(np.uint8) t_start = t_end + 1 print('cnt_tile = ', cnt_tile) cnt_tile.mask = ~(cnt_tile.data.astype(bool)) avg_tile.mask = cnt_tile.mask avg_tile /= cnt_tile print('Finished tile %s' % tile_in.section_spec) print('Tile avg = ', avg_tile) sys.stdout.flush() else: avg_tile = None min_lat = None max_lat = None min_lon = None max_lon = None print('Tile %s outside of bounding box' % tile_in.section_spec) sys.stdout.flush() return (avg_tile, min_lat, max_lat, min_lon, max_lon) def _lat2ind(self, lat): return int((lat - self._minLatCent) / self._latRes) def _lon2ind(self, lon): return int((lon - self._minLonCent) / self._lonRes) def _create_nc_file(self, a): print('a=', a) print('shape a = ', a.shape) sys.stdout.flush() lat_dim, lon_dim = a.shape rootgrp = Dataset("tam.nc", "w", format="NETCDF4") rootgrp.createDimension("lat", lat_dim) rootgrp.createDimension("lon", lon_dim) rootgrp.createVariable("TRMM_3B42_daily_precipitation_V7", "f4", dimensions=("lat", "lon",)) rootgrp.createVariable("lat", "f4", dimensions=("lat",)) rootgrp.createVariable("lon", "f4", dimensions=("lon",)) rootgrp.variables["TRMM_3B42_daily_precipitation_V7"][:, :] = a rootgrp.variables["lat"][:] = np.linspace(self._minLatCent, self._maxLatCent, lat_dim) rootgrp.variables["lon"][:] = np.linspace(self._minLonCent, self._maxLonCent, lon_dim) rootgrp.close() def calc(self, computeOptions, **args): """ :param computeOptions: StatsComputeOptions :param args: dict :return: """ self._minLat = float(computeOptions.get_min_lat()) self._maxLat = float(computeOptions.get_max_lat()) self._minLon = float(computeOptions.get_min_lon()) self._maxLon = float(computeOptions.get_max_lon()) self._ds = computeOptions.get_dataset()[0] self._startTime = computeOptions.get_start_time() self._endTime = computeOptions.get_end_time() self._find_native_resolution() print('Using Native resolution: lat_res=%f, lon_res=%f' % (self._latRes, self._lonRes)) self._minLatCent = self._minLat + self._latRes / 2 self._minLonCent = self._minLon + self._lonRes / 2 nlats = int((self._maxLat - self._minLatCent) / self._latRes) + 1 nlons = int((self._maxLon - self._minLonCent) / self._lonRes) + 1 self._maxLatCent = self._minLatCent + (nlats - 1) * self._latRes self._maxLonCent = self._minLonCent + (nlons - 1) * self._lonRes print('nlats=', nlats, 'nlons=', nlons) print('center lat range = %f to %f' % (self._minLatCent, self._maxLatCent)) print('center lon range = %f to %f' % (self._minLonCent, self._maxLonCent)) sys.stdout.flush() a = np.zeros((nlats, nlons), dtype=np.float64, order='C') nexus_tiles = self._find_global_tile_set() # print 'tiles:' # for tile in nexus_tiles: # print tile.granule # print tile.section_spec # print 'lat:', tile.latitudes # print 'lon:', tile.longitudes # nexus_tiles) if len(nexus_tiles) == 0: raise NoDataException(reason="No data found for selected timeframe") print('Initially found %d tiles' % len(nexus_tiles)) sys.stdout.flush() self._prune_tiles(nexus_tiles) print('Pruned to %d tiles' % len(nexus_tiles)) sys.stdout.flush() # for tile in nexus_tiles: # print 'lats: ', tile.latitudes.compressed() # print 'lons: ', tile.longitudes.compressed() avg_tiles = list(map(self._map, nexus_tiles)) print('shape a = ', a.shape) sys.stdout.flush() # The tiles below are NOT Nexus objects. They are tuples # with the time avg map data and lat-lon bounding box. for tile in avg_tiles: if tile is not None: (tile_data, tile_min_lat, tile_max_lat, tile_min_lon, tile_max_lon) = tile print('shape tile_data = ', tile_data.shape) print('tile data mask = ', tile_data.mask) sys.stdout.flush() if np.any(np.logical_not(tile_data.mask)): y0 = self._lat2ind(tile_min_lat) y1 = self._lat2ind(tile_max_lat) x0 = self._lon2ind(tile_min_lon) x1 = self._lon2ind(tile_max_lon) print('writing tile lat %f-%f, lon %f-%f, map y %d-%d, map x %d-%d' % \ (tile_min_lat, tile_max_lat, tile_min_lon, tile_max_lon, y0, y1, x0, x1)) sys.stdout.flush() a[y0:y1 + 1, x0:x1 + 1] = tile_data else: print('All pixels masked in tile lat %f-%f, lon %f-%f, map y %d-%d, map x %d-%d' % \ (tile_min_lat, tile_max_lat, tile_min_lon, tile_max_lon, y0, y1, x0, x1)) sys.stdout.flush() self._create_nc_file(a) return TimeAvgMapResults(results={}, meta={}, computeOptions=computeOptions) class TimeAvgMapResults(NexusResults): def __init__(self, results=None, meta=None, computeOptions=None): NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions)