def create_operation()

in source/workflowapi/app.py [0:0]


def create_operation(operation):

    try:
        operation_table = DYNAMO_RESOURCE.Table(OPERATION_TABLE_NAME)

        logger.info(operation)

        validate(instance=operation, schema=SCHEMA["create_operation_request"])
        logger.info("Operation schema is valid")

        Name = operation["Name"]

        # FIXME - can jsonschema validate this?
        if operation["Type"] == "Async":
            checkRequiredInput("MonitorLambdaArn", operation, "Operation monitoring lambda function ARN")
        elif operation["Type"] == "Sync":
            pass
        else:
            raise BadRequestError('Operation Type must in ["Async"|"Sync"]')

        # Check if this operation already exists
        response = operation_table.get_item(
            Key={
                'Name': Name
            },
            ConsistentRead=True)

        if "Item" in response:
            raise ConflictError(
                "A operation with the name '%s' already exists" % Name)

        # Build the operation state machine.

        if operation["Type"] == "Async":
            operationAsl = ASYNC_OPERATION_ASL
        elif operation["Type"] == "Sync":
            operationAsl = SYNC_OPERATION_ASL

        # Setup task parameters in step function.  This filters out the paramters from
        # the stage data structure that belong to this specific operation and passes the
        # result as input to the task lmbda
        # FIXME - remove this if hardcoded one works
        # params = TASK_PARAMETERS_ASL
        # params["OperationName"] = Name
        # params["Configuration.$"] = "$.Configuration." + Name
        # for k,v in operationAsl["States"].items():
        #     if v["Type"] == "Task":
        #         v["Parameters"] = params

        operationAslString = json.dumps(operationAsl)
        operationAslString = operationAslString.replace("%%OPERATION_NAME%%", operation["Name"])
        operationAslString = operationAslString.replace("%%OPERATION_MEDIA_TYPE%%",
                                                        operation["Configuration"]["MediaType"])

        if operation["Type"] == "Async":
            operationAslString = operationAslString.replace("%%OPERATION_MONITOR_LAMBDA%%",
                                                            operation["MonitorLambdaArn"])
        operationAslString = operationAslString.replace("%%OPERATION_START_LAMBDA%%", operation["StartLambdaArn"])
        operationAslString = operationAslString.replace("%%OPERATION_FAILED_LAMBDA%%", OPERATOR_FAILED_LAMBDA_ARN)
        operation["StateMachineAsl"] = operationAslString

        logger.info(json.dumps(operation["StateMachineAsl"]))

        operation["Version"] = "v0"
        operation["Id"] = str(uuid.uuid4())
        operation["Created"] = str(datetime.now().timestamp())
        operation["ResourceType"] = "OPERATION"
        operation["ApiVersion"] = API_VERSION

        operation_table.put_item(Item=operation)
        # build a singleton stage for this operation
        try:
            stage = {}
            stage["Name"] = "_" + operation["Name"]
            stage["Operations"] = []
            stage["Operations"].append(operation["Name"])

            # Build stage
            response = create_stage(stage)

            operation["StageName"] = stage["Name"]

            operation_table.put_item(Item=operation)


        except Exception as e:
            logger.error("Error creating default stage for operation {}: {}".format(operation["Name"], e))
            response = operation_table.delete_item(
                Key={
                    'Name': operation["Name"]
                }
            )
            raise

        # TODO: Once IAM supports the ability to use tag-based policies for
        #  InvokeFunction, put that in the StepFunctionRole definition in
        #  media-insights-stack.yaml and remove the following code block. Inline
        #  policies have length limitations which will prevent users from adding
        #  more than about 35 new operators via the MIE workflow api. Tag based
        #  policies will not have any such limitation.

        # Skip the inline policy creation for operators packaged with MIE.
        # The inline policy is not needed for those operators because the
        # StepFunctionRole has already been defined with permission to invoke
        # those Lambdas (see media-insigts-stack.yaml).
        if not ("OperatorLibrary" in operation["StartLambdaArn"] or "start-wait-operation" in operation["StartLambdaArn"]):
            # If true then this is a new user-defined operator which needs to be added
            # to the StepFunctionRole.
            #
            # Create an IAM policy to allow InvokeFunction on the StartLambdaArn
            policy = {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Action": "lambda:InvokeFunction",
                        "Resource": [
                            operation["StartLambdaArn"]
                        ]
                    }
                ]
            }
            # Add the MonitorLambdaArn to that policy for async operators
            if operation["Type"] == "Async":
                policy['Statement'][0]['Resource'].append(operation["MonitorLambdaArn"])
            # Attach that policy to the stage execution role as an inline policy
            IAM_CLIENT.put_role_policy(
                RoleName=STAGE_EXECUTION_ROLE.split('/')[1],
                PolicyName=operation["Name"],
                PolicyDocument=json.dumps(policy)
            )

    except ConflictError as e:
        logger.error ("got ConflictError: {}".format (e))
        raise
    except ValidationError as e:
        logger.error("got bad request error: {}".format(e))
        raise BadRequestError(e)
    except Exception as e:
        logger.error("Exception {}".format(e))
        operation = None
        raise ChaliceViewError("Exception '%s'" % e)

    logger.info("end create_operation: {}".format(json.dumps(operation, cls=DecimalEncoder)))
    return operation