analysis/webservice/algorithms/LongitudeLatitudeMap.py (179 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
from datetime import datetime

from pytz import timezone
from shapely.geometry import box

from webservice.NexusHandler import nexus_handler
from webservice.algorithms.NexusCalcHandler import NexusCalcHandler
from webservice.webmodel import NexusResults, NexusProcessingException

SENTINEL = 'STOP'
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))

# Module-level handle so each worker process gets its own tile service
# (see pool_initializer) instead of pickling a connection across processes.
tile_service = None


@nexus_handler
class LongitudeLatitudeMapCalcHandlerImpl(NexusCalcHandler):
    name = "Longitude/Latitude Time Average Map"
    path = "/longitudeLatitudeMap"
    description = "Computes a Latitude/Longitude Time Average plot given an arbitrary geographical area and time range"
    params = {
        "ds": {
            "name": "Dataset",
            "type": "string",
            "description": "One or more comma-separated dataset shortnames"
        },
        "minLat": {
            "name": "Minimum Latitude",
            "type": "float",
            "description": "Minimum (Southern) bounding box Latitude"
        },
        "maxLat": {
            "name": "Maximum Latitude",
            "type": "float",
            "description": "Maximum (Northern) bounding box Latitude"
        },
        "minLon": {
            "name": "Minimum Longitude",
            "type": "float",
            "description": "Minimum (Western) bounding box Longitude"
        },
        "maxLon": {
            "name": "Maximum Longitude",
            "type": "float",
            "description": "Maximum (Eastern) bounding box Longitude"
        },
        "startTime": {
            "name": "Start Time",
            "type": "string",
            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since epoch (Jan 1st, 1970)"
        },
        "endTime": {
            "name": "End Time",
            "type": "string",
            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since epoch (Jan 1st, 1970)"
        }
    }
    singleton = True

    def parse_arguments(self, request):
        """Validate and extract the request parameters.

        Returns a tuple ``(ds, bounding_polygon, start_seconds_from_epoch,
        end_seconds_from_epoch)`` where the times are integer seconds since
        the Unix epoch (UTC).

        Raises NexusProcessingException (HTTP 400) when any required
        argument is missing or unparseable.
        """
        # Narrow, explicit excepts: the request accessors raise on missing or
        # malformed values; we translate each failure into a 400 response.
        try:
            ds = request.get_dataset()[0]
        except Exception:
            raise NexusProcessingException(reason="'ds' argument is required", code=400)

        try:
            bounding_polygon = box(request.get_min_lon(), request.get_min_lat(),
                                   request.get_max_lon(), request.get_max_lat())
        except Exception:
            raise NexusProcessingException(
                reason="'minLon', 'minLat', 'maxLon', and 'maxLat' arguments are required.",
                code=400)

        try:
            start_time = request.get_start_datetime()
        except Exception:
            raise NexusProcessingException(
                reason="'startTime' argument is required. Can be int value milliseconds from epoch or "
                       "string format YYYY-MM-DDTHH:mm:ssZ",
                code=400)
        try:
            end_time = request.get_end_datetime()
        except Exception:
            raise NexusProcessingException(
                reason="'endTime' argument is required. Can be int value milliseconds from epoch or "
                       "string format YYYY-MM-DDTHH:mm:ssZ",
                code=400)

        start_seconds_from_epoch = int((start_time - EPOCH).total_seconds())
        end_seconds_from_epoch = int((end_time - EPOCH).total_seconds())

        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch

    def calc(self, request, **args):
        """Compute the per-point time-average map for the requested area/time range.

        Finds the distinct tile bounding boxes intersecting the search
        polygon, runs the per-box regression/statistics driver, and wraps
        the results (plus the echoed request parameters) in a
        LongitudeLatitudeMapResults.
        """
        ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch = self.parse_arguments(request)

        boxes = self._get_tile_service().get_distinct_bounding_boxes_in_polygon(bounding_polygon, ds,
                                                                                start_seconds_from_epoch,
                                                                                end_seconds_from_epoch)
        point_avg_over_time = lat_lon_map_driver(bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch,
                                                 ds, [a_box.bounds for a_box in boxes])

        kwargs = {
            "minLon": bounding_polygon.bounds[0],
            "minLat": bounding_polygon.bounds[1],
            "maxLon": bounding_polygon.bounds[2],
            "maxLat": bounding_polygon.bounds[3],
            "ds": ds,
            "startTime": datetime.utcfromtimestamp(start_seconds_from_epoch).strftime('%Y-%m-%dT%H:%M:%SZ'),
            "endTime": datetime.utcfromtimestamp(end_seconds_from_epoch).strftime('%Y-%m-%dT%H:%M:%SZ')
        }
        return LongitudeLatitudeMapResults(
            results=LongitudeLatitudeMapCalcHandlerImpl.results_to_dicts(point_avg_over_time), meta=None,
            **kwargs)

    @staticmethod
    def results_to_dicts(results):
        """Convert driver result tuples into JSON-serializable dicts.

        Each input item is
        ((lon, lat), (slope, intercept, r_value, p_value, std_err, mean, pmax, pmin, pstd, pcnt)).
        NaN slope/intercept and infinite stderr are replaced with the strings
        'NaN' / 'Inf' because those float values are not valid JSON.
        """
        return [{
            'lon': result[0][0],
            'lat': result[0][1],
            'slope': result[1][0] if not math.isnan(result[1][0]) else 'NaN',
            'intercept': result[1][1] if not math.isnan(result[1][1]) else 'NaN',
            'r': result[1][2],
            'p': result[1][3],
            'stderr': result[1][4] if not math.isinf(result[1][4]) else 'Inf',
            'avg': result[1][5],
            'max': result[1][6],
            'min': result[1][7],
            'std': result[1][8],
            'cnt': result[1][9],
        } for result in results]


