def lambda_handler()

in source/control_plane/python/lambda/ttl_checker/ttl_checker.py [0:0]


def _process_expired_task(item, event_counter):
    """Try to recover one task whose heartbeat has expired.

    The task is first acquired for the TTL lambda. If it has not used up its
    retry budget, its retry count is incremented and its queue message is
    reset. If the budget is used up, it is failed. Recovery errors are logged
    and the task is sent to the DLQ. Any error raised by ``send_to_dlq``
    itself propagates to the caller.

    Args:
      item(dict): the expired-task record returned by ``state_table.query_expired_tasks``
      event_counter(EventsCounter): counters updated to reflect the outcome
    """
    print("Processing expired task: {}".format(item))
    task_id = item.get('task_id')
    owner_id = item.get('task_owner')
    current_heartbeat_timestamp = item.get('heartbeat_expiration_timestamp')
    try:
        is_acquired = state_table.acquire_task_for_ttl_lambda(
            task_id, owner_id, current_heartbeat_timestamp)

        if not is_acquired:
            # task has been updated at the very last second...
            event_counter.increment("counter_failed_to_acquire")
            return

        # retreive current number of retries and task message handler
        retries, task_handler_id, task_priority = retreive_retries_and_task_handler_and_priority(task_id)
        print("Number of retires for task[{}]: {} Priority: {}".format(task_id, retries, task_priority))
        print("Last owner for task [{}]: {}".format(task_id, owner_id))

        # TODO: MAX_RETRIES should be extracted from task definition... Store in DDB?
        # Use >= rather than == so a task whose count already exceeds the cap
        # (e.g. after MAX_RETRIES was lowered) is failed instead of retried forever.
        if retries >= MAX_RETRIES:
            print("Failing task {} after {} retries".format(task_id, retries))
            event_counter.increment("counter_failed_tasks")
            fail_task(task_id, task_handler_id, task_priority)
            return

        event_counter.increment("counter_released_tasks")
        state_table.retry_task(task_id, retries + 1)

        try:
            # Task can be acquired by an agent from this point
            reset_task_msg_vto(task_handler_id, task_priority)
            print("SUCCESS FIX for {}".format(task_id))

        except ClientError:
            # Resetting the message visibility failed: fall back to deleting the
            # message, and if that also fails the task is flagged inconsistent.
            try:
                errlog.log('Failed to reset VTO trying to delete: {} '.format(task_id))
                delete_message_from_queue(task_handler_id)
            except ClientError:
                errlog.log('Inconsistent task: {} sending do DLQ'.format(task_id))
                event_counter.increment("counter_inconsistent_state")
                set_task_inconsistent(task_id)
                send_to_dlq(item)

    except ClientError as e:
        # Guarded lookup: a KeyError here would abort the run before the DLQ send.
        error_message = (getattr(e, 'response', None) or {}).get('Error', {}).get('Message', str(e))
        errlog.log('Lambda ttl error: {}'.format(error_message))
        print("Cannot process task {} : {}".format(task_id, e))
        print("Sending task {} to DLQ...".format(task_id))
        send_to_dlq(item)
    except Exception as e:
        print("Cannot process task {} : {}".format(task_id, e))
        print("Sending task {} to DLQ...".format(task_id))
        errlog.log('Lambda ttl error: {}'.format(e))
        send_to_dlq(item)


def lambda_handler(event, context):
    """Handler called by AWS Lambda runtime

    Scans the state table for tasks whose heartbeat has expired and processes
    each one. Per-run counters and the execution time are submitted through
    ``perf_tracker``, even if the scan aborts with an exception.

    Args:
      event(dict): a CloudWatch Event generated every minute (not read here)
      context: Lambda context object (not read here)

    Returns:
      None
    """
    stats_obj = {'01_invocation_tstmp': {"label": "None", "tstmp": int(round(time.time() * 1000))}}
    event_counter = EventsCounter(
        ["counter_expired_tasks", "counter_failed_to_acquire",
         "counter_failed_tasks", "counter_released_tasks", "counter_inconsistent_state", "counter_tasks_queue_size"])

    try:
        for expired_tasks in state_table.query_expired_tasks():

            event_counter.increment("counter_expired_tasks", len(expired_tasks))
            # NOTE(review): the queue length is added once per page of expired tasks,
            # so this counter is a sum across pages, not a point-in-time size.
            event_counter.increment("counter_tasks_queue_size", queue.get_queue_length())

            for item in expired_tasks:
                _process_expired_task(item, event_counter)
    finally:
        # Submit metrics even when an unexpected error escapes the scan, so a
        # failing run still reports its duration and partial counters.
        stats_obj['02_completion_tstmp'] = {"label": "ttl_execution_time", "tstmp": int(round(time.time() * 1000))}
        perf_tracker.add_metric_sample(
            stats_obj,
            event_counter=event_counter,
            from_event="01_invocation_tstmp",
            to_event="02_completion_tstmp"
        )
        perf_tracker.submit_measurements()