def profile_state_definition_helper()

in source/controlplaneapi/runtime/app.py [0:0]


def profile_state_definition_helper(name, profile):
    plugins_list = []
    d_plugins_list = []
    batch_get_keys = []

    internal_lambda_arns = {
        "ProbeVideo": PROBE_VIDEO_LAMBDA_ARN,
        "MultiChunkHelper": MULTI_CHUNK_HELPER_LAMBDA_ARN,
        "PluginOutputHandler": PLUGIN_OUTPUT_HANDLER_LAMBDA_ARN,
        "WorkflowErrorHandler": WORKFLOW_ERROR_HANDLER_LAMBDA_ARN
    }

    shortened_profile = {
        "Name": name,
        "ChunkSize": profile["ChunkSize"],
        "MaxSegmentLengthSeconds": profile["MaxSegmentLengthSeconds"],
        "ProcessingFrameRate": profile["ProcessingFrameRate"]
    }

    # Classifier
    classifier = profile["Classifier"]

    if "ModelEndpoint" in classifier and isinstance(classifier["ModelEndpoint"], dict):
        classifier["ModelEndpoint"] = get_model_endpoint_from_ddb(classifier["ModelEndpoint"])

    shortened_classifier = {
        "Name": classifier["Name"],
        "Configuration": classifier["Configuration"] if "Configuration" in classifier else {},
        "DependentPlugins": []
    }
    plugins_list.append(classifier["Name"])
    batch_get_keys.append({"Name": classifier["Name"], "Version": "v0"})

    if "DependentPlugins" in classifier:
        for index, d_plugin in enumerate(classifier["DependentPlugins"]):
            if "ModelEndpoint" in d_plugin and isinstance(d_plugin["ModelEndpoint"], dict):
                classifier["DependentPlugins"][index]["ModelEndpoint"] = get_model_endpoint_from_ddb(
                    d_plugin["ModelEndpoint"])

            shortened_classifier["DependentPlugins"].append(
                {
                    "Name": d_plugin["Name"],
                    "Configuration": d_plugin["Configuration"] if "Configuration" in d_plugin else {},
                }
            )

            if d_plugin["Name"] not in d_plugins_list:
                d_plugins_list.append(d_plugin["Name"])
                batch_get_keys.append({"Name": d_plugin["Name"], "Version": "v0"})

    shortened_profile["Classifier"] = shortened_classifier

    # Optimizer
    optimizer = {}
    if "Optimizer" in profile and profile["Optimizer"]:
        optimizer = profile["Optimizer"]

        if "ModelEndpoint" in optimizer and isinstance(optimizer["ModelEndpoint"], dict):
            optimizer["ModelEndpoint"] = get_model_endpoint_from_ddb(optimizer["ModelEndpoint"])

        shortened_optimizer = {
            "Name": optimizer["Name"],
            "Configuration": optimizer["Configuration"] if "Configuration" in optimizer else {},
            "DependentPlugins": []
        }
        plugins_list.append(optimizer["Name"])
        batch_get_keys.append({"Name": optimizer["Name"], "Version": "v0"})

        if "DependentPlugins" in optimizer:
            for index, d_plugin in enumerate(optimizer["DependentPlugins"]):
                if "ModelEndpoint" in d_plugin and isinstance(d_plugin["ModelEndpoint"], dict):
                    optimizer["DependentPlugins"][index]["ModelEndpoint"] = get_model_endpoint_from_ddb(
                        d_plugin["ModelEndpoint"])

                shortened_optimizer["DependentPlugins"].append(
                    {
                        "Name": d_plugin["Name"],
                        "Configuration": d_plugin["Configuration"] if "Configuration" in d_plugin else {},
                    }
                )

                if d_plugin["Name"] not in d_plugins_list:
                    d_plugins_list.append(d_plugin["Name"])
                    batch_get_keys.append({"Name": d_plugin["Name"], "Version": "v0"})

        shortened_profile["Optimizer"] = shortened_optimizer

    # Labeler
    labeler = {}
    if "Labeler" in profile and profile["Labeler"]:
        labeler = profile["Labeler"]

        if "ModelEndpoint" in labeler and isinstance(labeler["ModelEndpoint"], dict):
            labeler["ModelEndpoint"] = get_model_endpoint_from_ddb(labeler["ModelEndpoint"])

        shortened_labeler = {
            "Name": labeler["Name"],
            "Configuration": labeler["Configuration"] if "Configuration" in labeler else {},
            "DependentPlugins": []
        }
        plugins_list.append(labeler["Name"])
        batch_get_keys.append({"Name": labeler["Name"], "Version": "v0"})

        if "DependentPlugins" in labeler:
            for index, d_plugin in enumerate(labeler["DependentPlugins"]):
                if "ModelEndpoint" in d_plugin and isinstance(d_plugin["ModelEndpoint"], dict):
                    labeler["DependentPlugins"][index]["ModelEndpoint"] = get_model_endpoint_from_ddb(
                        d_plugin["ModelEndpoint"])

                shortened_labeler["DependentPlugins"].append(
                    {
                        "Name": d_plugin["Name"],
                        "Configuration": d_plugin["Configuration"] if "Configuration" in d_plugin else {},
                    }
                )

                if d_plugin["Name"] not in d_plugins_list:
                    d_plugins_list.append(d_plugin["Name"])
                    batch_get_keys.append({"Name": d_plugin["Name"], "Version": "v0"})

        shortened_profile["Labeler"] = shortened_labeler

    # Featurers
    featurers = []
    if "Featurers" in profile and profile["Featurers"]:
        shortened_profile["Featurers"] = []

        featurers = profile["Featurers"]

        for index, featurer in enumerate(featurers):
            if featurer["Name"] in plugins_list:
                raise ConflictError(
                    f"Unable to create profile '{name}': Provided list of Featurers contains duplicates")

            if "ModelEndpoint" in featurer and isinstance(featurer["ModelEndpoint"], dict):
                featurers[index]["ModelEndpoint"] = get_model_endpoint_from_ddb(featurer["ModelEndpoint"])

            shortened_featurer = {
                "Name": featurer["Name"],
                "Configuration": featurer["Configuration"] if "Configuration" in featurer else {},
                "DependentPlugins": []
            }
            plugins_list.append(featurer["Name"])

            if featurer["Name"] not in d_plugins_list:
                batch_get_keys.append({"Name": featurer["Name"], "Version": "v0"})

            if "DependentPlugins" in featurer:
                for d_index, d_plugin in enumerate(featurer["DependentPlugins"]):
                    if "ModelEndpoint" in d_plugin and isinstance(d_plugin["ModelEndpoint"], dict):
                        featurers[index]["DependentPlugins"][d_index]["ModelEndpoint"] = get_model_endpoint_from_ddb(
                            d_plugin["ModelEndpoint"])

                    shortened_featurer["DependentPlugins"].append(
                        {
                            "Name": d_plugin["Name"],
                            "Configuration": d_plugin["Configuration"] if "Configuration" in d_plugin else {},
                        }
                    )

                    if d_plugin["Name"] not in d_plugins_list:
                        d_plugins_list.append(d_plugin["Name"])

                        if d_plugin["Name"] not in plugins_list:
                            batch_get_keys.append({"Name": d_plugin["Name"], "Version": "v0"})

            shortened_profile["Featurers"].append(shortened_featurer)

    # Retrieve the state machine definition of all the plugins present in the request
    response = ddb_resource.batch_get_item(
        RequestItems={
            PLUGIN_TABLE_NAME: {
                "Keys": batch_get_keys,
                "ConsistentRead": True,
                "ProjectionExpression": "#Name, #Class, #ExecutionType, #SupportedMediaType, #StateDefinition, #DependentPlugins, #Enabled, #Latest",
                "ExpressionAttributeNames": {
                    "#Name": "Name",
                    "#Class": "Class",
                    "#ExecutionType": "ExecutionType",
                    "#SupportedMediaType": "SupportedMediaType",
                    "#StateDefinition": "StateDefinition",
                    "#DependentPlugins": "DependentPlugins",
                    "#Enabled": "Enabled",
                    "#Latest": "Latest"
                }
            }
        }
    )

    responses = response["Responses"][PLUGIN_TABLE_NAME]

    while "UnprocessedKeys" in responses:
        response = ddb_resource.batch_get_item(
            RequestItems=responses["UnprocessedKeys"]
        )

        responses.extend(response["Responses"][PLUGIN_TABLE_NAME])

    plugin_definitions = {}

    for item in responses:
        plugin_definitions[item["Name"]] = {
            "Class": item["Class"],
            "ExecutionType": item["ExecutionType"],
            "SupportedMediaType": item["SupportedMediaType"],
            "StateDefinition": item["StateDefinition"],
            "DependentPlugins": item["DependentPlugins"] if "DependentPlugins" in item else [],
            "Enabled": item["Enabled"],
            "Latest": f"v{item['Latest']}"
        }

    # Check if any of the plugins present in the request does not exist or is disabled in the system
    for plugin in plugins_list:
        if plugin not in plugin_definitions:
            raise NotFoundError(f"Unable to create profile '{name}': Plugin '{plugin}' not found in the system")

        elif not plugin_definitions[plugin]["Enabled"]:
            raise BadRequestError(f"Unable to create profile '{name}': Plugin '{plugin}' is disabled in the system")

        else:
            for d_plugin in plugin_definitions[plugin]["DependentPlugins"]:
                if d_plugin not in d_plugins_list:
                    raise BadRequestError(
                        f"Unable to create profile '{name}': Required Dependent plugin '{d_plugin}' for plugin '{plugin}' not present in the request")

                elif d_plugin not in plugin_definitions:
                    raise NotFoundError(
                        f"Unable to create profile '{name}': Dependent plugin '{d_plugin}' for plugin '{plugin}' not found in the system")

                elif not plugin_definitions[d_plugin]["Enabled"]:
                    raise BadRequestError(
                        f"Unable to create profile '{name}': Dependent plugin '{d_plugin}' for plugin '{plugin}' is disabled in the system")

    # Add SupportedMediaType to all the DependentPlugins in the shortened_profile
    for index, d_plugin in enumerate(shortened_profile["Classifier"]["DependentPlugins"]):
        shortened_profile["Classifier"]["DependentPlugins"][index]["SupportedMediaType"] = \
            plugin_definitions[d_plugin["Name"]]["SupportedMediaType"]

    if "Optimizer" in shortened_profile:
        for index, d_plugin in enumerate(shortened_profile["Optimizer"]["DependentPlugins"]):
            shortened_profile["Optimizer"]["DependentPlugins"][index]["SupportedMediaType"] = \
                plugin_definitions[d_plugin["Name"]]["SupportedMediaType"]

    if "Labeler" in shortened_profile:
        for index, d_plugin in enumerate(shortened_profile["Labeler"]["DependentPlugins"]):
            shortened_profile["Labeler"]["DependentPlugins"][index]["SupportedMediaType"] = \
                plugin_definitions[d_plugin["Name"]]["SupportedMediaType"]

    if "Featurers" in shortened_profile:
        for p_index in range(len(shortened_profile["Featurers"])):
            for c_index, d_plugin in enumerate(shortened_profile["Featurers"][p_index]["DependentPlugins"]):
                shortened_profile["Featurers"][p_index]["DependentPlugins"][c_index]["SupportedMediaType"] = \
                    plugin_definitions[d_plugin["Name"]]["SupportedMediaType"]

    return (json.dumps(
        generate_profile_state_definition(name, classifier, optimizer, labeler, featurers, plugin_definitions,
                                          shortened_profile, internal_lambda_arns)), plugin_definitions)