def complete_stage_execution()

in source/workflow/app.py [0:0]


def complete_stage_execution(trigger, stage_name, status, outputs, workflow_execution_id):

    try:

        execution_table = DYNAMO_CLIENT.Table(WORKFLOW_EXECUTION_TABLE_NAME)
        # lookup the workflow
        response = execution_table.get_item(
            Key={
                'Id': workflow_execution_id
            },
            ConsistentRead=True)

        if "Item" in response:
            workflow_execution = response["Item"]
        else:
            workflow_execution = None
            # raise ChaliceViewError(
            raise ValueError(
                "Exception: workflow execution id '%s' not found" % workflow_execution_id)

        logger.info("workflow_execution: {}".format(
            json.dumps(workflow_execution)))


        # Roll-up the results of the stage execution.  If anything fails here, we will fail the
        # stage, but still attempt to update the workflow execution the stage belongs to
        try:
            # Roll up operation status
            # # if any operation did not complete successfully, the stage has failed
            opstatus = awsmie.STAGE_STATUS_COMPLETE
            errorMessage = "none"
            for operation in outputs:
                if operation["Status"] not in [awsmie.OPERATION_STATUS_COMPLETE, awsmie.OPERATION_STATUS_SKIPPED]:
                    opstatus = awsmie.STAGE_STATUS_ERROR
                    if "Message" in operation:
                        errorMessage = "Stage failed because operation {} execution failed. Message: {}".format(
                            operation["Name"], operation["Message"])
                    else:
                        errorMessage = "Stage failed because operation {} execution failed.".format(
                            operation["Name"])

            # don't overwrite an error
            if status != awsmie.STAGE_STATUS_ERROR:
                status = opstatus

            logger.info("Stage status: {}".format(status))

            workflow_execution["Workflow"]["Stages"][stage_name
                                                     ]["Outputs"] = outputs

            if "MetaData" not in workflow_execution["Globals"]:
                workflow_execution["Globals"]["MetaData"] = {}

            # Roll up operation media and metadata outputs from this stage and add them to
            # the global workflow metadata:
            #
            #     1. mediaType and metatdata output keys must be unique withina stage - if
            #        non-unique keys are found across operations within a stage, then the
            #        stage execution will fail.
            #     2. if a stage has a duplicates a mediaType or metadata output key from the globals,
            #        then the global value is replaced by the stage output value

            # Roll up media
            stageOutputMediaTypeKeys = []
            for operation in outputs:
                if "Media" in operation:
                    for mediaType in operation["Media"].keys():
                        # replace media with trasformed or created media from this stage
                        logger.info(mediaType)
                        if mediaType in stageOutputMediaTypeKeys:

                            raise ValueError(
                                "Duplicate mediaType '%s' found in operation ouput media.  mediaType keys must be unique within a stage." % mediaType)
                        else:
                            workflow_execution["Globals"]["Media"][mediaType] = operation["Media"][mediaType]
                            stageOutputMediaTypeKeys.append(mediaType)

                # Roll up metadata
                stageOutputMetadataKeys = []
                if "MetaData" in operation:
                    for key in operation["MetaData"].keys():
                        logger.info(key)
                        if key in stageOutputMetadataKeys:
                            raise ValueError(
                                "Duplicate key '%s' found in operation ouput metadata.  Metadata keys must be unique within a stage." % key)
                        else:
                            workflow_execution["Globals"]["MetaData"][key] = operation["MetaData"][key]
                            stageOutputMetadataKeys.append(key)

            workflow_execution["Workflow"]["Stages"][stage_name
                                                     ]["Status"] = status

        # The status roll up failed.  Handle the error and fall through to update the workflow status
        except Exception as e:

            logger.info("Exception while rolling up stage status {}".format(e))
            update_workflow_execution_status(workflow_execution["Id"], awsmie.WORKFLOW_STATUS_ERROR, "Exception while rolling up stage status {}".format(e))
            status = awsmie.STAGE_STATUS_ERROR

            raise ValueError("Error rolling up stage status: %s" % e)

        logger.info("Updating the workflow status in dynamodb: current stage {}, globals {}".format(workflow_execution["Workflow"]["Stages"][stage_name],workflow_execution["Globals"]))
        # Save the new stage and workflow status
        response = execution_table.update_item(
            Key={
                'Id': workflow_execution_id
            },
            UpdateExpression='SET Workflow.Stages.#stage = :stage, Globals = :globals',
            ExpressionAttributeNames={
                '#stage': stage_name
            },
            ExpressionAttributeValues={
                # ':stage': json.dumps(stage)
                ':stage': workflow_execution["Workflow"]["Stages"][stage_name],
                ':globals': workflow_execution["Globals"]
                # ':step_function_arn': step_function_execution_arn
            }
        )

        # Start the next stage for execution
        # FIXME - try always completing stage
        # status == awsmie.STAGE_STATUS_COMPLETE:
        workflow_execution = start_next_stage_execution(
            "Workflow", stage_name, workflow_execution)

        if status == awsmie.STAGE_STATUS_ERROR:
            raise Exception("Stage {} encountered and error during execution, aborting the workflow".format(stage_name))

    except Exception as e:
        logger.info("Exception {}".format(e))

        # Need a try/catch here? Try to save the status
        execution_table.put_item(Item=workflow_execution)
        update_workflow_execution_status(workflow_execution["Id"], awsmie.WORKFLOW_STATUS_ERROR, "Exception while rolling up stage status {}".format(e))

        logger.info("Exception {}".format(e))

        raise ValueError(
            "Exception: '%s'" % e)

    # If it is not the end of the workflow, pass the next stage out to be consumed by the next stage.
    if workflow_execution["CurrentStage"] == "End":
        return {}
    else:
        return workflow_execution["Workflow"]["Stages"][workflow_execution["CurrentStage"]]