in source/client/python/api-v0.1/api/state_table_dynamodb.py [0:0]
def acquire_task_for_ttl_lambda(self, task_id, current_owner, current_heartbeat_timestamp):
    """Conditionally hand a processing task over to the TTL lambda for retry.

    Issues one conditional ``update_item`` against ``self.state_table``. The
    update is applied only when the stored row still has:
      * ``task_status`` equal to the processing state built for ``task_id``,
      * ``task_owner`` equal to ``current_owner``,
      * ``heartbeat_expiration_timestamp`` equal to
        ``current_heartbeat_timestamp``.
    When it is applied, ``task_owner`` becomes ``TTL_LAMBDA_ID``,
    ``task_status`` becomes the retrying state built for ``task_id`` and
    ``heartbeat_expiration_timestamp`` is reset to 0.

    Args:
        task_id: task to acquire
        current_owner: owner the row is expected to have when the update runs
        current_heartbeat_timestamp: heartbeat expiration timestamp the row is
            expected to have when the update runs

    Returns:
        Nothing

    Raises:
        StateTableException: with ``caused_by_condition=True`` when the
            condition check fails, or with ``caused_by_throttling=True`` when
            the error code is ThrottlingException or
            ProvisionedThroughputExceededException.
        Exception: wraps any other ClientError; non-ClientError failures are
            logged and re-raised unchanged.
    """
    try:
        # Both state strings depend only on task_id: build them once and reuse
        # them in the request and in the diagnostics below.
        processing_status = self.__make_task_state_from_task_id(TASK_STATE_PROCESSING, task_id)
        retrying_status = self.__make_task_state_from_task_id(TASK_STATE_RETRYING, task_id)

        # NOTE(review): state_table is presumably a boto3 DynamoDB Table
        # resource - confirm against the class constructor.
        self.state_table.update_item(
            Key={
                'task_id': task_id,
            },
            UpdateExpression="SET #var_task_owner = :val1, #var_task_status = :val2, #var_hb_timestamp = :val3",
            ExpressionAttributeValues={
                ':val1': TTL_LAMBDA_ID,
                ':val2': retrying_status,
                ':val3': 0,
            },
            ExpressionAttributeNames={
                "#var_task_owner": "task_owner",
                "#var_task_status": "task_status",
                "#var_hb_timestamp": "heartbeat_expiration_timestamp",
            },
            # All three attributes must still hold the values the caller saw,
            # otherwise the update is rejected.
            ConditionExpression=(
                Attr('task_status').eq(processing_status)
                & Attr('task_owner').eq(current_owner)
                & Attr('heartbeat_expiration_timestamp').eq(current_heartbeat_timestamp)
            ),
        )
    except ClientError as e:
        error = e.response['Error']
        error_code = error['Code']

        if error_code == 'ConditionalCheckFailedException':
            # Messages are assembled with implicit concatenation so that source
            # indentation never leaks into the logged text.
            msg = (
                f"{__name__} Failed ConditionalCheckFailedException."
                f" task_id [{task_id}] current_owner {current_owner}"
                f" current_heartbeat {current_heartbeat_timestamp}"
                f" task_status [{processing_status}] [{error}]"
            )
            logging.warning(msg)
            raise StateTableException(e, msg, caused_by_condition=True) from e

        if error_code in ("ThrottlingException", "ProvisionedThroughputExceededException"):
            msg = (
                f"{__name__} Failed Throttling. task_id [{task_id}];"
                f" task_status [{processing_status}] [{error}]"
            )
            logging.warning(msg)
            raise StateTableException(e, msg, caused_by_throttling=True) from e

        msg = f"{__name__} Failed [{task_id}] Exception: [{error}]"
        logging.error(msg)
        # Exception type kept as-is for callers; the original error is chained.
        raise Exception(e) from e
    except Exception as e:
        msg = (
            f"{__name__} Failed."
            f" task_id [{task_id}] current_owner {current_owner}"
            f" current_heartbeat {current_heartbeat_timestamp} {e}"
        )
        logging.error(msg)
        # Bare raise re-raises the same exception without an extra frame.
        raise