analysis/webservice/webmodel/NexusExecutionResults.py (142 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from enum import Enum
ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
class ExecutionStatus(Enum):
RUNNING = 'running'
SUCCESS = 'success'
FAILED = 'failed'
CANCELLED = 'cancelled'
def construct_job_status(job_state, created, updated, execution_id, params, host, message='',
filename=None):
filename_param = f'&filename={filename}' if filename else ''
return {
'status': job_state.value,
'message': message,
'createdAt': created,
'updatedAt': updated,
'links': [{
'href': f'{host}/job?id={execution_id}{filename_param}',
'title': 'Get job status - the current page',
'type': 'application/json',
'rel': 'self'
}],
'params': params,
'executionID': execution_id
}
def construct_done(status, created, completed, execution_id, params, host,
num_primary_matched, num_secondary_matched, num_unique_secondaries, filename):
job_body = construct_job_status(
status,
created,
completed,
execution_id,
params,
host,
filename=filename
)
# Add stats to body
job_body['totalPrimaryMatched'] = num_primary_matched
job_body['totalSecondaryMatched'] = num_secondary_matched
job_body['averageSecondaryMatched'] = round(num_secondary_matched/num_primary_matched) \
if num_primary_matched > 0 else 0
job_body['totalUniqueSecondaryMatched'] = num_unique_secondaries
filename_param = f'&filename={filename}' if filename else ''
# Construct urls
formats = [
('CSV', 'text/csv'),
('JSON', 'application/json'),
('NETCDF', 'binary/octet-stream')
]
job_body['links'].append({
'href': f'{host}/cdmscatalog/{execution_id}',
'title': 'STAC Catalog for execution results',
'type': 'application/json',
'rel': 'stac'
})
data_links = [{
'href': f'{host}/cdmsresults?id={execution_id}&output={output_format}{filename_param}',
'title': f'Download {output_format} results',
'type': mime,
'rel': 'data'
} for output_format, mime in formats]
job_body['links'].extend(data_links)
return job_body
def construct_running(status, created, execution_id, params, host, filename):
job_body = construct_job_status(
status,
created,
None,
execution_id,
params,
host,
filename=filename
)
job_body['links'].append({
'href': f'{host}/job/cancel?id={execution_id}',
'title': 'Cancel the job',
'rel': 'cancel'
})
return job_body
def construct_error(status, created, completed, execution_id, message, params, host, filename):
return construct_job_status(
status,
created,
completed,
execution_id,
params,
host,
message,
filename=filename
)
def construct_cancelled(status, created, completed, execution_id, params, host, filename):
return construct_job_status(
status,
created,
completed,
execution_id,
params,
host,
filename=filename
)
class NexusExecutionResults:
def __init__(self, status=None, created=None, completed=None, execution_id=None, message='',
params=None, host=None, status_code=200, num_primary_matched=None,
num_secondary_matched=None, num_unique_secondaries=None, filename=None):
self.status_code = status_code
self.status = status
self.created = created
self.completed = completed
self.execution_id = execution_id
self.message = message
self.execution_params = params
self.host = host
self.num_primary_matched = num_primary_matched
self.num_secondary_matched = num_secondary_matched
self.num_unique_secondaries = num_unique_secondaries
self.filename = filename
def toJson(self):
params = {
'status': self.status,
'created': self.created,
'execution_id': self.execution_id,
'params': self.execution_params,
'host': self.host,
'filename': self.filename
}
if self.status == ExecutionStatus.SUCCESS:
params['completed'] = self.completed
params['num_primary_matched'] = self.num_primary_matched
params['num_secondary_matched'] = self.num_secondary_matched
params['num_unique_secondaries'] = self.num_unique_secondaries
construct = construct_done
elif self.status == ExecutionStatus.RUNNING:
construct = construct_running
elif self.status == ExecutionStatus.FAILED:
params['completed'] = self.completed
params['message'] = self.message
construct = construct_error
elif self.status == ExecutionStatus.CANCELLED:
params['completed'] = self.completed
construct = construct_cancelled
else:
# Raise error -- job state is invalid
raise ValueError('Unable to fetch status for execution {}', self.execution_id)
job_details = construct(**params)
return json.dumps(job_details, indent=4, default=str)