def pool_initializer():
    """Initializer for multiprocessing workers: create a per-process tile service.

    TODO This is a hack to make sure each sub-process uses its own
    connection to cassandra. data-access needs to be updated.
    """
    from nexustiles.nexustiles import NexusTileService
    global tile_service
    tile_service = NexusTileService()

    # Register a cassandra connection named after this process so workers
    # do not share the parent's session.
    from cassandra.cqlengine import connection
    from multiprocessing import current_process

    connection.register_connection(current_process().name,
                                   [host.address for host in connection.get_session().hosts])
    connection.set_default_connection(current_process().name)


def lat_lon_map_driver(search_bounding_polygon, search_start, search_end, ds, distinct_boxes):
    """Run regression_on_tiles over every distinct tile bounding box (serially).

    Returns a flat list of per-point result tuples (see regression_on_tiles).
    """
    from functools import partial
    from nexustiles.nexustiles import NexusTileService
    # Start new processes to handle the work
    # pool = Pool(5, pool_initializer)

    func = partial(regression_on_tiles, search_bounding_polygon_wkt=search_bounding_polygon.wkt,
                   search_start=search_start, search_end=search_end, ds=ds)

    global tile_service
    tile_service = NexusTileService()

    # Flatten the per-box result lists into one list of point results.
    map_result = list(map(func, distinct_boxes))
    return [item for sublist in map_result for item in sublist]
    # TODO Use for multiprocessing:
    # result = pool.map_async(func, distinct_boxes)
    #
    # pool.close()
    # while not result.ready():
    #     print "Not ready"
    #     time.sleep(5)
    #
    # print result.successful()
    # print result.get()[0]
    # pool.join()
    # pool.terminate()
    #
    # return [item for sublist in result.get() for item in sublist]


def calc_linear_regression(arry, xarry):
    """Linear regression of the two 1-D arrays via scipy.stats.linregress.

    NOTE(review): the call is linregress(arry, xarry), i.e. the data values
    are passed as x and the time indices as y — verify this ordering is
    intentional before changing it (swapping would change slope/intercept).
    """
    from scipy.stats import linregress
    slope, intercept, r_value, p_value, std_err = linregress(arry, xarry)
    return slope, intercept, r_value, p_value, std_err


def regression_on_tiles(tile_bounds, search_bounding_polygon_wkt, search_start, search_end, ds):
    """Compute per-point statistics and regression for one tile bounding box.

    Returns a list of
    ((lon, lat), (slope, intercept, r_value, p_value, std_err, mean, pmax, pmin, pstd, pcnt))
    tuples, one per grid point that has at least one valid observation.
    """
    if len(tile_bounds) < 1:
        return []
    import numpy as np
    import operator
    from shapely import wkt
    from shapely.geometry import box

    search_bounding_shape = wkt.loads(search_bounding_polygon_wkt)
    tile_bounding_shape = box(*tile_bounds)

    # Load all tiles for given (exact) bounding box across the search time range
    tiles = tile_service.find_tiles_by_exact_bounds(tile_bounds, ds, search_start, search_end)

    if not search_bounding_shape.contains(tile_bounding_shape):
        # The tile bounds cross the search area borders, we need to mask the
        # tiles to the search area. (Totally-contained tiles need no masking.)
        # Reuse the already-parsed polygon instead of re-parsing the WKT.
        tiles = tile_service.mask_tiles_to_polygon(search_bounding_shape, tiles)

    # If all tiles end up being masked (or none were found), there is no work to do
    if len(tiles) < 1:
        return []

    # Order by time so axis 0 of the stacked data is the time dimension.
    tiles.sort(key=operator.attrgetter('min_time'))

    stacked_tile_data = np.stack([np.squeeze(tile.data, 0) for tile in tiles])
    stacked_tile_lons = np.stack([tile.longitudes for tile in tiles])
    stacked_tile_lats = np.stack([tile.latitudes for tile in tiles])

    # Regression of each point's time series against its time-step index.
    x_array = np.arange(stacked_tile_data.shape[0])
    point_regressions = np.apply_along_axis(calc_linear_regression, 0, stacked_tile_data, x_array)

    # nan-aware statistics along the time axis; counts ignore NaN/inf values.
    point_means = np.nanmean(stacked_tile_data, 0)
    point_maximums = np.nanmax(stacked_tile_data, 0)
    point_minimums = np.nanmin(stacked_tile_data, 0)
    point_st_deviations = np.nanstd(stacked_tile_data, 0)
    point_counts = np.ma.count(np.ma.masked_invalid(stacked_tile_data), 0)

    results = []
    for lat_idx, lon_idx in np.ndindex(point_means.shape):
        lon, lat = np.max(stacked_tile_lons[:, lon_idx]), np.max(stacked_tile_lats[:, lat_idx])
        pcnt = point_counts[lat_idx, lon_idx]
        # Skip points with no valid observations at all.
        if pcnt == 0:
            continue
        mean = point_means[lat_idx, lon_idx]
        pmax = point_maximums[lat_idx, lon_idx]
        pmin = point_minimums[lat_idx, lon_idx]
        pstd = point_st_deviations[lat_idx, lon_idx]
        slope, intercept, r_value, p_value, std_err = point_regressions[:, lat_idx, lon_idx]
        results.append(((lon, lat),
                        (slope, intercept, r_value, p_value, std_err, mean, pmax, pmin, pstd, pcnt)))

    return results


class LongitudeLatitudeMapResults(NexusResults):
    """Thin NexusResults wrapper; stats are always None for this algorithm."""

    def __init__(self, results=None, meta=None, computeOptions=None, **kwargs):
        NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions,
                              **kwargs)