in source/workflow/app.py [0:0]
def complete_stage_execution(trigger, stage_name, status, outputs, workflow_execution_id):
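    """Roll up the results of a completed stage and start the next stage.

    Persists the stage outputs and updated globals to DynamoDB, then advances
    the workflow.  Parameter notes (inferred from usage below):
        trigger -- source of the completion event (currently unused here)
        stage_name -- name of the stage that just finished
        status -- stage status reported by the caller
        outputs -- list of per-operation output dicts for the stage
        workflow_execution_id -- Id of the owning workflow execution
    Returns the next stage dict, or {} when the workflow has ended.
    """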
    # Initialized up front so the error handler at the bottom can test it safely.
    workflow_execution = None
    try:
        execution_table = DYNAMO_CLIENT.Table(WORKFLOW_EXECUTION_TABLE_NAME)

        # Look up the workflow execution; a consistent read guarantees we see
        # the most recent state written by earlier stage executions.
        response = execution_table.get_item(
            Key={
                'Id': workflow_execution_id
            },
            ConsistentRead=True)
if "Item" in response:
workflow_execution = response["Item"]
else:
workflow_execution = None
# raise ChaliceViewError(
raise ValueError(
"Exception: workflow execution id '%s' not found" % workflow_execution_id)
logger.info("workflow_execution: {}".format(
json.dumps(workflow_execution)))
        # Roll up the results of the stage execution.  If anything fails here, we fail the
        # stage but still attempt to update the workflow execution the stage belongs to.
        try:
            # Roll up operation status: if any operation neither completed
            # successfully nor was skipped, the stage has failed.
            opstatus = awsmie.STAGE_STATUS_COMPLETE
            errorMessage = "none"
            for operation in outputs:
                if operation["Status"] not in [awsmie.OPERATION_STATUS_COMPLETE, awsmie.OPERATION_STATUS_SKIPPED]:
                    opstatus = awsmie.STAGE_STATUS_ERROR
                    if "Message" in operation:
                        errorMessage = "Stage failed because operation {} execution failed. Message: {}".format(
                            operation["Name"], operation["Message"])
                    else:
                        errorMessage = "Stage failed because operation {} execution failed.".format(
                            operation["Name"])

            # don't overwrite an error reported by the caller
            if status != awsmie.STAGE_STATUS_ERROR:
                status = opstatus
logger.info("Stage status: {}".format(status))
workflow_execution["Workflow"]["Stages"][stage_name
]["Outputs"] = outputs
if "MetaData" not in workflow_execution["Globals"]:
workflow_execution["Globals"]["MetaData"] = {}
            # Roll up operation media and metadata outputs from this stage and add them to
            # the global workflow metadata:
            #
            # 1. mediaType and metadata output keys must be unique within a stage - if
            #    non-unique keys are found across operations within a stage, then the
            #    stage execution will fail.
            # 2. If a stage duplicates a mediaType or metadata output key from the globals,
            #    then the global value is replaced by the stage output value.
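            # Illustrative shape of one `outputs` entry (hypothetical values;
            # only the keys referenced below are assumed):
            #
            #   {
            #       "Name": "Transcribe",
            #       "Status": awsmie.OPERATION_STATUS_COMPLETE,
            #       "Media": {"Audio": {"S3Bucket": "...", "S3Key": "..."}},
            #       "MetaData": {"TranscribeJobId": "..."}
            #   }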
            # Roll up media
            stageOutputMediaTypeKeys = []
            # Track metadata keys across ALL operations in the stage so the
            # uniqueness rule above is enforced stage-wide, not per operation.
            stageOutputMetadataKeys = []
            for operation in outputs:
                if "Media" in operation:
                    for mediaType in operation["Media"].keys():
                        # replace media with transformed or created media from this stage
                        logger.info(mediaType)
                        if mediaType in stageOutputMediaTypeKeys:
                            raise ValueError(
                                "Duplicate mediaType '%s' found in operation output media. mediaType keys must be unique within a stage." % mediaType)
                        else:
                            workflow_execution["Globals"]["Media"][mediaType] = operation["Media"][mediaType]
                            stageOutputMediaTypeKeys.append(mediaType)
                # Roll up metadata
                if "MetaData" in operation:
                    for key in operation["MetaData"].keys():
                        logger.info(key)
                        if key in stageOutputMetadataKeys:
                            raise ValueError(
                                "Duplicate key '%s' found in operation output metadata. Metadata keys must be unique within a stage." % key)
                        else:
                            workflow_execution["Globals"]["MetaData"][key] = operation["MetaData"][key]
                            stageOutputMetadataKeys.append(key)
workflow_execution["Workflow"]["Stages"][stage_name
]["Status"] = status
        # The status roll-up failed.  Record the error on the workflow execution
        # and re-raise; the outer handler persists the failed state.
        except Exception as e:
            logger.info("Exception while rolling up stage status {}".format(e))
            update_workflow_execution_status(workflow_execution["Id"], awsmie.WORKFLOW_STATUS_ERROR,
                                             "Exception while rolling up stage status {}".format(e))
            status = awsmie.STAGE_STATUS_ERROR
            raise ValueError("Error rolling up stage status: %s" % e)
logger.info("Updating the workflow status in dynamodb: current stage {}, globals {}".format(workflow_execution["Workflow"]["Stages"][stage_name],workflow_execution["Globals"]))
# Save the new stage and workflow status
response = execution_table.update_item(
Key={
'Id': workflow_execution_id
},
UpdateExpression='SET Workflow.Stages.#stage = :stage, Globals = :globals',
ExpressionAttributeNames={
'#stage': stage_name
},
ExpressionAttributeValues={
# ':stage': json.dumps(stage)
':stage': workflow_execution["Workflow"]["Stages"][stage_name],
':globals': workflow_execution["Globals"]
# ':step_function_arn': step_function_execution_arn
}
)
        # Start the next stage for execution.
        # FIXME - for now we always try to complete the stage and advance; this
        # used to be gated on: if status == awsmie.STAGE_STATUS_COMPLETE:
        workflow_execution = start_next_stage_execution(
            "Workflow", stage_name, workflow_execution)

        if status == awsmie.STAGE_STATUS_ERROR:
            raise Exception(
                "Stage {} encountered an error during execution, aborting the workflow".format(stage_name))
    except Exception as e:
        logger.info("Exception {}".format(e))
        # Best effort: save whatever state we have before failing the workflow.
        # Guarded because the lookup above may have failed before assignment.
        if workflow_execution is not None:
            execution_table.put_item(Item=workflow_execution)
            update_workflow_execution_status(workflow_execution["Id"], awsmie.WORKFLOW_STATUS_ERROR,
                                             "Exception while completing stage {}: {}".format(stage_name, e))
        raise ValueError(
            "Exception: '%s'" % e)
    # If this is not the end of the workflow, return the next stage so the
    # caller can pass it to the next stage's execution; otherwise return {}.
    if workflow_execution["CurrentStage"] == "End":
        return {}
    else:
        return workflow_execution["Workflow"]["Stages"][workflow_execution["CurrentStage"]]
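
# Minimal usage sketch (hypothetical caller and event keys; in practice this
# function is invoked by the stage-completion Lambda handler with values taken
# from the state machine event):
#
#   next_stage = complete_stage_execution(
#       trigger="workflow",
#       stage_name="defaultVideoStage",
#       status=awsmie.STAGE_STATUS_COMPLETE,
#       outputs=event["Outputs"],
#       workflow_execution_id=event["WorkflowExecutionId"])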