in source/control_plane/python/lambda/ttl_checker/ttl_checker.py [0:0]
def lambda_handler(event, context):
    """Handler called by AWS Lambda runtime.

    Iterates over the batches of expired tasks returned by
    ``state_table.query_expired_tasks()`` and hands each task to
    ``_process_expired_task``, which either releases the task for another
    attempt or fails it once it has reached ``MAX_RETRIES``. Per-invocation
    counters and the total execution time are then submitted through
    ``perf_tracker``.

    Args:
        event(dict): a CloudWatch Event generated every minute (not read here).
        context: the Lambda context object (not read here).

    Returns:
        None.
    """
    stats_obj = {'01_invocation_tstmp': {"label": "None", "tstmp": int(round(time.time() * 1000))}}
    event_counter = EventsCounter(
        ["counter_expired_tasks", "counter_failed_to_acquire",
         "counter_failed_tasks", "counter_released_tasks", "counter_inconsistent_state", "counter_tasks_queue_size"])
    queue_size_recorded = False
    for expired_tasks in state_table.query_expired_tasks():
        event_counter.increment("counter_expired_tasks", len(expired_tasks))
        # Sample the queue length once per invocation: incrementing the counter
        # for every batch of expired tasks would report N times the real queue
        # size whenever the query yields N batches.
        if not queue_size_recorded:
            event_counter.increment("counter_tasks_queue_size", queue.get_queue_length())
            queue_size_recorded = True
        for item in expired_tasks:
            _process_expired_task(item, event_counter)
    stats_obj['02_completion_tstmp'] = {"label": "ttl_execution_time", "tstmp": int(round(time.time() * 1000))}
    perf_tracker.add_metric_sample(
        stats_obj,
        event_counter=event_counter,
        from_event="01_invocation_tstmp",
        to_event="02_completion_tstmp"
    )
    perf_tracker.submit_measurements()


def _process_expired_task(item, event_counter):
    """Acquire one expired task, then either fail it or release it for a retry.

    Any ``ClientError`` or other exception raised while handling the task is
    logged and the task item is sent to the DLQ, so one bad task does not stop
    the remaining tasks from being processed.

    Args:
        item(dict): a state-table record with at least ``task_id``,
            ``task_owner`` and ``heartbeat_expiration_timestamp``.
        event_counter(EventsCounter): counters updated for this invocation.
    """
    print("Processing expired task: {}".format(item))
    task_id = item.get('task_id')
    owner_id = item.get('task_owner')
    current_heartbeat_timestamp = item.get('heartbeat_expiration_timestamp')
    try:
        is_acquired = state_table.acquire_task_for_ttl_lambda(
            task_id, owner_id, current_heartbeat_timestamp)
        if not is_acquired:
            # task has been updated at the very last second (presumably after
            # the expiry query ran) -- leave it to its current owner.
            event_counter.increment("counter_failed_to_acquire")
            return
        # retrieve current number of retries and task message handler
        retries, task_handler_id, task_priority = retreive_retries_and_task_handler_and_priority(task_id)
        print("Number of retires for task[{}]: {} Priority: {}".format(task_id, retries, task_priority))
        print("Last owner for task [{}]: {}".format(task_id, owner_id))
        # TODO: MAX_RETRIES should be extracted from task definition... Store in DDB?
        # ">=" rather than "==": a counter that ever overshoots the limit must
        # still fail the task instead of retrying it forever.
        if retries >= MAX_RETRIES:
            print("Failing task {} after {} retries".format(task_id, retries))
            event_counter.increment("counter_failed_tasks")
            fail_task(task_id, task_handler_id, task_priority)
            return
        state_table.retry_task(task_id, retries + 1)
        # Count the release only once the state table has accepted the retry;
        # a failed retry_task call ends up in the DLQ, not in this counter.
        event_counter.increment("counter_released_tasks")
        _reset_task_visibility(item, task_id, task_handler_id, task_priority, event_counter)
    except ClientError as e:
        # Use .get so a malformed error response cannot raise KeyError here and
        # abort the whole invocation.
        error_message = e.response.get('Error', {}).get('Message', str(e))
        errlog.log('Lambda ttl error: {}'.format(error_message))
        print("Cannot process task {} : {}".format(task_id, e))
        print("Sending task {} to DLQ...".format(task_id))
        send_to_dlq(item)
    except Exception as e:
        print("Cannot process task {} : {}".format(task_id, e))
        print("Sending task {} to DLQ...".format(task_id))
        errlog.log('Lambda ttl error: {}'.format(e))
        send_to_dlq(item)


def _reset_task_visibility(item, task_id, task_handler_id, task_priority, event_counter):
    """Reset the VTO of a released task's queue message so an agent can acquire it.

    If the reset fails with ``ClientError``, the message is deleted instead.
    If that also fails, the task is marked inconsistent and sent to the DLQ.
    A ``ClientError`` raised while marking the task inconsistent propagates
    to the caller.
    """
    try:
        # Task can be acquired by an agent from this point
        reset_task_msg_vto(task_handler_id, task_priority)
        print("SUCCESS FIX for {}".format(task_id))
    except ClientError:
        try:
            errlog.log('Failed to reset VTO trying to delete: {} '.format(task_id))
            delete_message_from_queue(task_handler_id)
        except ClientError:
            errlog.log('Inconsistent task: {} sending do DLQ'.format(task_id))
            event_counter.increment("counter_inconsistent_state")
            set_task_inconsistent(task_id)
            send_to_dlq(item)