in source/workflowapi/app.py [0:0]
def create_workflow_execution(trigger, workflow_execution):
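    """Create a new workflow execution and queue it for processing.

    The request body must contain either an existing "AssetId" or new "Media"
    to ingest. Illustrative request shapes (the names, buckets, and keys below
    are examples only, not required values):

        {"Name": "MyWorkflow", "Input": {"Media": {"Video": {"S3Bucket": "my-bucket", "S3Key": "my-video.mp4"}}}}
        {"Name": "MyWorkflow", "Input": {"AssetId": "abcd-1234"}, "Configuration": {}}

    Returns the initialized workflow execution record.
    """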
    execution_table = DYNAMO_RESOURCE.Table(WORKFLOW_EXECUTION_TABLE_NAME)
    dynamo_status_queued = False
    create_asset = None

    logger.info('create_workflow_execution workflow config: ' + str(workflow_execution))
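
    # The request must reference either an existing asset ("AssetId") or new media to ingest ("Media")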
if "Input" in workflow_execution and "AssetId" in workflow_execution["Input"]:
create_asset = False
elif "Input" in workflow_execution and "Media" in workflow_execution["Input"]:
create_asset = True
else:
raise BadRequestError('Input must contain either "AssetId" or "Media"')
    try:
        Name = workflow_execution["Name"]
        Configuration = workflow_execution.get("Configuration", {})

        # Create a new asset in the dataplane when raw media was supplied
        dataplane = DataPlane()
        if create_asset is True:
            try:
                media_input = workflow_execution["Input"]["Media"]
                media_type = list(media_input.keys())[0]
                s3bucket = media_input[media_type]["S3Bucket"]
                s3key = media_input[media_type]["S3Key"]
            except KeyError as e:
                logger.error("Exception {}".format(e))
                raise ChaliceViewError("Exception '%s'" % e)
            else:
                asset_creation = dataplane.create_asset(media_type, s3bucket, s3key)
                # If create_asset fails, then asset_creation will contain the error
                # string instead of the expected dict. So, we'll raise that error
                # if we get a KeyError in the following try block:
                try:
                    asset_input = {
                        "Media": {
                            media_type: {
                                "S3Bucket": asset_creation["S3Bucket"],
                                "S3Key": asset_creation["S3Key"]
                            }
                        }
                    }
                    asset_id = asset_creation["AssetId"]
                except KeyError:
                    logger.error("Error creating asset {}".format(asset_creation))
                    raise ChaliceViewError("Error creating asset '%s'" % asset_creation)
        else:
            try:
                asset_id = workflow_execution["Input"]["AssetId"]
            except KeyError as e:
                logger.error("Exception {}".format(e))
                raise ChaliceViewError("Exception '%s'" % e)
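
            # Reject the request if another workflow execution is still active on this asset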
            workflow_execution_list = list_workflow_executions_by_assetid(asset_id)
            for existing_execution in workflow_execution_list:
                if existing_execution["Status"] not in [awsmie.WORKFLOW_STATUS_COMPLETE, awsmie.WORKFLOW_STATUS_ERROR]:
                    raise ConflictError("There is currently another workflow execution (Id = {}) active on AssetId {}.".format(
                        existing_execution["Id"], asset_id))
            retrieve_asset = dataplane.retrieve_asset_metadata(asset_id)
            if "results" in retrieve_asset:
                s3bucket = retrieve_asset["results"]["S3Bucket"]
                s3key = retrieve_asset["results"]["S3Key"]
                media_type = retrieve_asset["results"]["MediaType"]

                asset_input = {
                    "Media": {
                        media_type: {
                            "S3Bucket": s3bucket,
                            "S3Key": s3key
                        }
                    }
                }
            else:
                raise ChaliceViewError("Unable to retrieve asset: {e}".format(e=asset_id))
        workflow_execution = initialize_workflow_execution(trigger, Name, asset_input, Configuration, asset_id)

        execution_table.put_item(Item=workflow_execution)
        dynamo_status_queued = True

        # FIXME - must set the workflow status to Error if this fails, since we already marked it as Queued. We had
        # to do that to avoid a race condition on status with the execution itself. Once we hand it off to the state
        # machine, we can't touch the status again.
        response = SQS_CLIENT.send_message(QueueUrl=STAGE_EXECUTION_QUEUE_URL, MessageBody=json.dumps(workflow_execution))
        # The response contains the MD5 of the body, a message Id, the MD5 of message attributes, and a sequence number (FIFO queues only)
        logger.info('Message ID : {}'.format(response['MessageId']))

        # Trigger the workflow_scheduler
        response = LAMBDA_CLIENT.invoke(
            FunctionName=WORKFLOW_SCHEDULER_LAMBDA_ARN,
            InvocationType='Event'
        )
    except Exception as e:
        logger.error("Exception {}".format(e))
        if dynamo_status_queued:
            update_workflow_execution_status(workflow_execution["Id"], awsmie.WORKFLOW_STATUS_ERROR, "Exception {}".format(e))
        raise ChaliceViewError("Exception '%s'" % e)

    return workflow_execution