analysis/webservice/algorithms/doms/StacCatalog.py (126 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the 'License'); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""STAC catalog/collection endpoint describing the pages of a CDMS execution's results."""

import json
import re
import uuid
from typing import List

from webservice.NexusHandler import nexus_handler
from webservice.algorithms.doms.ResultsStorage import ResultsRetrieval
from webservice.webmodel import NexusProcessingException
from webservice.webmodel import NexusResults
from . import BaseDomsHandler

# Output formats for which a collection link is advertised and accepted.
_OUTPUT_FORMATS = ('CSV', 'JSON', 'NETCDF')

# Extracts the optional execution id and output format from the request path,
# e.g. /cdmscatalog/<id>/<format>. Raw string: '\/' is not a valid escape in a
# normal string literal.
_URL_PATH_PATTERN = re.compile(
    r'^\/cdmscatalog\/?(?P<id>[a-zA-Z0-9-]*)\/?(?P<format>[a-zA-Z0-9]*)'
)


class StacResults(NexusResults):
    """Result wrapper that serializes a STAC document (a plain dict) to JSON."""

    def __init__(self, contents):
        super().__init__()
        self.contents = contents

    def toJson(self):
        """Return the wrapped document as an indented JSON string."""
        return json.dumps(self.contents, indent=4)


@nexus_handler
class StacCatalog(BaseDomsHandler.BaseDomsQueryCalcHandler):
    """Handler for ``/cdmscatalog/<execution_id>[/<format>]``.

    Without a format the response is a STAC Catalog linking to one collection
    per output format; with a format it is a STAC Collection linking to every
    result page of that execution.
    """

    name = 'STAC Catalog Handler'
    path = '^/cdmscatalog/?.*$'
    description = ''
    params = {}
    singleton = True

    def __init__(self, tile_service_factory, config=None):
        BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self, tile_service_factory)
        self.config = config

    def construct_catalog(self, execution_id: str):
        """Build the STAC Catalog document for an execution.

        Args:
            execution_id: Identifier of the execution; rendered with ``str()``.

        Returns:
            A dict with one ``collection`` link per supported output format.
            Link URLs are built from ``self.host``, which ``calc`` sets.
        """
        base_url = f'https://{self.host}/cdmscatalog/{execution_id}'
        return {
            'stac_version': '1.0.0',
            'type': 'Catalog',
            'id': str(execution_id),
            'description': 'STAC Catalog for CDMS output',
            'links': [
                {
                    'rel': 'collection',
                    'href': f'{base_url}/{output_format}',
                    'title': f'Collection of pages for {execution_id} {output_format} output'
                }
                for output_format in _OUTPUT_FORMATS
            ]
        }

    def construct_collection(self, execution_id: str, output_format: str,
                             num_primary_matched: int, page_size: int,
                             start_time: str, end_time: str, bbox: List[float]):
        """Build the STAC Collection document for one execution and output format.

        Args:
            execution_id: Identifier of the execution.
            output_format: Result format used in the generated URLs.
            num_primary_matched: Total number of primary matches to paginate.
            page_size: Number of primary matches per page; must be positive.
            start_time: Start of the temporal extent, already formatted.
            end_time: End of the temporal extent, already formatted.
            bbox: Spatial extent as a list of floats.

        Returns:
            A dict with ``self``/``root`` links followed by one ``data`` link
            per result page. Link URLs are built from ``self.host``.

        Raises:
            ValueError: If ``page_size`` is not positive.
        """
        if page_size < 1:
            raise ValueError('page_size must be a positive integer')

        links = [
            {
                'rel': 'self',
                'href': f'https://{self.host}/cdmscatalog/{execution_id}/{output_format}',
                'title': 'The current page',
                'type': 'application/json'
            },
            {
                'rel': 'root',
                'href': f'https://{self.host}/cdmscatalog/{execution_id}',
                'title': f'Root catalog for {execution_id}',
            }
        ]

        url = f'https://{self.host}/cdmsresults?id={execution_id}&output={output_format}'
        # Ceiling division so a trailing partial page (including the case of a
        # single leftover match) still gets its own link.
        num_pages = max(0, (num_primary_matched + page_size - 1) // page_size)
        links.extend(
            {
                'rel': 'data',
                'href': f'{url}&pageNum={page_num}&pageSize={page_size}'
            }
            for page_num in range(1, num_pages + 1)
        )

        return {
            'stac_version': '1.0.0',
            'type': 'Collection',
            'license': 'not-provided',
            'id': f'{execution_id}.{output_format}',
            'description': 'Collection of results for CDMS execution and result format',
            # NOTE(review): STAC 1.0.0 describes extent.spatial.bbox and
            # extent.temporal.interval as lists of lists; the flat shape is
            # kept here to preserve the existing output - confirm with consumers.
            'extent': {
                'spatial': {
                    'bbox': bbox
                },
                'temporal': {
                    'interval': [start_time, end_time]
                }
            },
            'links': links,
        }

    def calc(self, request, **args):
        """Route a request to the catalog or collection document.

        The execution id and optional output format are parsed from the
        request path; ``pageSize`` (default 1000) is read from the query args.

        Returns:
            A ``StacResults`` wrapping the generated STAC document.

        Raises:
            NexusProcessingException: With code 400 when the path, execution
                id, output format or page size is invalid. Exceptions raised
                while retrieving the execution parameters propagate unchanged.
        """
        page_size = request.get_int_arg('pageSize', default=1000)

        http_request = request.requestHandler.request
        match = _URL_PATH_PATTERN.search(http_request.path)
        if match is None:
            raise NexusProcessingException(
                reason='Invalid path parameters were provided',
                code=400
            )
        execution_id = match.group('id')
        output_format = match.group('format')

        # NOTE(review): this handler is declared singleton = True, so the
        # per-request host stored on self is shared between requests - confirm
        # that requests are not served concurrently by the same instance.
        self.host = http_request.host

        if not execution_id:
            raise NexusProcessingException(
                reason='Execution ID path param must be provided.',
                code=400
            )

        try:
            execution_id = uuid.UUID(execution_id)
        except ValueError as err:
            raise NexusProcessingException(
                reason=f'"{execution_id}" is not a valid uuid',
                code=400
            ) from err

        if output_format and output_format.upper() not in _OUTPUT_FORMATS:
            raise NexusProcessingException(
                reason=f'"{output_format}" is not a valid format. Should be CSV, JSON, or NETCDF.',
                code=400
            )

        if not output_format:
            # Route to STAC catalog for execution
            return StacResults(self.construct_catalog(execution_id))

        # Route to STAC collection for execution+format
        if page_size < 1:
            raise NexusProcessingException(
                reason='"pageSize" must be a positive integer',
                code=400
            )

        with ResultsRetrieval(self.config) as retrieval:
            # Missing stats are tolerated and yield a collection without data
            # links; the parameters are required for the extent, so a failure
            # to retrieve them is left to propagate.
            try:
                execution_stats = retrieval.retrieveStats(execution_id)
            except NexusProcessingException:
                execution_stats = {}
            execution_params = retrieval.retrieveParams(execution_id)

        num_primary_matched = execution_stats.get('numPrimaryMatched', 0)
        start_time = execution_params['startTime'].isoformat()
        end_time = execution_params['endTime'].isoformat()
        bbox = [float(coord) for coord in execution_params['bbox'].split(',')]

        stac_output = self.construct_collection(
            execution_id, output_format, num_primary_matched, page_size,
            start_time, end_time, bbox
        )
        return StacResults(stac_output)