def lambda_handler()

in source/control_plane/python/lambda/submit_tasks/submit_tasks.py [0:0]


def lambda_handler(event, context):
    """Handle a task submission request delivered by the AWS Lambda runtime.

    The submission is a URL-safe base64 encoded JSON document. It is read
    from the ``submission_content`` query string parameter when the event
    carries query string parameters, and from the ``body`` field otherwise.
    When ``task_input_passed_via_external_storage`` equals ``'1'`` the
    ``submission_content`` parameter is instead used as a key to fetch the
    encoded document through ``stdin_iom``.

    For every task id in the decoded submission a pending-task record is
    written to the state table and a message is queued in SQS. Statistics
    about the submission are then reported through ``perf_tracker``.

    Args:
        event (dict): the invocation event holding the encoded submission
            (see above). Once decoded, the submission must provide
            ``session_id``, ``stats`` and ``tasks_list.tasks``; an optional
            ``context.tasks_priority`` sets the priority of all tasks.
        context: the Lambda context object; not used by this handler.

    Returns:
        dict: a ``statusCode`` and a ``body``. On success the status code is
        200 and the body is a JSON document with the ``session_id`` and the
        list of submitted ``task_ids``. On failure the status code is 543
        and the body is the error message.

    Raises:
        Exception: if the event carries query string parameters but no
            submission can be obtained from ``submission_content``. Errors
            raised while locating or decoding the submission are not
            converted into a 543 response; they propagate to the caller.
    """
    # Locate the encoded submission. Events with query string parameters
    # (e.g. when the lambda is called through an ALB) carry it in the
    # 'submission_content' parameter, otherwise it is expected in the body.
    all_params = event.get('queryStringParameters')
    if all_params is not None:
        encoded_json_tasks = all_params.get('submission_content')
        # Validate the parameter *before* using it as a storage key, so that
        # a missing parameter is never forwarded to the external storage.
        if encoded_json_tasks is not None and task_input_passed_via_external_storage == '1':
            encoded_json_tasks = stdin_iom.get_payload_to_utf8_string(encoded_json_tasks)
        if encoded_json_tasks is None:
            raise Exception('Invalid submission format, expect submission_content parameter')
    else:
        encoded_json_tasks = event['body']

    decoded_json_tasks = base64.urlsafe_b64decode(encoded_json_tasks).decode('utf-8')
    event = json.loads(decoded_json_tasks)

    try:
        invocation_tstmp = get_time_now_ms()

        print(event)

        # Session ID that will be used for all tasks in this event.
        parent_session_id = event["session_id"]
        if parent_session_id == "None":
            # Generate new session id if no session is passed
            # TODO: We are not currently supporting this option, consider for removal and replace with assertion
            session_id = get_safe_session_id()
        else:
            session_id = parent_session_id

        session_priority = 0
        if "context" in event:
            session_priority = event["context"]["tasks_priority"]

        tasks_list = event['tasks_list']['tasks']
        if not tasks_list:
            # Fail fast, before anything is written: the statistics below
            # need at least one submitted task.
            raise Exception('Invalid submission format, tasks_list contains no tasks')

        lambda_response = {
            "session_id": session_id,
            "task_ids": []
        }

        # These are reported as-is in the statistics below; nothing in this
        # handler currently updates them.
        ddb_batch_write_times = []
        backoff_count = 0

        # A single stats dictionary is shared by every task of the
        # submission, so the invocation timestamp only has to be set once.
        submission_stats = event["stats"]
        submission_stats["stage2_sbmtlmba_01_invocation_tstmp"]["tstmp"] = invocation_tstmp

        # <1.> Build the state table records and the matching SQS messages.
        state_table_entries = []
        sqs_batch_entries = []
        for task_id in tasks_list:
            task_json = {
                'session_id': session_id,
                'task_id': task_id,
                'parent_session_id': parent_session_id,
                'submission_timestamp': get_time_now_ms(),
                'task_completion_timestamp': 0,
                'task_status': state_table.make_task_state_from_session_id(TASK_STATE_PENDING, session_id),
                'task_owner': "None",
                'retries': 0,
                'task_definition': "none",
                'sqs_handler_id': "None",
                'heartbeat_expiration_timestamp': 0,
                "task_priority": session_priority
            }
            state_table_entries.append(task_json)

            # The SQS message is the state table record plus the stats; the
            # stats are serialized right away, so each message carries its
            # own snapshot of the timestamps.
            task_json_4_sqs: dict = copy.deepcopy(task_json)
            task_json_4_sqs["stats"] = submission_stats
            submission_stats["stage2_sbmtlmba_02_before_batch_write_tstmp"]["tstmp"] = get_time_now_ms()

            sqs_batch_entries.append({
                'Id': task_id,  # use to return send result for this message
                'MessageBody': json.dumps(task_json_4_sqs)
            })

        state_table.batch_write(state_table_entries)

        # <2.> Batch submit tasks to SQS
        # Performance critical code
        sqs_max_batch_size = 10
        for start in range(0, len(sqs_batch_entries), sqs_max_batch_size):
            write_to_sqs(sqs_batch_entries[start:start + sqs_max_batch_size], session_priority)

        # <3.> Non performance critical code, statistics and book-keeping.
        event_counter = EventsCounter(["count_submitted_tasks", "count_ddb_batch_backoffs", "count_ddb_batch_write_max",
                                       "count_ddb_batch_write_min", "count_ddb_batch_write_avg"])
        event_counter.increment("count_submitted_tasks", len(sqs_batch_entries))

        submission_stats['stage2_sbmtlmba_03_invocation_over_tstmp'] = {"label": "dynamo_db_submit_ms",
                                                                        "tstmp": get_time_now_ms()}

        event_counter.increment("count_ddb_batch_backoffs", backoff_count)

        if len(ddb_batch_write_times) > 0:
            event_counter.increment("count_ddb_batch_write_max", max(ddb_batch_write_times))
            event_counter.increment("count_ddb_batch_write_min", min(ddb_batch_write_times))
            event_counter.increment("count_ddb_batch_write_avg",
                                    sum(ddb_batch_write_times) * 1.0 / len(ddb_batch_write_times))

        print("BKF: [{}] LEN: {} LIST: {}".format(backoff_count, len(ddb_batch_write_times), ddb_batch_write_times))

        perf_tracker.add_metric_sample(
            submission_stats,
            event_counter=event_counter,
            from_event="stage1_grid_api_01_task_creation_tstmp",
            to_event="stage2_sbmtlmba_03_invocation_over_tstmp",
        )
        perf_tracker.submit_measurements()

        # <4.> Assemble the response
        lambda_response["task_ids"] = [sqs_msg['Id'] for sqs_msg in sqs_batch_entries]

        return {
            'statusCode': 200,
            'body': json.dumps(lambda_response)
        }
    except ClientError as e:
        errlog.log("ClientError in Submit Tasks {} {}"
                   .format(e.response['Error']['Code'], traceback.format_exc()))

        return {
            'statusCode': 543,
            'body': e.response['Error']['Message']
        }

    except Exception as e:
        errlog.log("Exception in Submit Tasks {} [{}]"
                   .format(e, traceback.format_exc()))

        return {
            'statusCode': 543,
            'body': "{}".format(e)
        }