source/CRRMonitor/CRRMonitor.py
def queue_handler(event, context):
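    # Fan-out worker: drains the monitor's SQS queue, invoking additional
    # copies of this Lambda function (up to maxspawn) whenever the backlog
    # exceeds what a single invocation can process.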
cnum = 0
if 'child-number' in event:
cnum = int(event['child-number'])
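    # Each invocation handles at most maxtask messages; message_floor is the
    # share of the queue already covered by lower-numbered children.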
message_floor = cnum * maxtask
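    # Example get_queue_attributes response: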
# {
# "Attributes": {"ApproximateNumberOfMessages": "1040"},
# "ResponseMetadata": {
# "RetryAttempts": 0,
# "HTTPStatusCode": 200,
# "RequestId": "51c43b7e-9b05-59c8-b68e-6a68f3f3b999",
# "HTTPHeaders": {
# "x-amzn-requestid": "51c43b7e-9b05-59c8-b68e-6a68f3f3b999",
# "content-length": "360",
# "server": "Server",
# "connection": "keep-alive",
# "date": "Thu, 09 Feb 2017 12:55:18 GMT",
# "content-type": "text/xml"
# }
# }
# }
response = client['sqs']['handle'].get_queue_attributes(
QueueUrl=queue_endpoint,
AttributeNames=['ApproximateNumberOfMessages']
)
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
        print('Bad status from ' + queue + ': ' +
              str(response['ResponseMetadata']['HTTPStatusCode']))
return
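    # ApproximateNumberOfMessages is an estimate, but close enough to decide
    # whether to fan out.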
queue_sz = int(response['Attributes']['ApproximateNumberOfMessages'])
queue_backlog = queue_sz - message_floor
    print('INFO [CNUM-' + str(cnum) + '] Queue is ' + str(queue_sz) +
          ' deep. Backlog is ' + str(queue_backlog))
    # We subtracted the number of messages for which processes are already
    # running. If the backlog is still too deep then spawn another child,
    # passing child-number + 1 in the event.
if queue_backlog > maxtask:
# increment child-number (or initialize to 1) in the event dict
# spawn another lambda, passing the event and context dicts
if cnum < maxspawn:
event['child-number'] = cnum + 1
try:
client['lbd']['handle'].invoke(
FunctionName=context.function_name,
InvocationType='Event',
Payload=json.dumps(event)
)
                print('Spawning a child because there are ' + str(queue_sz) +
                      ' messages in the queue. I am child ' + str(cnum) +
                      ' with a max capacity of ' + str(maxtask) +
                      '. Message floor is ' + str(message_floor))
                print('Reproduction successful - child ' + str(cnum + 1) + ' spawned')
except Exception as e:
print(e)
print('ERROR[CNUM-' + str(cnum) + '] Failed to reproduce')
raise e
else:
print('WARNING: maxspawn(' + str(maxspawn) + ') exceeded. Not spawning a helper.')
# -----------------------------------------------------------------
# Now we get to work. Process messages from the queue until empty
# or we time out. This is the secret sauce to our horizontal scale
print('INFO [CNUM-' + str(cnum) + '] Priming read from SQS...')
msg_ctr = 0 # keep a count of messages processed
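    # SQS returns at most 10 messages per receive_message call; the 60-second
    # visibility timeout hides each batch from other consumers while we work on it.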
sqs_msgs = client['sqs']['handle'].receive_message(
QueueUrl=queue_endpoint,
AttributeNames=['All'],
MaxNumberOfMessages=10,
VisibilityTimeout=60
)
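    # Receipt handles of successfully processed messages, deleted in batches below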
sqs_delete = []
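    # receive_message omits the 'Messages' key when no messages are returned,
    # which is what ends this loop.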
while 'Messages' in sqs_msgs:
print('INFO [CNUM-' + str(cnum) + '] Processing ' + str(len(sqs_msgs['Messages'])) + ' messages')
for message in sqs_msgs['Messages']:
rc = message_handler(json.loads(message['Body']))
            # If message_handler returned a non-zero code, skip deletion so the
            # message times out back into the queue and is retried.
if not rc:
                sqs_delete.append({
                    'Id': message['MessageId'],
                    'ReceiptHandle': message['ReceiptHandle']
                })
msg_ctr += 1 # keep a count of messages processed
if len(sqs_delete) > 0:
# Delete the messages we just processed
response = client['sqs']['handle'].delete_message_batch(
QueueUrl=queue_endpoint,
Entries=sqs_delete
)
if len(response['Successful']) < len(sqs_delete):
                print('ERROR[CNUM-' + str(cnum) + ']: processed ' + str(len(sqs_delete)) +
                      ' messages but only deleted ' +
                      str(len(response['Successful'])) + ' messages')
sqs_delete = [] # reset the list
print('INFO [CNUM-' + str(cnum) + '] Reading from SQS...')
sqs_msgs = client['sqs']['handle'].receive_message(
QueueUrl=queue_endpoint,
AttributeNames=['All'],
MaxNumberOfMessages=10,
VisibilityTimeout=60
)
print('INFO [CNUM-' + str(cnum) + '] Completed - ' + str(msg_ctr) + ' messages processed')
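    # Optionally report how many messages this invocation processed as an
    # anonymous usage metric.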
if SEND_ANONYMOUS_USAGE_METRIC and msg_ctr > 0:
send_anonymous_usage_metric({
"Action": f"Num messages processed by CRRMonitor: {msg_ctr}"
})