def create_workflow_execution()

in source/workflowapi/app.py [0:0]


def create_workflow_execution(trigger, workflow_execution):
    """Create a workflow execution, persist it and queue it for scheduling.

    The request must identify its media in one of two ways (``AssetId`` wins
    when both are present):

    * ``Input["AssetId"]`` - reuse an existing asset. The request is rejected
      if another execution on that asset has a status other than complete or
      error.
    * ``Input["Media"]`` - create a new asset through the dataplane from the
      ``S3Bucket``/``S3Key`` of the first media type listed.

    Args:
        trigger: Passed through unchanged to ``initialize_workflow_execution``.
        workflow_execution: Request dict. Reads ``Name`` (required),
            ``Input`` (required) and ``Configuration`` (optional, defaults
            to ``{}``).

    Returns:
        The workflow execution item produced by
        ``initialize_workflow_execution``, after it has been written to the
        execution table and sent to the stage execution queue.

    Raises:
        BadRequestError: ``Input`` contains neither ``AssetId`` nor ``Media``.
        ConflictError: Another execution is still active on the asset.
        ChaliceViewError: Any other failure. Errors that are already
            ``ChaliceViewError`` instances propagate unchanged; everything
            else is wrapped.
    """
    execution_table = DYNAMO_RESOURCE.Table(WORKFLOW_EXECUTION_TABLE_NAME)
    # Becomes True once the execution record has been written, so the error
    # handler knows there is a persisted status that must be flipped to error.
    dynamo_status_queued = False

    logger.info('create_workflow_execution workflow config: ' + str(workflow_execution))

    request_input = workflow_execution["Input"] if "Input" in workflow_execution else {}
    if "AssetId" in request_input:
        create_asset = False
    elif "Media" in request_input:
        create_asset = True
    else:
        raise BadRequestError('Input must contain either "AssetId" or "Media"')

    try:
        name = workflow_execution["Name"]
        configuration = workflow_execution.get("Configuration", {})

        dataplane = DataPlane()
        if create_asset:
            media = request_input["Media"]
            try:
                # Only the first media type in the request is used.
                media_type = list(media)[0]
                s3bucket = media[media_type]["S3Bucket"]
                s3key = media[media_type]["S3Key"]
            except KeyError as e:
                logger.error("Exception {}".format(e))
                raise ChaliceViewError("Exception '%s'" % e)

            asset_creation = dataplane.create_asset(media_type, s3bucket, s3key)
            # If create_asset fails, then asset_creation will contain the error
            # string instead of the expected dict. So, we'll raise that error
            # if we get a KeyError in the following try block:
            try:
                asset_input = {
                    "Media": {
                        media_type: {
                            "S3Bucket": asset_creation["S3Bucket"],
                            "S3Key": asset_creation["S3Key"]
                        }
                    }
                }
                asset_id = asset_creation["AssetId"]
            except KeyError:
                logger.error("Error creating asset {}".format(asset_creation))
                raise ChaliceViewError("Error creating asset '%s'" % asset_creation)
        else:
            # "AssetId" was verified to be present before entering the try.
            asset_id = request_input["AssetId"]

            # Refuse to start a second execution while one is still in flight
            # on the same asset.
            finished_statuses = [awsmie.WORKFLOW_STATUS_COMPLETE, awsmie.WORKFLOW_STATUS_ERROR]
            for existing_execution in list_workflow_executions_by_assetid(asset_id):
                if existing_execution["Status"] not in finished_statuses:
                    raise ConflictError("There is currently another workflow execution(Id = {}) active on AssetId {}.".format(
                        existing_execution["Id"], asset_id))

            retrieve_asset = dataplane.retrieve_asset_metadata(asset_id)
            if "results" not in retrieve_asset:
                raise ChaliceViewError("Unable to retrieve asset: {e}".format(e=asset_id))

            asset_metadata = retrieve_asset["results"]
            asset_input = {
                "Media": {
                    asset_metadata["MediaType"]: {
                        "S3Bucket": asset_metadata["S3Bucket"],
                        "S3Key": asset_metadata["S3Key"]
                    }
                }
            }

        workflow_execution = initialize_workflow_execution(trigger, name, asset_input, configuration, asset_id)

        execution_table.put_item(Item=workflow_execution)
        dynamo_status_queued = True

        # The record is already marked as queued at this point (to avoid a
        # status race with the execution itself), so any failure from here on
        # must flip it to error - see the handler below.
        response = SQS_CLIENT.send_message(QueueUrl=STAGE_EXECUTION_QUEUE_URL, MessageBody=json.dumps(workflow_execution))
        # the response contains MD5 of the body, a message Id, MD5 of message attributes, and a sequence number (for FIFO queues)
        logger.info('Message ID : {}'.format(response['MessageId']))

        # Trigger the workflow_scheduler
        LAMBDA_CLIENT.invoke(
            FunctionName=WORKFLOW_SCHEDULER_LAMBDA_ARN,
            InvocationType='Event'
        )

    except Exception as e:
        logger.error("Exception {}".format(e))

        if dynamo_status_queued:
            update_workflow_execution_status(workflow_execution["Id"], awsmie.WORKFLOW_STATUS_ERROR, "Exception {}".format(e))

        # Errors that already carry an HTTP status (ConflictError,
        # BadRequestError, ChaliceViewError, ...) must reach the caller
        # unchanged; re-wrapping them would turn e.g. a 409 into a 500.
        if isinstance(e, ChaliceViewError):
            raise

        raise ChaliceViewError("Exception '%s'" % e)

    return workflow_execution