in source/compute_plane/python/agent/agent.py [0:0]
def update_ttl_if_required(task, sqs_msg, *, throttle_backoff_base_sec=0.05, throttle_backoff_max_sec=1.0):
    """Refresh the state-table TTL of the ongoing task when it is about to expire.

    A refresh is attempted only when ``ttl_gen`` reports that no refresh has
    happened yet (next refresh timestamp is 0), or that the current TTL would
    expire before the next status poll
    (``time.time() + work_proc_status_pull_interval_sec``).

    Args:
        task: Task record; only ``task["task_id"]`` is read here.
        sqs_msg: Queue message for the task. It is deleted if the conditional
            TTL update fails because the client cancelled the task.
        throttle_backoff_base_sec: Initial delay before retrying a throttled
            state-table update. The delay doubles on each consecutive throttle.
        throttle_backoff_max_sec: Upper bound on a single retry delay.

    Returns:
        The value returned by ``state_table.refresh_ttl_for_ongoing_task`` when
        a refresh was performed, otherwise ``True``.

    Raises:
        Exception: Wraps (and chains) a ``StateTableException`` that is neither
            throttling nor explained by a client-side cancellation.
        Any other exception raised during the refresh is logged and re-raised.

    Note:
        On the cancellation path this function deletes the SQS message,
        terminates the worker lambda container and sends SIGKILL to its own
        process, so in practice it does not return.
    """
    next_refresh_ts = ttl_gen.get_next_refresh_timestamp()
    # Refresh if this is the first time we are setting the TTL, or if the TTL
    # would expire before the next time we come back to this point.
    needs_refresh = (next_refresh_ts == 0
                     or next_refresh_ts < time.time() + work_proc_status_pull_interval_sec)
    if not needs_refresh:
        # Even if we didn't have to perform an update, return success.
        return True

    logging.info("***Updating TTL***")
    count = 0
    while True:
        count += 1
        t1 = get_time_now_ms()
        try:
            # If the DDB update times out and this loop iterates again, a new
            # TTL offset is regenerated, which is what we want.
            return state_table.refresh_ttl_for_ongoing_task(
                task_id=task["task_id"],
                agent_id=SELF_ID,
                new_expirtaion_timestamp=ttl_gen.generate_next_ttl().get_next_expiration_timestamp()
            )
        except StateTableException as e:
            if e.caused_by_throttling:
                t2 = get_time_now_ms()
                errlog.log(f"Agent TTL@StateTable Throttling for #{count} times for {t2 - t1} ms")
                # Back off exponentially (capped) instead of immediately
                # re-hitting a table that is already throttling us. The
                # exponent is clamped so the float multiply cannot overflow.
                delay = throttle_backoff_base_sec * (2 ** min(count - 1, 16))
                time.sleep(min(delay, throttle_backoff_max_sec))
                continue
            if e.caused_by_condition and is_task_has_been_cancelled(task["task_id"]):
                # The only valid reason to reach this path is that the client
                # cancelled the task.
                # <1.> Delete the task from the task queue so other workers won't pick it up.
                sqs_msg.delete()
                # <2.> Terminate the worker lambda function first.
                terminate_worker_lambda_container()
                # <3.> Then terminate/restart the agent container.
                logging.warning(f"Task {task['task_id']} has been cancelled during processing, restarting pod.")
                os.kill(os.getpid(), signal.SIGKILL)
                # Not expected to be reached; report the refresh as failed if it is.
                return False
            # Unexpected error -> Fail
            errlog.log(f"Unexpected StateTableException while refreshing TTL {e} [{traceback.format_exc()}]")
            raise Exception(e) from e
        except Exception as e:
            errlog.log(f"Unexpected Exception while refreshing TTL {e} [{traceback.format_exc()}]")
            raise