in source/core-api/lambda_functions/assign_queue_num.py [0:0]
def lambda_handler(event, _):
    """
    Entry handler for Lambda: assign queue numbers to a batch of messages.

    A contiguous range of queue numbers, one per record in the batch, is
    reserved with a single increment of QUEUE_COUNTER. Each record whose
    body carries an ``event_id`` equal to EVENT_ID (after ``deep_clean``)
    gets the next number from that range written to a redis hash keyed by
    its API Gateway request id, and its message is then deleted from the
    queue. Records with any other event id are not processed here.

    Args:
        event: invocation payload. Must contain ``Records``, a list of
            messages; each message is read for ``body`` (JSON text with an
            ``event_id`` key), ``messageAttributes.apig_request_id.stringValue``
            and ``receiptHandle``.
        _: Lambda context object (unused).

    Returns:
        The counter value returned by ``rc.incr`` for this batch, i.e. the
        last number of the range reserved for it.

    Raises:
        Exception: after the whole batch has been attempted, if processing
            of at least one message raised.
    """
    print(event)
    records = event['Records']
    num_msg = len(records)
    # reserve num_msg numbers in one call; this is done atomically
    cur_count = rc.incr(QUEUE_COUNTER, num_msg)
    print(cur_count)
    # first number of the range reserved for this batch
    q_start_num = cur_count - (num_msg - 1)
    return_with_exception = False

    for msg in records:
        try:
            body = json.loads(msg['body'])
            request_id = msg['messageAttributes']['apig_request_id']['stringValue']
            client_event_id = deep_clean(body['event_id'])

            # if the event ID is invalid, don't process it at all
            if client_event_id != EVENT_ID:
                continue

            # Claim the number *before* touching redis or SQS. If any call
            # below raises after the number has been stored, the cursor has
            # already moved on, so the next message in this batch can never
            # be handed the same queue number. A failure costs at most an
            # unused number from the reserved range.
            queue_number = q_start_num
            q_start_num += 1

            # write item back to redis, using request_id as the hash key;
            # HSETNX so there is no effect on fields that already exist
            # (e.g. queue_number of a message that is being retried)
            entry_time = int(time.time())
            rc.hsetnx(request_id, "entry_time", entry_time)
            rc.hsetnx(request_id, "queue_number", queue_number)
            rc.hsetnx(request_id, "event_id", EVENT_ID)
            rc.hsetnx(request_id, "status", 1)
            result = rc.hgetall(request_id)
            print(result)

            # item is written, so delete the message from the queue right away
            # sqs has a vpc endpoint
            response = sqs_client.delete_message(
                QueueUrl=QUEUE_URL,
                ReceiptHandle=msg["receiptHandle"]
            )
            print(response)
        except Exception as exception:
            # keep going so the remaining messages are still attempted;
            # the failure is reported once the whole batch has been tried
            print(exception)
            return_with_exception = True

    if return_with_exception:
        raise Exception("one or more messages failed processing")

    # return the current count based on this batch process
    return cur_count