analysis/webservice/management/Datasets.py (189 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from yaml import load
import json
from webservice.NexusHandler import nexus_handler
from nexustiles.nexustiles import NexusTileService
from webservice.webmodel import NexusRequestObject, NexusProcessingException
from schema import Schema, Or, SchemaError
from schema import Optional as Opt
from urllib.parse import urlparse
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
CONFIG_SCHEMA = Schema({
Or('variable', 'variables'): Or(str, [str]),
'coords': {
'latitude': str,
'longitude': str,
'time': str,
Opt('depth'): str
},
Opt('aws'): {
Opt('accessKeyID'): str,
Opt('secretAccessKey'): str,
'public': bool,
Opt('region'): str
}
})
class DatasetManagement:
@classmethod
def validate(cls):
pass
@staticmethod
def parse_config(request: NexusRequestObject):
content_type = request.get_headers()['Content-Type']
if content_type in ['application/json', 'application/x-json']:
config_dict = json.loads(request.get_request_body())
elif content_type == 'application/yaml':
config_dict = load(request.get_request_body(), Loader=Loader)
else:
raise NexusProcessingException(reason='Invalid Content-Type header', code=400)
try:
CONFIG_SCHEMA.validate(config_dict)
if 'aws' in config_dict:
if not config_dict['aws']['public']:
if 'accessKeyID' not in config_dict['aws'] or 'secretAccessKey' not in config_dict['aws']:
raise NexusProcessingException(
reason='Must provide AWS creds for non-public bucket',
code=400
)
except SchemaError as e:
raise NexusProcessingException(
reason=str(e),
code=400
)
return config_dict
class Response:
def __init__(self, response):
self.response = response if response is not None else {}
def toJson(self):
return json.dumps(self.response)
@nexus_handler
class DatasetAdd(DatasetManagement):
name = 'Add dataset'
path = '/datasets/add'
description = "Add new Zarr dataset to running SDAP instance"
params = {
"name": {
"name": "Dataset name",
"type": "string",
"description": "Name of new dataset to add"
},
"path": {
"name": "Path or URL",
"type": "string",
"description": "Path/URL of Zarr group"
},
"body": {
"name": "Request body",
"type": "application/json OR application/yaml",
"description": "POST request body. Config options for Zarr (variabe, coords, aws (if applicable))"
}
}
def __init__(self, **args):
pass
def calc(self, request: NexusRequestObject, **args):
try:
config = DatasetManagement.parse_config(request)
except Exception as e:
raise NexusProcessingException(
reason=repr(e),
code=400
)
name = request.get_argument('name')
if name is None:
raise NexusProcessingException(
reason='Name argument must be provided',
code=400
)
path = request.get_argument('path')
if path is None:
raise NexusProcessingException(
reason='Path argument must be provided',
code=400
)
try:
if urlparse(path).scheme not in ['file','','s3']:
raise NexusProcessingException(
reason='Dataset URL must be for a local file or S3 URL',
code=400
)
except ValueError:
raise NexusProcessingException(
reason='Could not parse path URL', code=400
)
try:
NexusTileService.user_ds_add(name, path, config)
except Exception as e:
raise NexusProcessingException(
reason=repr(e),
code=500
)
@nexus_handler
class DatasetUpdate(DatasetManagement):
name = 'Update dynamically added dataset'
path = '/datasets/update'
description = "Update Zarr dataset in running SDAP instance"
params = {
"name": {
"name": "Dataset name",
"type": "string",
"description": "Name of dataset to update"
},
"body": {
"name": "Request body",
"type": "application/json OR application/yaml",
"description": "POST request body. Config options for Zarr (variabe, coords, aws (if applicable))"
}
}
def __init__(self, **args):
pass
def calc(self, request: NexusRequestObject, **args):
try:
config = DatasetManagement.parse_config(request)
except Exception as e:
raise NexusProcessingException(
reason=repr(e),
code=400
)
name = request.get_argument('name')
if name is None:
raise NexusProcessingException(
reason='Name argument must be provided',
code=400
)
try:
return Response(NexusTileService.user_ds_update(name, config))
except Exception as e:
raise NexusProcessingException(
reason=repr(e),
code=500
)
@nexus_handler
class DatasetDelete(DatasetManagement):
name = 'Remove dataset'
path = '/datasets/remove'
description = "Remove Zarr dataset from running SDAP instance"
params = {
"name": {
"name": "Dataset name",
"type": "string",
"description": "Name of dataset to remove"
}
}
def __init__(self, **args):
pass
def calc(self, request: NexusRequestObject, **args):
name = request.get_argument('name')
if name is None:
raise NexusProcessingException(
reason='Name argument must be provided',
code=400
)
try:
return Response(NexusTileService.user_ds_delete(name))
except Exception as e:
raise NexusProcessingException(
reason=repr(e),
code=500
)