analysis/webservice/algorithms/doms/fetchedgeimpl.py

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import traceback
from datetime import datetime
from multiprocessing.pool import ThreadPool

import requests

from . import geo
from . import values
from webservice.webmodel import NexusProcessingException


# Convert an ISO-8601 UTC timestamp string into milliseconds since the Unix epoch.
def __parseDatetime(dtString):
    dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ")
    epoch = datetime.utcfromtimestamp(0)
    time = (dt - epoch).total_seconds() * 1000.0
    return time


# Parse a location string, either WKT-style "Point(lon lat)" / "lon lat" or "lat,lon",
# into a (latitude, longitude) tuple.
def __parseLocation(locString):
    if "Point" in locString:
        locString = locString[6:-1]

    if "," in locString:
        latitude = float(locString.split(",")[0])
        longitude = float(locString.split(",")[1])
    else:
        latitude = float(locString.split(" ")[1])
        longitude = float(locString.split(" ")[0])

    return (latitude, longitude)


# Normalize a raw EDGE result record: parse time and location, prefix the id, and map
# coded device/platform/mission values to their named equivalents.
def __resultRawToUsable(resultdict):
    resultdict["time"] = __parseDatetime(resultdict["time"])
    latitude, longitude = __parseLocation(resultdict["point"])

    resultdict["x"] = longitude
    resultdict["y"] = latitude

    if "id" not in resultdict and "metadata" in resultdict:
        resultdict["id"] = resultdict["metadata"]

    resultdict["id"] = "id-%s" % resultdict["id"]

    if "device" in resultdict:
        resultdict["device"] = values.getDeviceById(resultdict["device"])

    if "platform" in resultdict:
        resultdict["platform"] = values.getPlatformById(resultdict["platform"])

    if "mission" in resultdict:
        resultdict["mission"] = values.getMissionById(resultdict["mission"])

    if "sea_surface_temperature" in resultdict:
        resultdict["sea_water_temperature"] = resultdict["sea_surface_temperature"]
        del resultdict["sea_surface_temperature"]

    return resultdict


# GET the URL and parse the JSON response, retrying (up to maxtries) on non-200
# status codes or unparseable bodies.
def __fetchJson(url, params, trycount=1, maxtries=5):
    if trycount > maxtries:
        raise Exception("Maximum retries attempted.")

    if trycount > 1:
        print("Retry #", trycount)

    r = requests.get(url, params=params, timeout=500.000)
    print(r.url)

    if r.status_code != 200:
        return __fetchJson(url, params, trycount + 1, maxtries)
    try:
        results = json.loads(r.text)
        return results
    except:
        return __fetchJson(url, params, trycount + 1, maxtries)


# Fetch and normalize a single page of results from the EDGE endpoint, tracking the
# geographic bounds of the returned points.
def __doQuery(endpoint, startTime, endTime, bbox, depth_min=None, depth_max=None, itemsPerPage=10, startIndex=0,
              platforms=None, pageCallback=None):
    params = {"startTime": startTime, "endTime": endTime, "bbox": bbox, "itemsPerPage": itemsPerPage,
              "startIndex": startIndex, "stats": "true"}

    if depth_min is not None:
        params['minDepth'] = depth_min
    if depth_max is not None:
        params['maxDepth'] = depth_max

    if platforms is not None:
        params["platform"] = platforms.split(",")

    resultsRaw = __fetchJson(endpoint["url"], params)

    boundsConstrainer = geo.BoundsConstrainer(north=-90, south=90, west=180, east=-180)

    if resultsRaw["totalResults"] == 0 or len(resultsRaw["results"]) == 0:  # Double-sanity check
        return [], resultsRaw["totalResults"], startIndex, itemsPerPage, boundsConstrainer

    try:
        results = []
        for resultdict in resultsRaw["results"]:
            result = __resultRawToUsable(resultdict)
            result["source"] = endpoint["name"]
            boundsConstrainer.testCoords(north=result["y"], south=result["y"], west=result["x"], east=result["x"])
            results.append(result)

        if "stats_fields" in resultsRaw and len(resultsRaw["results"]) == 0:
            stats = resultsRaw["stats_fields"]
            if "lat" in stats and "lon" in stats:
                boundsConstrainer.testCoords(north=stats['lat']['max'], south=stats['lat']['min'],
                                             west=stats['lon']['min'], east=stats['lon']['max'])

        if pageCallback is not None:
            pageCallback(results)

        '''
        If pageCallback was supplied, we assume this call to be asynchronous.
        Otherwise combine all the results data and return it.
        '''
        if pageCallback is None:
            return results, int(resultsRaw["totalResults"]), int(resultsRaw["startIndex"]), int(
                resultsRaw["itemsPerPage"]), boundsConstrainer
        else:
            return [], int(resultsRaw["totalResults"]), int(resultsRaw["startIndex"]), int(
                resultsRaw["itemsPerPage"]), boundsConstrainer
    except:
        print("Invalid or missing JSON in response.")
        traceback.print_exc()
        raise NexusProcessingException(reason="Invalid or missing JSON in response.")
        # return [], 0, startIndex, itemsPerPage, boundsConstrainer


