in source/workflowapi/app.py [0:0]
def create_stage(stage):
    """Create a new stage and store it in the stage table.

    The request is validated against the ``create_stage_request`` schema, a
    state machine definition is assembled from the stage's operations, and
    the completed stage record is written with ``put_item``.

    The generated state machine consists of a Parallel state (named after the
    stage) with one branch per operation, followed by a Task state that
    invokes the stage completion lambda. The Parallel state takes a stage
    object as input; each operator returns an operatorOutput object, and the
    outputs of all operators are placed under ``$.Outputs``.

    Args:
        stage: dict describing the stage. ``Name`` and ``Operations`` are
            read here. The dict is updated in place with ``Configuration``,
            ``Definition``, ``Version``, ``Id``, ``Created``,
            ``ResourceType`` and ``ApiVersion``.

    Returns:
        The same ``stage`` dict, including the generated fields.

    Raises:
        BadRequestError: if ``validate`` raises ``ValidationError``.
        ConflictError: if a stage with the same name already exists.
        ChaliceViewError: for any other failure.
    """
    try:
        stage_table = DYNAMO_RESOURCE.Table(STAGE_TABLE_NAME)

        logger.info(stage)
        validate(instance=stage, schema=SCHEMA["create_stage_request"])
        logger.info("Stage schema is valid")

        name = stage["Name"]

        # Check if this stage already exists
        response = stage_table.get_item(
            Key={'Name': name},
            ConsistentRead=True)
        if "Item" in response:
            raise ConflictError(
                "A stage with the name '%s' already exists" % name)

        complete_state = "Complete Stage {}".format(name)

        # Build the stage state machine. The completion state is listed first
        # and the Parallel state second so the serialized definition keeps the
        # same key order as before.
        stage_asl = {
            "StartAt": name,
            "States": {
                complete_state: {
                    "Type": "Task",
                    "Resource": COMPLETE_STAGE_LAMBDA_ARN,
                    "End": True
                },
                name: {
                    "Type": "Parallel",
                    "Next": complete_state,
                    "ResultPath": "$.Outputs",
                    "Branches": [],
                    # Any error in a branch still routes to the completion
                    # state so the stage is always finalized.
                    "Catch": [
                        {
                            "ErrorEquals": ["States.ALL"],
                            "Next": complete_state,
                            "ResultPath": "$.Outputs"
                        }
                    ]
                }
            }
        }

        # Add a branch to the stage state machine for each operation and build
        # up the default Configuration for the stage from each operator's
        # Configuration.
        branches = stage_asl["States"][name]["Branches"]
        configuration = {}
        for op in stage["Operations"]:
            operation = get_operation_by_name(op)
            logger.info(json.dumps(operation, cls=DecimalEncoder))
            branches.append(json.loads(operation["StateMachineAsl"]))
            configuration[op] = operation["Configuration"]

        # Operator definitions may contain a %%STAGE_NAME%% placeholder;
        # substitute it textually and re-parse so the result is checked to be
        # valid JSON.
        # NOTE(review): the substitution is raw text, so a name containing
        # '"' or '\\' would corrupt the JSON - confirm the request schema
        # rules such names out.
        asl_string = json.dumps(stage_asl).replace("%%STAGE_NAME%%", name)
        stage_asl = json.loads(asl_string)
        logger.info(json.dumps(stage_asl))

        # Build stage
        stage["Configuration"] = configuration
        stage["Definition"] = json.dumps(stage_asl)
        stage["Version"] = "v0"
        stage["Id"] = str(uuid.uuid4())
        stage["Created"] = str(datetime.now().timestamp())
        stage["ResourceType"] = "STAGE"
        stage["ApiVersion"] = API_VERSION

        stage_table.put_item(Item=stage)

    except ValidationError as e:
        logger.error("got bad request error: {}".format(e))
        raise BadRequestError(e) from e
    except ConflictError as e:
        # Raised deliberately above. Re-raise unchanged so the caller sees the
        # conflict instead of having it rewrapped by the generic handler below.
        logger.error("got ConflictError: {}".format(e))
        raise
    except Exception as e:
        logger.error("Exception {}".format(e))
        raise ChaliceViewError("Exception '%s'" % e) from e

    return stage