in source/client/python/api-v0.1/api/state_table_dynamodb.py [0:0]
def __finalize_tasks_state(self, task_id, new_task_state):
    """
    Move a task into one of its final states.

    The row is overwritten unconditionally: no condition expression is sent with the
    update, the task owner is reset to the string 'None' and the task status is set to
    the value built by __make_task_state_from_task_id(new_task_state, task_id).

    Args:
        task_id: task to move into new final state
        new_task_state: the new final state; expected to be one of TASK_STATE_FAILED,
            TASK_STATE_INCONSISTENT or TASK_STATE_CANCELLED

    Returns:
        Nothing

    Throws:
        StateTableException on throttling
        Exception for all other errors (a non-throttling ClientError is wrapped in a
            plain Exception; any other exception is re-raised unchanged)
    """
    # NOTE(review): an unexpected state is only logged and is still written below --
    # confirm whether the call should be rejected instead.
    if new_task_state not in (TASK_STATE_FAILED, TASK_STATE_INCONSISTENT, TASK_STATE_CANCELLED):
        logging.error("__finalize_tasks_state called with incorrect input: {}".format(
            new_task_state
        ))

    try:
        self.state_table.update_item(
            Key={
                'task_id': task_id
            },
            UpdateExpression="SET #var_task_owner = :val1, #var_task_status = :val2",
            ExpressionAttributeValues={
                ':val1': 'None',
                ':val2': self.__make_task_state_from_task_id(new_task_state, task_id)
            },
            ExpressionAttributeNames={
                "#var_task_owner": "task_owner",
                "#var_task_status": "task_status"
            }
        )
    except ClientError as e:
        if e.response['Error']['Code'] in ("ThrottlingException", "ProvisionedThroughputExceededException"):
            msg = (
                f"{__name__} Failed. Throttling. task_id [{task_id}] new state [{new_task_state}] "
                f"{traceback.format_exc()}"
            )
            logging.warning(msg)
            # Chain the original error so the boto failure stays attached as __cause__.
            raise StateTableException(e, msg, caused_by_throttling=True) from e

        # Implicit string concatenation instead of a backslash inside the literal, so the
        # message gets exactly one separating space and no source indentation.
        msg = (
            f"{__name__} Failed. task_id [{task_id}] new state [{new_task_state}] "
            f"Exception: [{e.response['Error']}] {traceback.format_exc()}"
        )
        logging.error(msg)
        raise Exception(e) from e
    except Exception as e:
        msg = (
            f"{__name__} Failed. task_id [{task_id}] new state [{new_task_state}] "
            f"Exception: [{e}] {traceback.format_exc()}"
        )
        logging.error(msg)
        # Bare raise re-raises the active exception as-is.
        raise