def workflow_scheduler_lambda()

in source/workflow/app.py [0:0]


def workflow_scheduler_lambda(event, context):

    execution_table = DYNAMO_CLIENT.Table(WORKFLOW_EXECUTION_TABLE_NAME)
    arn = ""
    workflow_execution = {}
    MaxConcurrentWorkflows = DEFAULT_MAX_CONCURRENT_WORKFLOWS
    empty = False

    try:
        logger.info(json.dumps(event))

        # Get the MaxConcurrent configruation parameter, if it is not set, use the default
        system_table = DYNAMO_CLIENT.Table(SYSTEM_TABLE_NAME)

        # Check if any configuration has been added yet
        response = system_table.get_item(
            Key={
                'Name': 'MaxConcurrentWorkflows'
            },
            ConsistentRead=True)

        if "Item" in response:
            MaxConcurrentWorkflows = int(response["Item"]["Value"])
            logger.info("Got MaxConcurrentWorkflows = {}".format(response["Item"]["Value"]))

        # Check if there are slots to run a workflow
        # FIXME - we really need consistent read here.  Index query is not consistent read
        started_workflows = list_workflow_executions_by_status(awsmie.WORKFLOW_STATUS_STARTED)
        num_started_workflows = len(started_workflows)

        if num_started_workflows >= MaxConcurrentWorkflows:
            logger.info("MaxConcurrentWorkflows has been reached {}/{} - nothing to do".format(num_started_workflows, MaxConcurrentWorkflows))

        else:
            # We can only read 10 messages at a time from the queue.  Loop reading from the queue until
            # it is empty or we are out of slots
            while (num_started_workflows < MaxConcurrentWorkflows and not empty):

                capacity = min(int(MaxConcurrentWorkflows - num_started_workflows), 10)

                logger.info("MaxConcurrentWorkflows has not been reached {}/{} - check if a workflow is available to run".format(num_started_workflows, MaxConcurrentWorkflows))

                # Check if there are workflows waiting to run on the STAGE_EXECUTION_QUEUE_URL
                messages = SQS_CLIENT.receive_message(
                    QueueUrl=STAGE_EXECUTION_QUEUE_URL,
                    MaxNumberOfMessages=capacity
                )
                if 'Messages' in messages: # when the queue is exhausted, the response dict contains no 'Messages' key
                    for message in messages['Messages']: # 'Messages' is a list
                        # process the messages
                        logger.info(message['Body'])
                        # next, we delete the message from the queue so no one else will process it again,
                        # once it is in our hands it is going run or fail, no reprocessing
                        # FIXME - we may want to delay deleting the message until complete_stage is called on the
                        # final stage so we can detect hung workflows and time them out.  For now, do the simple thing.
                        SQS_CLIENT.delete_message(QueueUrl=STAGE_EXECUTION_QUEUE_URL,ReceiptHandle=message['ReceiptHandle'])

                        workflow_execution = json.loads(message['Body'])
                        queued_workflow_status = workflow_execution['Status']
                        workflow_execution['Status'] = awsmie.WORKFLOW_STATUS_STARTED

                        # Update the workflow status now to indicate we have taken it off the queue
                        update_workflow_execution_status(workflow_execution["Id"], awsmie.WORKFLOW_STATUS_STARTED, "")

                        # Resumed workflows state machines are already executing since they just paused
                        # to wait for some external action
                        if queued_workflow_status != awsmie.WORKFLOW_STATUS_RESUMED:
                            # Kick off the state machine for the workflow
                            response = SFN_CLIENT.start_execution(
                                stateMachineArn=workflow_execution["Workflow"]["StateMachineArn"],
                                name=workflow_execution["Workflow"]["Name"]+workflow_execution["Id"],
                                input=json.dumps(workflow_execution["Workflow"]["Stages"][workflow_execution["CurrentStage"]])
                            )

                            workflow_execution["StateMachineExecutionArn"] = response["executionArn"]

                            # Update the workflow with the state machine id
                            response = execution_table.update_item(
                                Key={
                                    'Id': workflow_execution["Id"]
                                },
                                UpdateExpression='SET StateMachineExecutionArn = :arn',
                                ExpressionAttributeValues={
                                    ':arn': response["executionArn"]
                                }
                            )

                else:
                    logger.info('Queue is empty')
                    empty = True

                started_workflows = list_workflow_executions_by_status(awsmie.WORKFLOW_STATUS_STARTED)
                num_started_workflows = len(started_workflows)


    except Exception as e:

        logger.info("Exception in scheduler {}".format(e))
        if "Id" in workflow_execution:

            update_workflow_execution_status(workflow_execution["Id"], awsmie.WORKFLOW_STATUS_ERROR, "Exception in workflow_scheduler_lambda {}".format(e))
        raise

    return arn