in source/control_plane/python/lambda/submit_tasks/submit_tasks.py [0:0]
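# NOTE: this handler relies on names defined at module scope earlier in this file
# (not shown in this excerpt): base64, json, copy, traceback, botocore's ClientError,
# the state_table, stdin_iom, errlog and perf_tracker helpers, EventsCounter,
# get_time_now_ms, get_safe_session_id, write_to_sqs, TASK_STATE_PENDING and
# task_input_passed_via_external_storage.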
def lambda_handler(event, context):
"""Handler called by AWS Lambda runtime
Args:
event (dict): an dictionary object containing the HTTP status code and the message to send back to the client):
an API Gateway generated event
context:
Returns:
dict: A message and a status code bind in dictionary object
"""
    # If the Lambda is invoked through an ALB, the actual submission arrives via query string parameters - extract it
if event.get('queryStringParameters') is not None:
all_params = event.get('queryStringParameters')
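        # When task input is passed via external storage, the query string carries only a payload reference, resolved here through the stdin_iom helper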
if task_input_passed_via_external_storage == '1':
session_id = all_params.get('submission_content')
encoded_json_tasks = stdin_iom.get_payload_to_utf8_string(session_id)
else:
encoded_json_tasks = all_params.get('submission_content')
if encoded_json_tasks is None:
            raise Exception('Invalid submission format, expected a submission_content parameter')
decoded_json_tasks = base64.urlsafe_b64decode(encoded_json_tasks).decode('utf-8')
event = json.loads(decoded_json_tasks)
else:
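        # Direct invocation (e.g. through API Gateway): the base64-encoded submission is carried in the request body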
encoded_json_tasks = event['body']
decoded_json_tasks = base64.urlsafe_b64decode(encoded_json_tasks).decode('utf-8')
event = json.loads(decoded_json_tasks)
try:
invocation_tstmp = get_time_now_ms()
print(event)
# Session ID that will be used for all tasks in this event.
if event["session_id"] == "None":
# Generate new session id if no session is passed
            # TODO: this option is not currently supported; consider removing it and replacing it with an assertion
session_id = get_safe_session_id()
else:
session_id = event["session_id"]
# verify_passed_sessionid_is_unique(session_id)
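        # Tasks default to priority 0 unless the caller supplies a context carrying tasks_priority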
session_priority = 0
if "context" in event:
session_priority = event["context"]["tasks_priority"]
parent_session_id = event["session_id"]
lambda_response = {
"session_id": session_id,
"task_ids": []
}
sqs_batch_entries = []
last_submitted_task_ref = None
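        # Reference to the last task message built below; its stats block is reused to record submission-level timings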
tasks_list = event['tasks_list']['tasks']
ddb_batch_write_times = []
backoff_count = 0
state_table_entries = []
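        # <1.> Build the state table entries and the SQS messages for every task in the submission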
for task_id in tasks_list:
time_now_ms = get_time_now_ms()
task_definition = "none"
task_json = {
'session_id': session_id,
'task_id': task_id,
'parent_session_id': parent_session_id,
'submission_timestamp': time_now_ms,
'task_completion_timestamp': 0,
'task_status': state_table.make_task_state_from_session_id(TASK_STATE_PENDING, session_id),
'task_owner': "None",
'retries': 0,
'task_definition': task_definition,
'sqs_handler_id': "None",
'heartbeat_expiration_timestamp': 0,
"task_priority": session_priority
}
state_table_entries.append(task_json)
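            # Deep-copy the entry for SQS: the message additionally carries the caller-supplied stats used for performance tracking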
task_json_4_sqs: dict = copy.deepcopy(task_json)
task_json_4_sqs["stats"] = event["stats"]
task_json_4_sqs["stats"]["stage2_sbmtlmba_01_invocation_tstmp"]["tstmp"] = invocation_tstmp
task_json_4_sqs["stats"]["stage2_sbmtlmba_02_before_batch_write_tstmp"]["tstmp"] = get_time_now_ms()
# task_json["scheduler_data"] = event["scheduler_data"]
sqs_batch_entries.append({
                'Id': task_id,  # used to match the send result for this message
'MessageBody': json.dumps(task_json_4_sqs)
}
)
last_submitted_task_ref = task_json_4_sqs
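        # Persist all task entries to the state table in one batch write (DynamoDB-backed, per the ddb_* metrics below)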
state_table.batch_write(state_table_entries)
# <2.> Batch submit tasks to SQS
# Performance critical code
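        # SQS SendMessageBatch accepts at most 10 messages per call, so split the entries into chunks of that size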
sqs_max_batch_size = 10
sqs_batch_chunks = [sqs_batch_entries[x:x + sqs_max_batch_size] for x in
range(0, len(sqs_batch_entries), sqs_max_batch_size)]
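        # write_to_sqs (defined elsewhere in this module) sends each chunk, presumably routing it to the queue matching session_priority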
for chunk in sqs_batch_chunks:
write_to_sqs(chunk, session_priority)
        # <3.> Non-performance-critical code: statistics and book-keeping.
event_counter = EventsCounter(["count_submitted_tasks", "count_ddb_batch_backoffs", "count_ddb_batch_write_max",
"count_ddb_batch_write_min", "count_ddb_batch_write_avg"])
event_counter.increment("count_submitted_tasks", len(sqs_batch_entries))
last_submitted_task_ref['stats']['stage2_sbmtlmba_03_invocation_over_tstmp'] = {"label": "dynamo_db_submit_ms",
"tstmp": get_time_now_ms()}
event_counter.increment("count_ddb_batch_backoffs", backoff_count)
if len(ddb_batch_write_times) > 0:
event_counter.increment("count_ddb_batch_write_max", max(ddb_batch_write_times))
event_counter.increment("count_ddb_batch_write_min", min(ddb_batch_write_times))
event_counter.increment("count_ddb_batch_write_avg",
sum(ddb_batch_write_times) * 1.0 / len(ddb_batch_write_times))
print("BKF: [{}] LEN: {} LIST: {}".format(backoff_count, len(ddb_batch_write_times), ddb_batch_write_times))
perf_tracker.add_metric_sample(
last_submitted_task_ref['stats'],
event_counter=event_counter,
from_event="stage1_grid_api_01_task_creation_tstmp",
to_event="stage2_sbmtlmba_03_invocation_over_tstmp",
# event_time=(datetime.datetime.fromtimestamp(invocation_tstmp/1000.0)).isoformat()
)
perf_tracker.submit_measurements()
        # <4.> Assemble the response
for sqs_msg in sqs_batch_entries:
lambda_response["task_ids"].append(sqs_msg['Id'])
return {
'statusCode': 200,
'body': json.dumps(lambda_response)
}
except ClientError as e:
errlog.log("ClientError in Submit Tasks {} {}"
.format(e.response['Error']['Code'], traceback.format_exc()))
return {
'statusCode': 543,
'body': e.response['Error']['Message']
}
except Exception as e:
errlog.log("Exception in Submit Tasks {} [{}]"
.format(e, traceback.format_exc()))
return {
'statusCode': 543,
'body': "{}".format(e)
}