integrations/python-client/nexuscli/nexuscli.py (176 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ This module provides a native python client interface to the NEXUS (https://github.com/apache/incubator-sdap-nexus) webservice API. Usage: import nexuscli nexuscli.set_target("http://nexus-webapp:8083") nexuscli.dataset_list() """ import requests import numpy as np from datetime import datetime from collections import namedtuple, OrderedDict from pytz import UTC __pdoc__ = {} TimeSeries = namedtuple('TimeSeries', ('dataset', 'time', 'mean', 'standard_deviation', 'count', 'minimum', 'maximum')) TimeSeries.__doc__ = '''\ An object containing Time Series arrays. ''' __pdoc__['TimeSeries.dataset'] = "Name of the Dataset" __pdoc__['TimeSeries.time'] = "`numpy` array containing times as `datetime` objects" __pdoc__['TimeSeries.mean'] = "`numpy` array containing means" __pdoc__['TimeSeries.standard_deviation'] = "`numpy` array containing standard deviations" __pdoc__['TimeSeries.count'] = "`numpy` array containing counts" __pdoc__['TimeSeries.minimum'] = "`numpy` array containing minimums" __pdoc__['TimeSeries.maximum'] = "`numpy` array containing maximums" Point = namedtuple('Point', ('time', 'latitude', 'longitude', 'variable')) Point.__doc__ = '''\ An object containing Point attributes. ''' __pdoc__['Point.time'] = "time value as `datetime` object" __pdoc__['Point.latitude'] = "latitude value" __pdoc__['Point.longitude'] = "longitude value" __pdoc__['Point.variable'] = "dictionary of variable values" ISO_FORMAT = "%Y-%m-%dT%H:%M:%SZ" PYTHON32_ISO_FORMAT = "%Y-%m-%dT%H:%M:%S%z" target = 'http://localhost:8083' session = requests.session() def set_target(url, use_session=True): """ Set the URL for the NEXUS webapp endpoint. __url__ URL for NEXUS webapp endpoint __return__ None """ global target target = url print(("Target set to {}".format(target))) if not use_session: global session session = requests def dataset_list(): """ Get a list of datasets and the start and end time for each. __return__ list of datasets. Each entry in the list contains `shortname`, `start`, and `end` """ response = session.get("{}/list".format(target)) data = response.json() list_response = [] for dataset in data: dataset['start'] = dataset['iso_start'] dataset['end'] = dataset['iso_end'] ordered_dict = OrderedDict() ordered_dict['shortName'] = dataset['shortName'] ordered_dict['start'] = dataset['start'] ordered_dict['end'] = dataset['end'] list_response.append(ordered_dict) return list_response def daily_difference_average(dataset, bounding_box, start_datetime, end_datetime): """ Generate an anomaly Time series for a given dataset, bounding box, and timeframe. __dataset__ Name of the dataset as a String __bounding_box__ Bounding box for area of interest as a `shapely.geometry.polygon.Polygon` __start_datetime__ Start time as a `datetime.datetime` __end_datetime__ End time as a `datetime.datetime` __return__ List of `nexuscli.nexuscli.TimeSeries` namedtuples """ url = "{}/dailydifferenceaverage_spark?".format(target) params = { 'dataset': dataset, 'climatology': "{}_CLIM".format(dataset), 'b': ','.join(str(b) for b in bounding_box.bounds), 'startTime': start_datetime.strftime(ISO_FORMAT), 'endTime': end_datetime.strftime(ISO_FORMAT), } response = session.get(url, params=params) response.raise_for_status() response = response.json() data = np.array(response['data']).flatten() assert len(data) > 0, "No data found in {} between {} and {} for Datasets {}.".format(bounding_box.wkt, start_datetime.strftime( ISO_FORMAT), end_datetime.strftime( ISO_FORMAT), dataset) time_series_result = [] key_to_index = {k: x for x, k in enumerate(data[0].keys())} time_series_data = np.array([tuple(each.values()) for each in [entry for entry in data]]) if len(time_series_data) > 0: time_series_result.append( TimeSeries( dataset=dataset, time=np.array([datetime.utcfromtimestamp(t).replace(tzinfo=UTC) for t in time_series_data[:, key_to_index['time']]]), mean=time_series_data[:, key_to_index['mean']], standard_deviation=time_series_data[:, key_to_index['std']], count=None, minimum=None, maximum=None, ) ) return time_series_result def time_series(datasets, bounding_box, start_datetime, end_datetime, spark=True): """ Send a request to NEXUS to calculate a time series. __datasets__ Sequence (max length 2) of the name of the dataset(s) __bounding_box__ Bounding box for area of interest as a `shapely.geometry.polygon.Polygon` __start_datetime__ Start time as a `datetime.datetime` __end_datetime__ End time as a `datetime.datetime` __spark__ Optionally use spark. Default: `False` __return__ List of `nexuscli.nexuscli.TimeSeries` namedtuples """ if isinstance(datasets, str): datasets = [datasets] assert 0 < len(datasets) <= 2, "datasets must be a sequence of 1 or 2 items" params = { 'ds': ','.join(datasets), 'b': ','.join(str(b) for b in bounding_box.bounds), 'startTime': start_datetime.strftime(ISO_FORMAT), 'endTime': end_datetime.strftime(ISO_FORMAT), } if spark: url = "{}/timeSeriesSpark?".format(target) params['spark'] = "mesos,16,32" else: url = "{}/stats?".format(target) response = session.get(url, params=params) response.raise_for_status() response = response.json() data = np.array(response['data']).flatten() assert len(data) > 0, "No data found in {} between {} and {} for Datasets {}.".format(bounding_box.wkt, start_datetime.strftime( ISO_FORMAT), end_datetime.strftime( ISO_FORMAT), datasets) time_series_result = [] for i in range(0, len(response['meta'])): key_to_index = {k: x for x, k in enumerate(data[0].keys())} time_series_data = np.array([tuple(each.values()) for each in [entry for entry in data if entry['ds'] == i]]) if len(time_series_data) > 0: if 'iso_time' in key_to_index: time_series_result.append( TimeSeries( dataset=response['meta'][i]['shortName'], time=np.array([datetime.strptime(t, PYTHON32_ISO_FORMAT) for t in time_series_data[:, key_to_index['iso_time']]]), mean=np.array(time_series_data[:, key_to_index['mean']], dtype=float), standard_deviation=np.array(time_series_data[:, key_to_index['std']], dtype=float), count=np.array(time_series_data[:, key_to_index['cnt']], dtype=int), minimum=np.array(time_series_data[:, key_to_index['min']], dtype=float), maximum=np.array(time_series_data[:, key_to_index['max']], dtype=float), ) ) else: time_series_result.append( TimeSeries( dataset=response['meta'][i]['shortName'], time=np.array([datetime.utcfromtimestamp(int(t)).replace(tzinfo=UTC) for t in time_series_data[:, key_to_index['time']]]), mean=np.array(time_series_data[:, key_to_index['mean']], dtype=float), standard_deviation=np.array(time_series_data[:, key_to_index['std']], dtype=float), count=np.array(time_series_data[:, key_to_index['cnt']], dtype=int), minimum=np.array(time_series_data[:, key_to_index['min']], dtype=float), maximum=np.array(time_series_data[:, key_to_index['max']], dtype=float), ) ) return time_series_result def subset(dataset, bounding_box, start_datetime, end_datetime, parameter, metadata_filter): """ Fetches point values for a given dataset and geographical area or metadata criteria and time range. __dataset__ Name of the dataset as a String __bounding_box__ Bounding box for area of interest as a `shapely.geometry.polygon.Polygon` __start_datetime__ Start time as a `datetime.datetime` __end_datetime__ End time as a `datetime.datetime` __parameter__ The parameter of interest. One of 'sst', 'sss', 'wind' or None __metadata_filter__ List of key:value String metadata criteria __return__ List of `nexuscli.nexuscli.Point` namedtuples """ url = "{}/datainbounds?".format(target) params = { 'ds': dataset, 'startTime': start_datetime.strftime(ISO_FORMAT), 'endTime': end_datetime.strftime(ISO_FORMAT), 'parameter': parameter, } if bounding_box: params['b'] = ','.join(str(b) for b in bounding_box.bounds) else: if metadata_filter and len(metadata_filter) > 0: params['metadataFilter'] = metadata_filter response = session.get(url, params=params) response.raise_for_status() response = response.json() data = np.array(response['data']).flatten() assert len(data) > 0, "No data found in {} between {} and {} for Datasets {}.".format(bounding_box.wkt if bounding_box is not None else metadata_filter, start_datetime.strftime( ISO_FORMAT), end_datetime.strftime( ISO_FORMAT), dataset) subset_result = [] for d in data: subset_result.append( Point( time=datetime.utcfromtimestamp(d['time']).replace(tzinfo=UTC), longitude=d['longitude'], latitude=d['latitude'], variable=d['data'][0] ) ) return subset_result