analysis/webservice/webmodel/NexusRequestObject.py (212 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import re
from datetime import datetime
from decimal import Decimal
from typing import Optional, Tuple

from pytz import UTC
from shapely.geometry import Polygon
from webservice.webmodel.RequestParameters import RequestParameters
from webservice.webmodel.StatsComputeOptions import StatsComputeOptions


class NexusRequestObject(StatsComputeOptions):
    """Typed accessors over the arguments of a wrapped request handler.

    Every getter reads its raw value through ``get_argument`` (which
    delegates to ``requestHandler.get_argument``) and converts it to the
    requested Python type, falling back to a default where one is defined.
    """

    # Raw strings: the patterns contain backslash escapes (\-, \., \d) that
    # are invalid escape sequences in ordinary string literals.
    shortNamePattern = re.compile(r"^[a-zA-Z0-9_\-,\.]+$")
    floatingPointPattern = re.compile(r'[+-]?(\d+(\.\d*)?|\.\d+)([eE][+-]?\d+)?')

    # Known "Origin" header values and the environment each one implies.
    _ORIGIN_ENVIRONMENTS = {
        "http://localhost:63342": "DEV",
        "https://sealevel.uat.earthdata.nasa.gov": "UAT",
        "https://sealevel.sit.earthdata.nasa.gov": "SIT",
        "https://sealevel.earthdata.nasa.gov": "PROD",
    }

    def __init__(self, reqHandler):
        """Wrap ``reqHandler``.

        Raises:
            Exception: if ``reqHandler`` is None.
        """
        self.__log = logging.getLogger(__name__)
        if reqHandler is None:
            raise Exception("Request handler cannot be null")
        self.requestHandler = reqHandler
        StatsComputeOptions.__init__(self)

    # ------------------------------------------------------------------
    # Raw access
    # ------------------------------------------------------------------

    def get_headers(self):
        """Return the headers of the wrapped request."""
        return self.requestHandler.request.headers

    def get_request_body(self):
        """Return the body of the wrapped request."""
        return self.requestHandler.request.body

    def get_argument(self, name, default=None):
        """Return argument ``name`` from the handler, or ``default``."""
        return self.requestHandler.get_argument(name, default=default)

    def get_list_int_arg(self, name, default=None):
        """Return argument ``name`` split on commas.

        The elements are returned as strings; no integer conversion is
        performed. When the value is not a string (for example the argument
        is absent and ``default`` is None or already a list) it is returned
        unchanged instead of failing on ``.split``.
        """
        arg = self.get_argument(name, default=default)
        if not isinstance(arg, str):
            return arg
        return arg.split(',')

    # ------------------------------------------------------------------
    # Validation and parsing helpers
    # ------------------------------------------------------------------

    def __validate_is_shortname(self, v):
        """Return True if ``v`` is non-empty and matches ``shortNamePattern``."""
        if not v:
            return False
        return self.shortNamePattern.match(v) is not None

    def __validate_is_number(self, v):
        """Return True if ``v`` is a number or a string spelling one.

        ``int``, ``float`` and ``Decimal`` instances are accepted directly
        (``bool`` is not). Strings must match ``floatingPointPattern`` in
        full, so trailing garbage such as ``"12abc"`` is rejected. Any other
        type is rejected rather than being handed to the regex.
        """
        if v is None or isinstance(v, bool):
            return False
        if isinstance(v, (int, float, Decimal)):
            return True
        if not isinstance(v, str) or not v:
            return False
        return self.floatingPointPattern.fullmatch(v) is not None

    @staticmethod
    def __first_not_none(*values):
        """Return the first value that is not None, or None if all are."""
        return next((value for value in values if value is not None), None)

    @staticmethod
    def __parse_datetime(time_str, millis=False):
        """Parse an ISO-8601 ``...Z`` string or an integer epoch into UTC.

        ``time_str`` is first tried against ``%Y-%m-%dT%H:%M:%SZ``. If that
        fails it is interpreted as an integer number of seconds since the
        epoch, or of milliseconds when ``millis`` is True.

        Raises:
            ValueError: if ``time_str`` is neither format.
        """
        try:
            return datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC)
        except ValueError:
            timestamp = int(time_str)
            if millis:
                timestamp = timestamp / 1000
            return datetime.fromtimestamp(timestamp, tz=UTC)

    def __decimal_or_fallback(self, name, fallback):
        """Return argument ``name`` as a Decimal, or ``fallback`` as given.

        ``fallback`` is also what is requested from the handler as the
        default, so an absent argument yields ``Decimal(fallback)`` while an
        unparsable one yields ``fallback`` itself.
        """
        raw = self.get_argument(name, fallback)
        return Decimal(raw) if self.__validate_is_number(raw) else fallback

    # ------------------------------------------------------------------
    # Typed getters
    # ------------------------------------------------------------------

    def get_float_arg(self, name, default=0.0):
        """Return argument ``name`` as a float, or ``default`` if not numeric."""
        arg = self.get_argument(name, default)
        if self.__validate_is_number(arg):
            return float(arg)
        return default

    def get_decimal_arg(self, name, default=0.0):
        """Return argument ``name`` as a Decimal.

        When the value is not numeric, ``Decimal(default)`` is returned, or
        None when ``default`` is None.
        """
        arg = self.get_argument(name, default)
        if self.__validate_is_number(arg):
            return Decimal(arg)
        if default is None:
            return None
        return Decimal(default)

    def get_int_arg(self, name, default=0):
        """Return argument ``name`` as an int, or ``default`` if not numeric.

        Raises:
            ValueError: if the value is a numeric string that ``int()``
                cannot parse (for example ``"1.5"``).
        """
        arg = self.get_argument(name, default)
        if self.__validate_is_number(arg):
            return int(arg)
        return default

    def get_boolean_arg(self, name, default=False):
        """Return True if argument ``name`` is one of the accepted truthy spellings."""
        arg = self.get_argument(name, "true" if default else "false")
        truthy = ('true', '1', 't', 'y', 'yes', 'True', 'T', 'Y', 'Yes', True)
        return arg is not None and arg in truthy

    def get_datetime_arg(self, name, default=None):
        """Return argument ``name`` as a UTC datetime.

        Accepts ``%Y-%m-%dT%H:%M:%SZ`` or integer epoch seconds. Returns
        ``default`` unchanged when the raw value equals it.
        """
        time_str = self.get_argument(name, default=default)
        if time_str == default:
            return default
        return self.__parse_datetime(time_str)

    def get_apply_seasonal_cycle_filter(self, default=True):
        return self.get_boolean_arg(RequestParameters.SEASONAL_CYCLE_FILTER, default=default)

    # ------------------------------------------------------------------
    # Spatial parameters
    # ------------------------------------------------------------------

    def get_max_lat(self, default=Decimal(90)):
        return self.get_decimal_arg("maxLat", default)

    def get_min_lat(self, default=Decimal(-90)):
        return self.get_decimal_arg("minLat", default)

    def get_max_lon(self, default=Decimal(180)):
        return self.get_decimal_arg("maxLon", default)

    def get_min_lon(self, default=Decimal(-180)):
        return self.get_decimal_arg("minLon", default)

    def get_elevation_args(self) -> Tuple[Optional[float], Optional[float]]:
        """Extract the requested elevation range as a ``(min, max)`` tuple.

        Accepts either a single-value argument (``depth``, ``height``,
        ``elevation``) or min/max pairs (``minDepth``/``maxDepth``,
        ``minHeight``/``maxHeight``, ``minElevation``/``maxElevation``).
        Depth values are negated.

        Raises:
            ValueError: if max is less than min and no depth argument was
                supplied.
        """
        min_depth = self.get_float_arg('minDepth', None)
        max_depth = self.get_float_arg('maxDepth', None)
        depth = self.get_float_arg('depth', None)

        min_height = self.get_float_arg('minHeight', None)
        max_height = self.get_float_arg('maxHeight', None)
        height = self.get_float_arg('height', None)

        min_elevation = self.get_float_arg('minElevation', None)
        max_elevation = self.get_float_arg('maxElevation', None)
        elevation = self.get_float_arg('elevation', None)

        # Single-value arguments take precedence over min/max pairs.
        if depth is not None:
            return -1 * depth, -1 * depth
        if height is not None:
            return height, height
        if elevation is not None:
            return elevation, elevation

        # Select by "is not None" rather than truthiness so that an explicit
        # 0.0 is honoured instead of being treated as missing.
        ret_min = self.__first_not_none(
            min_elevation,
            min_height,
            -1 * min_depth if min_depth is not None else None,
        )
        ret_max = self.__first_not_none(
            max_elevation,
            max_height,
            -1 * max_depth if max_depth is not None else None,
        )

        # Validate max >= min; with depth arguments the negation may have
        # inverted the order, so swap instead of failing.
        if ret_max is not None and ret_min is not None and ret_max < ret_min:
            if min_depth is not None or max_depth is not None:
                ret_max, ret_min = ret_min, ret_max
            else:
                raise ValueError(f'Request max elevation less than min elevation: {ret_max} < {ret_min}')

        return ret_min, ret_max

    # Added to fit the simplified version of TimeAvgMapSpark's argument parsing.
    def get_bounding_box(self):
        """Return ``(min_lon, min_lat, max_lon, max_lat)``.

        Uses the comma-separated ``b`` argument (parsed as floats) when it is
        present; otherwise falls back to ``minLon``/``minLat``/``maxLon``/
        ``maxLat`` with whole-globe defaults.
        """
        b = self.get_argument("b", '')
        if b:
            min_lon, min_lat, max_lon, max_lat = [float(e) for e in b.split(",")]
        else:
            max_lat = self.__decimal_or_fallback("maxLat", 90)
            min_lat = self.__decimal_or_fallback("minLat", -90)
            max_lon = self.__decimal_or_fallback("maxLon", 180)
            # Longitude spans [-180, 180]; matches the get_min_lon default.
            min_lon = self.__decimal_or_fallback("minLon", -180)

        return min_lon, min_lat, max_lon, max_lat

    def get_bounding_polygon(self):
        """Return the ``b`` argument (west,south,east,north) as a closed Polygon."""
        west, south, east, north = [float(b) for b in self.get_argument("b").split(",")]
        polygon = Polygon([(west, south), (east, south), (east, north), (west, north), (west, south)])
        return polygon

    # ------------------------------------------------------------------
    # Dataset / environment
    # ------------------------------------------------------------------

    def get_dataset(self):
        """Return the dataset argument split on commas, or None if absent.

        Raises:
            Exception: if the value is present but does not match
                ``shortNamePattern``.
        """
        ds = self.get_argument(RequestParameters.DATASET, None)
        if ds is None:
            return None
        if not self.__validate_is_shortname(ds):
            raise Exception("Invalid shortname")
        return ds.split(",")

    def get_metadata_filter(self):
        return self.requestHandler.get_arguments(RequestParameters.METADATA_FILTER)

    def get_environment(self):
        """Return the environment name, or None if it cannot be determined.

        The explicit environment argument wins; otherwise the request's
        ``Origin`` header is looked up among the known origins.

        Raises:
            Exception: if the resulting value is not one of DEV, SIT, UAT,
                PROD or None.
        """
        env = self.get_argument(RequestParameters.ENVIRONMENT, None)
        headers = self.requestHandler.request.headers
        if env is None and "Origin" in headers:
            env = self._ORIGIN_ENVIRONMENTS.get(headers["Origin"])

        if env not in ("DEV", "SIT", "UAT", "PROD", None):
            raise Exception("Invalid Environment")
        return env

    # ------------------------------------------------------------------
    # Temporal parameters
    # ------------------------------------------------------------------

    def get_start_time(self):
        return self.get_int_arg(RequestParameters.START_TIME, 0)

    def get_end_time(self):
        return self.get_int_arg(RequestParameters.END_TIME, -1)

    def get_start_year(self):
        return self.get_int_arg(RequestParameters.START_YEAR, 0)

    def get_end_year(self):
        return self.get_int_arg(RequestParameters.END_YEAR, -1)

    def get_clim_month(self):
        return self.get_int_arg(RequestParameters.CLIM_MONTH, -1)

    def get_start_datetime(self):
        """Return the start time (ISO string or epoch seconds) as a UTC datetime."""
        return self.__parse_datetime(self.get_argument(RequestParameters.START_TIME))

    def get_end_datetime(self):
        """Return the end time (ISO string or epoch seconds) as a UTC datetime."""
        return self.__parse_datetime(self.get_argument(RequestParameters.END_TIME))

    def get_start_datetime_ms(self):
        """Return the start time (ISO string or epoch milliseconds) as a UTC datetime."""
        return self.__parse_datetime(self.get_argument(RequestParameters.START_TIME), millis=True)

    def get_end_datetime_ms(self):
        """Return the end time (ISO string or epoch milliseconds) as a UTC datetime."""
        return self.__parse_datetime(self.get_argument(RequestParameters.END_TIME), millis=True)

    # ------------------------------------------------------------------
    # Paging / output / filtering options
    # ------------------------------------------------------------------

    def get_start_row(self):
        return self.get_int_arg(RequestParameters.START_ROW, 0)

    def get_row_count(self):
        return self.get_int_arg(RequestParameters.ROW_COUNT, 10)

    def get_content_type(self):
        return self.get_argument(RequestParameters.OUTPUT, "JSON")

    def get_apply_low_pass_filter(self, default=True):
        return self.get_boolean_arg(RequestParameters.APPLY_LOW_PASS, default)

    def get_low_pass_low_cut(self, default=12):
        return self.get_float_arg(RequestParameters.LOW_CUT, default)

    def get_low_pass_order(self, default=9):
        return self.get_float_arg(RequestParameters.ORDER, default)

    def get_include_meta(self):
        return self.get_boolean_arg("includemeta", True)

    def get_plot_series(self, default="mean"):
        return self.get_argument(RequestParameters.PLOT_SERIES, default=default)

    def get_plot_type(self, default="default"):
        return self.get_argument(RequestParameters.PLOT_TYPE, default=default)

    def get_nparts(self):
        return self.get_int_arg(RequestParameters.NPARTS, 0)

    def get_normalize_dates(self):
        return self.get_boolean_arg(RequestParameters.NORMALIZE_DATES, False)