in source/workflow/app.py [0:0]
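# Assumed module-level context for this excerpt: logger, the boto3 DynamoDB resource
# DYNAMO_CLIENT, the SQS and Step Functions clients SQS_CLIENT and SFN_CLIENT, the
# WORKFLOW_EXECUTION_TABLE_NAME, SYSTEM_TABLE_NAME and STAGE_EXECUTION_QUEUE_URL names,
# DEFAULT_MAX_CONCURRENT_WORKFLOWS, the awsmie status constants, and the helpers
# list_workflow_executions_by_status and update_workflow_execution_status are all
# defined elsewhere in app.py.
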
def workflow_scheduler_lambda(event, context):
    execution_table = DYNAMO_CLIENT.Table(WORKFLOW_EXECUTION_TABLE_NAME)
    arn = ""
    workflow_execution = {}
    MaxConcurrentWorkflows = DEFAULT_MAX_CONCURRENT_WORKFLOWS
    empty = False
    try:
        logger.info(json.dumps(event))
        # Get the MaxConcurrentWorkflows configuration parameter; if it is not set, use the default
        system_table = DYNAMO_CLIENT.Table(SYSTEM_TABLE_NAME)
        # Check if any configuration has been added yet
        response = system_table.get_item(
            Key={
                'Name': 'MaxConcurrentWorkflows'
            },
            ConsistentRead=True)
        if "Item" in response:
            MaxConcurrentWorkflows = int(response["Item"]["Value"])
            logger.info("Got MaxConcurrentWorkflows = {}".format(response["Item"]["Value"]))
        # Check if there are slots available to run a workflow
        # FIXME - we really need consistent read here. Index query is not consistent read
        started_workflows = list_workflow_executions_by_status(awsmie.WORKFLOW_STATUS_STARTED)
        num_started_workflows = len(started_workflows)
        if num_started_workflows >= MaxConcurrentWorkflows:
            logger.info("MaxConcurrentWorkflows has been reached {}/{} - nothing to do".format(num_started_workflows, MaxConcurrentWorkflows))
        else:
            # SQS returns at most 10 messages per receive call, so loop reading from the queue
            # until it is empty or we run out of slots
            while (num_started_workflows < MaxConcurrentWorkflows and not empty):
                capacity = min(int(MaxConcurrentWorkflows - num_started_workflows), 10)
                logger.info("MaxConcurrentWorkflows has not been reached {}/{} - check if a workflow is available to run".format(num_started_workflows, MaxConcurrentWorkflows))
                # Check if there are workflows waiting to run on the STAGE_EXECUTION_QUEUE_URL
                messages = SQS_CLIENT.receive_message(
                    QueueUrl=STAGE_EXECUTION_QUEUE_URL,
                    MaxNumberOfMessages=capacity
                )
                if 'Messages' in messages:  # when the queue is exhausted, the response dict contains no 'Messages' key
                    for message in messages['Messages']:  # 'Messages' is a list
                        # process the messages
                        logger.info(message['Body'])
                        # Delete the message from the queue so no one else processes it again;
                        # once it is in our hands it is going to run or fail - no reprocessing.
                        # FIXME - we may want to delay deleting the message until complete_stage is called on the
                        # final stage, so we can detect hung workflows and time them out. For now, do the simple thing.
                        SQS_CLIENT.delete_message(QueueUrl=STAGE_EXECUTION_QUEUE_URL, ReceiptHandle=message['ReceiptHandle'])
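                        # Each message body is the JSON-serialized workflow execution record; it
                        # carries the Id, Status, CurrentStage and Workflow definition used below.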
                        workflow_execution = json.loads(message['Body'])
                        queued_workflow_status = workflow_execution['Status']
                        workflow_execution['Status'] = awsmie.WORKFLOW_STATUS_STARTED
                        # Update the workflow status now to indicate we have taken it off the queue
                        update_workflow_execution_status(workflow_execution["Id"], awsmie.WORKFLOW_STATUS_STARTED, "")
                        # Resumed workflows' state machines are already executing; they just paused
                        # to wait for some external action
                        if queued_workflow_status != awsmie.WORKFLOW_STATUS_RESUMED:
                            # Kick off the state machine for the workflow
                            response = SFN_CLIENT.start_execution(
                                stateMachineArn=workflow_execution["Workflow"]["StateMachineArn"],
                                name=workflow_execution["Workflow"]["Name"] + workflow_execution["Id"],
                                input=json.dumps(workflow_execution["Workflow"]["Stages"][workflow_execution["CurrentStage"]])
                            )
                            workflow_execution["StateMachineExecutionArn"] = response["executionArn"]
                            # Update the workflow record with the state machine execution ARN
                            response = execution_table.update_item(
                                Key={
                                    'Id': workflow_execution["Id"]
                                },
                                UpdateExpression='SET StateMachineExecutionArn = :arn',
                                ExpressionAttributeValues={
                                    ':arn': workflow_execution["StateMachineExecutionArn"]
                                }
                            )
                else:
                    logger.info('Queue is empty')
                    empty = True
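                # Refresh the running-workflow count before deciding whether to pull another batch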
                started_workflows = list_workflow_executions_by_status(awsmie.WORKFLOW_STATUS_STARTED)
                num_started_workflows = len(started_workflows)
    except Exception as e:
        logger.info("Exception in scheduler {}".format(e))
        if "Id" in workflow_execution:
            update_workflow_execution_status(workflow_execution["Id"], awsmie.WORKFLOW_STATUS_ERROR, "Exception in workflow_scheduler_lambda {}".format(e))
        raise
    return arn
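
# A minimal local invocation sketch (illustrative only, assuming the module-level clients
# and names above are configured with valid AWS credentials): the handler ignores the event
# payload beyond logging it, so an empty dict and a None context are enough for a dry run.
#
#     workflow_scheduler_lambda({}, None)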