# Return only the total result count and bounding box for a query (itemsPerPage=0).
def getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
    startIndex = 0
    pageResults, totalResults, pageStartIndex, itemsPerPageR, boundsConstrainer = __doQuery(endpoint, startTime,
                                                                                            endTime, bbox, depth_min,
                                                                                            depth_max, 0, startIndex,
                                                                                            platforms)
    return totalResults, boundsConstrainer


# Fetch all pages for a query, parallelizing the remaining page requests across a thread pool.
def fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, pageCallback=None):
    results = []
    startIndex = 0

    mainBoundsConstrainer = geo.BoundsConstrainer(north=-90, south=90, west=180, east=-180)

    # The first request isn't parallel so we can get the total results, forced items per page, etc...
    pageResults, totalResults, pageStartIndex, itemsPerPageR, boundsConstrainer = __doQuery(endpoint, startTime,
                                                                                            endTime, bbox, depth_min,
                                                                                            depth_max,
                                                                                            endpoint["itemsPerPage"],
                                                                                            startIndex, platforms,
                                                                                            pageCallback)
    results = results + pageResults
    mainBoundsConstrainer.testOtherConstrainer(boundsConstrainer)

    pool = ThreadPool(processes=endpoint["fetchThreads"])

    mpResults = [pool.apply_async(__doQuery, args=(
        endpoint, startTime, endTime, bbox, depth_min, depth_max, itemsPerPageR, x, platforms, pageCallback)) for x in
                 range(len(pageResults), totalResults, itemsPerPageR)]
    pool.close()
    pool.join()

    '''
    If pageCallback was supplied, we assume this call to be asynchronous.
    Otherwise combine all the results data and return it.
    '''
    if pageCallback is None:
        mpResults = [p.get() for p in mpResults]

        for mpResult in mpResults:
            results = results + mpResult[0]
            mainBoundsConstrainer.testOtherConstrainer(mpResult[4])

        return results, mainBoundsConstrainer


# Fetch results and, when placeholders is True, trim each record down to position, time,
# source, device, platform, and depth.
def getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, placeholders=False):
    results, boundsConstrainer = fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)

    if placeholders:
        trimmedResults = []
        for item in results:
            depth = None
            if "depth" in item:
                depth = item["depth"]
            if "sea_water_temperature_depth" in item:
                depth = item["sea_water_temperature_depth"]

            trimmedItem = {
                "x": item["x"],
                "y": item["y"],
                "source": item["source"],
                "time": item["time"],
                "device": item["device"] if "device" in item else None,
                "platform": item["platform"],
                "depth": depth
            }
            trimmedResults.append(trimmedItem)

        results = trimmedResults

    return results, boundsConstrainer