analysis/webservice/algorithms/doms/ExecutionCancel.py (53 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the 'License'); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an 'AS IS' BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import uuid from webservice.NexusHandler import nexus_handler from webservice.algorithms.doms.ResultsStorage import ResultsRetrieval from webservice.webmodel import NexusExecutionResults from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler from datetime import datetime from webservice.algorithms.doms.ResultsStorage import ResultsStorage from webservice.webmodel.NexusExecutionResults import ExecutionStatus from webservice.webmodel import NexusProcessingException @nexus_handler class ExecutionStatusHandler(NexusCalcSparkTornadoHandler): name = 'Execution Status Handler' path = '/job/cancel' description = '' params = {} singleton = True def __init__(self, algorithm_config=None, sc=None, tile_service_factory=None, config=None): NexusCalcSparkTornadoHandler.__init__( self, algorithm_config=algorithm_config, sc=sc, tile_service_factory=tile_service_factory ) self.tile_service_factory = tile_service_factory self.config = config def calc(self, request, tornado_io_loop, **args): execution_id = request.get_argument('id', None) try: execution_id = uuid.UUID(execution_id) except ValueError: raise NexusProcessingException(reason='"id" argument must be a valid uuid', code=400) with ResultsRetrieval(self.config) as retrieval: try: execution_details = retrieval.retrieveExecution(execution_id) except ValueError: raise NexusProcessingException( reason=f'Execution {execution_id} not found', code=404 ) job_status = NexusExecutionResults.ExecutionStatus(execution_details['status']) # Only proceed if status is "running". Otherwise, noop if job_status == ExecutionStatus.RUNNING: # Update job status to "cancelled" end = datetime.utcnow() with ResultsStorage(self.config) as storage: storage.updateExecution( execution_id, completeTime=end, status=ExecutionStatus.CANCELLED.value, message=None, stats=None, results=None ) # Cancel Spark job self._sc.cancelJobGroup(str(execution_id)) # Redirect to job status endpoint request.requestHandler.redirect(f'/job?id={execution_id}')