in source/client/python/api-v0.1/api/state_table_dynamodb.py [0:0]
def update_task_status_to_finished(self, task_id, agent_id):
"""
Attempts to move task into finished state.
Args:
task_id: task to be updated
agent_id: expected worker id
Returns:
True if successful
Throws:
StateTableException on throttling
StateTableException on condition
Exception for all other errors
"""
session_id = self.__get_session_id_from_task_id(task_id)
update_succesfull = True
try:
self.state_table.update_item(
Key={
'task_id': task_id
},
UpdateExpression="SET #var_task_status = :val1",
ExpressionAttributeValues={
':val1': self.__make_task_state_from_session_id(TASK_STATE_FINISHED, session_id)
},
ExpressionAttributeNames={
"#var_task_status": "task_status"
},
ConditionExpression=Key('task_status').eq(
self.__make_task_state_from_session_id(TASK_STATE_PROCESSING, session_id)
) & Key('task_owner').eq(agent_id)
)
except ClientError as e:
if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
# For debugging purposes we re-read the row to later identify why exactly condition has failed.
check_read = self.get_task_by_id(task_id, consistent_read=True)
msg = f"Could not set completion state to Finish. ConditionalCheckFailedException\
on task: [{task_id}] owner [{agent_id}] for status [{self.__make_task_state_from_session_id(TASK_STATE_PENDING, session_id)}] from DynamoDB,\
someone else already locked it? [{e}]. Check State Table read: [{check_read}]"
logging.warning(msg)
raise StateTableException(e, msg, caused_by_condition=True)
elif e.response['Error']['Code'] in ["ThrottlingException", "ProvisionedThroughputExceededException"]:
msg = f"Could not set completion state to Finish on task: [{task_id}] from DynamoDB, Throttling Exception {e}"
logging.warning(msg)
raise StateTableException(e, msg, caused_by_throttling=True)
else:
msg = f"Could not set completion state to Finish on task: [{task_id}] from DynamoDB: {e}"
logging.error(msg)
raise Exception(e)
except Exception as e:
msg = f"Could not set completion state to Finish on task: [{task_id}] for agent [{agent_id}]: from DynamoDB: {e}"
logging.error(msg)
raise e
return update_succesfull