analysis/webservice/algorithms/CorrelationMap.py (99 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import math from itertools import groupby import numpy as np from nexustiles.model.nexusmodel import get_approximate_value_for_lat_lon from scipy.stats import linregress from shapely.geometry import box from webservice.NexusHandler import nexus_handler, DEFAULT_PARAMETERS_SPEC from webservice.algorithms.NexusCalcHandler import NexusCalcHandler from webservice.webmodel import NexusProcessingException, NexusResults @nexus_handler class LongitudeLatitudeMapCalcHandlerImpl(NexusCalcHandler): name = "Correlation Map" path = "/correlationMap" description = "Computes a correlation map between two datasets given an arbitrary geographical area and time range" params = DEFAULT_PARAMETERS_SPEC params["res"] = { "name": "Resolution", "type": "float", "description": "The resolution of the resulting correlation map" } singleton = True def calc(self, computeOptions, **args): minLat = computeOptions.get_min_lat() maxLat = computeOptions.get_max_lat() minLon = computeOptions.get_min_lon() maxLon = computeOptions.get_max_lon() ds = computeOptions.get_dataset() startTime = computeOptions.get_start_time() endTime = computeOptions.get_end_time() resolution = computeOptions.get_decimal_arg("res", default=1.0) if not len(ds) == 2: raise Exception("Requires two datasets for comparison. Specify request parameter ds=Dataset_1,Dataset_2") ds1tiles = self._get_tile_service().find_tiles_in_polygon(box(minLon, minLat, maxLon, maxLat), ds[0], startTime, endTime) ds2tiles = self._get_tile_service().find_tiles_in_polygon(box(minLon, minLat, maxLon, maxLat), ds[1], startTime, endTime) matches = self._match_tiles(ds1tiles, ds2tiles) if len(matches) == 0: raise NexusProcessingException(reason="Could not find any data temporally co-located") results = [[{ 'cnt': 0, 'slope': 0, 'intercept': 0, 'r': 0, 'p': 0, 'stderr': 0, 'lat': float(lat), 'lon': float(lon) } for lon in np.arange(minLon, maxLon, resolution)] for lat in np.arange(minLat, maxLat, resolution)] for stats in results: for stat in stats: values_x = [] values_y = [] for tile_matches in matches: tile_1_list = tile_matches[0] value_1 = get_approximate_value_for_lat_lon(tile_1_list, stat["lat"], stat["lon"]) tile_2_list = tile_matches[1] value_2 = get_approximate_value_for_lat_lon(tile_2_list, stat["lat"], stat["lon"]) if not (math.isnan(value_1) or math.isnan(value_2)): values_x.append(value_1) values_y.append(value_2) if len(values_x) > 2 and len(values_y) > 2: stats = linregress(values_x, values_y) stat["slope"] = stats[0] if not math.isnan(stats[0]) and not math.isinf(stats[0]) else str(stats[0]) stat["intercept"] = stats[1] if not math.isnan(stats[1]) and not math.isinf(stats[1]) else str( stats[1]) stat["r"] = stats[2] if not math.isnan(stats[2]) and not math.isinf(stats[2]) else str(stats[2]) stat["p"] = stats[3] if not math.isnan(stats[3]) and not math.isinf(stats[3]) else str(stats[3]) stat["stderr"] = stats[4] if not math.isnan(stats[4]) and not math.isinf(stats[4]) else str( stats[4]) stat["cnt"] = len(values_x) return CorrelationResults(results) def _match_tiles(self, tiles_1, tiles_2): date_map = {} tiles_1 = sorted(tiles_1, key=lambda tile: tile.min_time) for tile_start_time, tiles in groupby(tiles_1, lambda tile: tile.min_time): if not tile_start_time in date_map: date_map[tile_start_time] = [] date_map[tile_start_time].append(list(tiles)) tiles_2 = sorted(tiles_2, key=lambda tile: tile.min_time) for tile_start_time, tiles in groupby(tiles_2, lambda tile: tile.min_time): if not tile_start_time in date_map: date_map[tile_start_time] = [] date_map[tile_start_time].append(list(tiles)) matches = [tiles for a_time, tiles in date_map.items() if len(tiles) == 2] return matches class CorrelationResults(NexusResults): def __init__(self, results): NexusResults.__init__(self) self.results = results def toJson(self): json_d = { "stats": {}, "meta": [None, None], "data": self.results } return json.dumps(json_d, indent=4)