in source/workflowapi/app.py [0:0]
def create_operation(operation):
    """Register a new operation together with its default single-operation stage.

    Steps performed, in order:

    1. Validate ``operation`` against ``SCHEMA["create_operation_request"]``.
    2. Require ``MonitorLambdaArn`` for ``Async`` operations and reject any
       ``Type`` other than ``"Async"`` or ``"Sync"``.
    3. Reject the request if an item with the same ``Name`` is already in the
       operation table.
    4. Render the operation's state machine definition by substituting the
       ``%%...%%`` placeholders in the Async/Sync ASL template.
    5. Store the operation, then create a stage named ``"_" + Name`` that
       contains only this operation. If the stage cannot be created, the
       stored operation item is deleted again and the error is re-raised.
    6. Unless the start Lambda ARN contains ``"OperatorLibrary"`` or
       ``"start-wait-operation"``, attach an inline IAM policy to the stage
       execution role allowing it to invoke the operation's Lambda(s).

    Args:
        operation: dict describing the operation. It is mutated in place
            (``StateMachineAsl``, ``Version``, ``Id``, ``Created``,
            ``ResourceType``, ``ApiVersion`` and ``StageName`` are added).

    Returns:
        The same ``operation`` dict, enriched with the generated fields.

    Raises:
        ConflictError: an operation with this name already exists (or a
            ``ConflictError`` surfaced from ``create_stage``).
        BadRequestError: the request fails schema validation or has an
            invalid ``Type``.
        ChaliceViewError: any other failure.
    """
    try:
        operation_table = DYNAMO_RESOURCE.Table(OPERATION_TABLE_NAME)
        logger.info(operation)

        validate(instance=operation, schema=SCHEMA["create_operation_request"])
        logger.info("Operation schema is valid")

        name = operation["Name"]
        operation_type = operation["Type"]
        is_async = operation_type == "Async"

        # FIXME - can jsonschema validate this?
        if is_async:
            # Async operations are polled by a second Lambda, so its ARN is
            # mandatory. checkRequiredInput is defined elsewhere in this file;
            # presumably it raises when the key is missing - TODO confirm.
            checkRequiredInput("MonitorLambdaArn", operation, "Operation monitoring lambda function ARN")
        elif operation_type != "Sync":
            raise BadRequestError('Operation Type must in ["Async"|"Sync"]')

        # Check if this operation already exists. A consistent read avoids
        # missing an item that was written moments ago.
        existing = operation_table.get_item(
            Key={
                'Name': name
            },
            ConsistentRead=True)
        if "Item" in existing:
            raise ConflictError(
                "A operation with the name '%s' already exists" % name)

        # Build the operation state machine from the matching template.
        operation_asl = ASYNC_OPERATION_ASL if is_async else SYNC_OPERATION_ASL

        # Setup task parameters in step function. This filters out the
        # parameters from the stage data structure that belong to this
        # specific operation and passes the result as input to the task lambda.
        # FIXME - remove this if hardcoded one works
        # params = TASK_PARAMETERS_ASL
        # params["OperationName"] = Name
        # params["Configuration.$"] = "$.Configuration." + Name
        # for k,v in operationAsl["States"].items():
        #     if v["Type"] == "Task":
        #         v["Parameters"] = params

        # The template is serialized first and the placeholders are replaced
        # textually, so the shared template object itself is never modified.
        asl_string = json.dumps(operation_asl)
        asl_string = asl_string.replace("%%OPERATION_NAME%%", name)
        asl_string = asl_string.replace("%%OPERATION_MEDIA_TYPE%%",
                                        operation["Configuration"]["MediaType"])
        if is_async:
            asl_string = asl_string.replace("%%OPERATION_MONITOR_LAMBDA%%",
                                            operation["MonitorLambdaArn"])
        asl_string = asl_string.replace("%%OPERATION_START_LAMBDA%%", operation["StartLambdaArn"])
        asl_string = asl_string.replace("%%OPERATION_FAILED_LAMBDA%%", OPERATOR_FAILED_LAMBDA_ARN)

        operation["StateMachineAsl"] = asl_string
        logger.info(json.dumps(operation["StateMachineAsl"]))

        operation["Version"] = "v0"
        operation["Id"] = str(uuid.uuid4())
        operation["Created"] = str(datetime.now().timestamp())
        operation["ResourceType"] = "OPERATION"
        operation["ApiVersion"] = API_VERSION

        # The operation is stored before the stage is built; the item is
        # written a second time below once the stage name is known.
        operation_table.put_item(Item=operation)

        # Build a singleton stage for this operation.
        try:
            stage = {
                "Name": "_" + name,
                "Operations": [name],
            }
            create_stage(stage)

            operation["StageName"] = stage["Name"]
            operation_table.put_item(Item=operation)
        except Exception as e:
            logger.error("Error creating default stage for operation {}: {}".format(operation["Name"], e))
            # Roll back the operation item written above so a failed stage
            # build does not leave an operation without its stage.
            operation_table.delete_item(
                Key={
                    'Name': operation["Name"]
                }
            )
            raise

        # TODO: Once IAM supports the ability to use tag-based policies for
        # InvokeFunction, put that in the StepFunctionRole definition in
        # media-insights-stack.yaml and remove the following code block. Inline
        # policies have length limitations which will prevent users from adding
        # more than about 35 new operators via the MIE workflow api. Tag based
        # policies will not have any such limitation.

        # Skip the inline policy creation for operators packaged with MIE.
        # The inline policy is not needed for those operators because the
        # StepFunctionRole has already been defined with permission to invoke
        # those Lambdas (see media-insights-stack.yaml).
        start_lambda_arn = operation["StartLambdaArn"]
        is_packaged_operator = (
            "OperatorLibrary" in start_lambda_arn
            or "start-wait-operation" in start_lambda_arn
        )
        if not is_packaged_operator:
            # This is a new user-defined operator which needs to be added to
            # the StepFunctionRole: allow InvokeFunction on the start Lambda,
            # plus the monitor Lambda for async operators.
            invokable_arns = [start_lambda_arn]
            if is_async:
                invokable_arns.append(operation["MonitorLambdaArn"])
            policy = {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Action": "lambda:InvokeFunction",
                        "Resource": invokable_arns
                    }
                ]
            }
            # Attach that policy to the stage execution role as an inline
            # policy. Assumes STAGE_EXECUTION_ROLE is a role ARN whose role
            # name is the segment after the first '/' - TODO confirm.
            IAM_CLIENT.put_role_policy(
                RoleName=STAGE_EXECUTION_ROLE.split('/')[1],
                PolicyName=name,
                PolicyDocument=json.dumps(policy)
            )
    except ConflictError as e:
        logger.error ("got ConflictError: {}".format (e))
        raise
    except BadRequestError as e:
        # Client errors raised above must reach the caller unchanged;
        # otherwise the generic handler below would rewrap them as a
        # ChaliceViewError and report a server-side failure.
        logger.error("got bad request error: {}".format(e))
        raise
    except ValidationError as e:
        logger.error("got bad request error: {}".format(e))
        raise BadRequestError(e) from e
    except Exception as e:
        logger.error("Exception {}".format(e))
        raise ChaliceViewError("Exception '%s'" % e) from e

    logger.info("end create_operation: {}".format(json.dumps(operation, cls=DecimalEncoder)))
    return operation