def lambda_handler()

in sdlf-utils/pipeline-examples/datalake-workload-management/wlm-standalone/lambda/workload-management/src/lambda_function.py [0:0]


def lambda_handler(event, context):
    """Start Step Functions executions for queued messages, high priority first.

    Counts the RUNNING executions of ``STEPFUNCTION``. When fewer than
    ``max_sfn_executions`` are running, the free slots are filled with
    messages taken from the ``HIGH`` queue first and then from the ``LOW``
    queue, and one execution is started per collected message.

    Args:
        event: Lambda invocation event (not read by this handler).
        context: Lambda context object (not read by this handler).

    Returns:
        None in every case (no free slot, no messages, or executions started).

    Raises:
        Exception: any error raised by the SQS / Step Functions calls or
            while parsing a message is logged and re-raised unchanged.
    """
    try:
        high_message_queue = sqs_resource.get_queue_by_name(QueueName=HIGH)
        low_message_queue = sqs_resource.get_queue_by_name(QueueName=LOW)
        max_sfn_executions = 10

        # SFN processing code
        # ---------------------------------
        # maxResults (50) is larger than max_sfn_executions, so one page is
        # enough to decide whether the limit has been reached.
        response = states_client.list_executions(
            stateMachineArn=STEPFUNCTION,
            statusFilter='RUNNING',
            maxResults=50,
        )
        current_sfn_exeuction = len(response["executions"])
        logger.info(f"current_sfn_exeuctions:{current_sfn_exeuction}")

        if current_sfn_exeuction >= max_sfn_executions:
            logger.info("No step function slot empty ----- exiting")
            return

        sfn_available_slots = max_sfn_executions - current_sfn_exeuction
        logger.info(f"sfn_available_slots:{sfn_available_slots}")

        # Dynamically managing workload from different priority queues
        # ------------------------------------
        messages = []
        for message_queue in (high_message_queue, low_message_queue):
            num_messages_queue = int(
                message_queue.attributes['ApproximateNumberOfMessages'])
            logger.info(f"Number of messages in {message_queue}:{num_messages_queue}")
            if num_messages_queue == 0:
                logger.info(f"Not enough messages in {message_queue}, trying next prority queue")
                continue

            # Never ask a queue for more messages than there are free slots.
            num_messages_queue = min(num_messages_queue, sfn_available_slots)

            # The queue size is only approximate, so fewer messages than
            # requested may come back. Charge the slots with what was actually
            # received so leftover capacity can go to the next queue.
            # NOTE(review): assumes receive_max_messages returns the
            # accumulated list (earlier messages + new ones), as the final
            # loop over `messages` already relies on -- confirm.
            collected_before = len(messages)
            messages = receive_max_messages(messages, message_queue, num_messages_queue)
            logger.info(f"messages:{messages}")
            sfn_available_slots -= len(messages) - collected_before
            logger.info(f"sfn_available_slots:{sfn_available_slots}")
            if sfn_available_slots <= 0:
                break

        # Running step function for processed messages
        # --------------------------------------
        if not messages:
            logger.info("Not enough messages in any queue --- exiting")
            return

        for message in messages:
            # Each message is expected to be a JSON document; it is parsed and
            # re-serialised (json_serial handles non-JSON-native values).
            states_client.start_execution(
                stateMachineArn=STEPFUNCTION,
                input=json.dumps(json.loads(message), default=json_serial))
        logger.info(f"{len(messages)} messages sent to step function ")

    except Exception:
        # Record the full traceback, then let Lambda see the original error.
        logger.exception("Workload management handler failed")
        raise