def queue_handler()

in source/CRRMonitor/CRRMonitor.py [0:0]


def queue_handler(event, context):
    """Process messages from the SQS queue, spawning helpers when backlogged.

    The handler checks the approximate queue depth, subtracts the share of
    messages attributed to this invocation and its ancestors
    (``child-number * maxtask``), and, if the remaining backlog still exceeds
    ``maxtask``, invokes this same Lambda function again with
    ``child-number`` incremented (up to ``maxspawn``). It then reads
    messages in batches of up to 10, passes each decoded body to
    ``message_handler``, and batch-deletes the ones that were handled
    successfully. Messages whose handler returns a truthy code are left
    undeleted so they become visible in the queue again after the
    visibility timeout.

    Args:
        event: Invocation payload (dict). An optional ``'child-number'`` key
            identifies which spawned helper this invocation is; it is
            treated as 0 when absent. The dict is mutated in place when a
            child is spawned.
        context: Lambda context object; only ``function_name`` is read.

    Returns:
        None.

    Raises:
        Exception: Whatever the Lambda ``invoke`` call raised while spawning
            a child is logged and re-raised.
    """
    cnum = int(event['child-number']) if 'child-number' in event else 0

    # Messages below this floor are assumed to be covered by invocations
    # that are already running.
    message_floor = cnum * maxtask

    sqs = client['sqs']['handle']

    # Example response shape:
    # {
    #   "Attributes": {"ApproximateNumberOfMessages": "1040"},
    #   "ResponseMetadata": {
    #       "RetryAttempts": 0,
    #       "HTTPStatusCode": 200,
    #       "RequestId": "51c43b7e-9b05-59c8-b68e-6a68f3f3b999",
    #       "HTTPHeaders": {...}
    #   }
    # }
    response = sqs.get_queue_attributes(
        QueueUrl=queue_endpoint,
        AttributeNames=['ApproximateNumberOfMessages']
    )

    status = response['ResponseMetadata']['HTTPStatusCode']
    if status != 200:
        # HTTPStatusCode is an int, so it must be formatted rather than
        # concatenated onto the message.
        print(f'Bad status from {queue}: {status}')
        return

    queue_sz = int(response['Attributes']['ApproximateNumberOfMessages'])
    queue_backlog = queue_sz - message_floor

    print(f'INFO [CNUM-{cnum}] Queue is {queue_sz} deep. Backlog is {queue_backlog}')

    # We subtracted the number of messages for which processes are already
    # running. If the backlog is still too deep then first spawn another
    # child, updating child-number + 1.
    if queue_backlog > maxtask:
        if cnum < maxspawn:
            # Pass the (mutated) event on so the child knows its number.
            event['child-number'] = cnum + 1
            try:
                client['lbd']['handle'].invoke(
                    FunctionName=context.function_name,
                    InvocationType='Event',
                    Payload=json.dumps(event)
                )
            except Exception as e:
                print(e)
                print(f'ERROR[CNUM-{cnum}] Failed to reproduce')
                raise
            else:
                print(f'Spawning a child because there are {queue_sz} messages in the queue. I am child {cnum} with a max capacity of {maxtask}. Message floor is {message_floor}')

                print(f'Reproduction successful - child {cnum + 1} spawned')
        else:
            print(f'WARNING: maxspawn({maxspawn}) exceeded. Not spawning a helper.')

    # -----------------------------------------------------------------
    # Now we get to work. Process messages from the queue until empty
    # or we time out. This is the secret sauce to our horizontal scale
    receive_args = {
        'QueueUrl': queue_endpoint,
        'AttributeNames': ['All'],
        'MaxNumberOfMessages': 10,
        'VisibilityTimeout': 60,
    }

    print(f'INFO [CNUM-{cnum}] Priming read from SQS...')
    msg_ctr = 0  # keep a count of messages processed
    sqs_msgs = sqs.receive_message(**receive_args)

    # The loop ends when a receive response carries no 'Messages' key.
    while 'Messages' in sqs_msgs:
        batch = sqs_msgs['Messages']
        print(f'INFO [CNUM-{cnum}] Processing {len(batch)} messages')

        sqs_delete = []
        for message in batch:
            rc = message_handler(json.loads(message['Body']))
            # If we did not get a 0 (falsy) return code, let the record time
            # out back into the queue instead of deleting it.
            if not rc:
                sqs_delete.append({
                    'Id': message['MessageId'],
                    'ReceiptHandle': message['ReceiptHandle'],
                })
                msg_ctr += 1  # keep a count of messages processed

        if sqs_delete:
            # Delete the messages we just processed
            response = sqs.delete_message_batch(
                QueueUrl=queue_endpoint,
                Entries=sqs_delete
            )
            # 'Successful' may be missing if every entry failed; treat that
            # as zero deletions rather than raising KeyError.
            deleted = len(response.get('Successful', []))
            if deleted < len(sqs_delete):
                print(f'ERROR[CNUM-{cnum}]: processed {len(sqs_delete)} messages but only deleted {deleted} messages')

        print(f'INFO [CNUM-{cnum}] Reading from SQS...')
        sqs_msgs = sqs.receive_message(**receive_args)

    print(f'INFO [CNUM-{cnum}] Completed - {msg_ctr} messages processed')

    if SEND_ANONYMOUS_USAGE_METRIC and msg_ctr > 0:
        send_anonymous_usage_metric({
            "Action": f"Num messages processed by CRRMonitor: {msg_ctr}"
        })