# generate_profile_state_definition()
#
# in source/controlplaneapi/runtime/chalicelib/__init__.py [0:0]


def generate_profile_state_definition(profile_name, classifier, optimizer, labeler, featurers, plugin_definitions, shortened_profile, internal_lambda_arns):
    """Build the Step Functions state machine definition for a profile.

    The returned definition starts with a "ProbeVideo" Lambda task followed by
    a "MainParallelTask" Parallel state. The Parallel state holds one branch
    for the Classifier/Labeler/Optimizer pipeline and, when featurers are
    given, a second branch for the Featurers. Errors from either state are
    routed to "WorkflowErrorHandler".

    Args:
        profile_name: Name of the profile; used in the log line and in the
            definition's "Comment".
        classifier: Mapping with a "Name" key and an optional
            "DependentPlugins" list.
        optimizer: Falsy to skip the Optimizer, otherwise a mapping with a
            "Name" key and an optional "DependentPlugins" list.
        labeler: Falsy to skip the Labeler, otherwise a mapping with a "Name"
            key and an optional "DependentPlugins" list.
        featurers: Falsy to skip the Featurers, otherwise the plugin list
            handed to get_plugin_state_definition_branch_list.
        plugin_definitions: Mapping indexed by plugin name. The Optimizer's
            entry must contain "SupportedMediaType".
        shortened_profile: Value embedded as "Profile" in every Lambda payload
            and clip generation input.
        internal_lambda_arns: Mapping providing the "MultiChunkHelper",
            "PluginOutputHandler", "ProbeVideo" and "WorkflowErrorHandler"
            function identifiers.

    Returns:
        dict: The state machine definition with "Comment", "StartAt" and
        "States" keys.

    Raises:
        KeyError: If a required key is missing from one of the mappings above.
    """
    print(f"Generating state machine definition for profile '{profile_name}'")

    # Labeler details are resolved first because its dependent plugins run
    # together with the Classifier's dependent plugins.
    labeler_plugin_name = labeler["Name"] if labeler else None
    labeler_has_dependents = bool(labeler) and "DependentPlugins" in labeler

    # Classifier
    classifier_plugin_name = classifier["Name"]
    print(f"State machine generation for the Classifier plugin '{classifier_plugin_name}' in progress.")
    classifier_plugin_definition = get_plugin_state_definition(classifier, "Classifier", plugin_definitions[classifier_plugin_name], shortened_profile)

    # The Classifier is no longer terminal: chain it to the output handler
    classifier_plugin_definition.pop("End", None)
    classifier_plugin_definition["Next"] = "PluginOutputHandler"

    multi_chunk_helper_task = {
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke",
        "Parameters": {
            "FunctionName": internal_lambda_arns["MultiChunkHelper"],
            "Payload": _lambda_invoke_payload(shortened_profile, {
                "MultiChunk": {
                    "PluginClass": "Classifier",
                    "WaitFactor": 5
                }
            })
        },
        "OutputPath": "$.Payload",
        "Retry": _lambda_retry_policy(include_mre_error=True),
        "Next": "CheckMultiChunkStatus"
    }

    plugin_output_handler_task = {
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke",
        "Parameters": {
            "FunctionName": internal_lambda_arns["PluginOutputHandler"],
            "Payload": _lambda_invoke_payload(shortened_profile, {"Output.$": "$.Output"})
        },
        "ResultPath": None,
        "Retry": _lambda_retry_policy(include_mre_error=True),
        "Next": "GenerateOriginalClips"
    }

    # Classifier and/or Labeler DependentPlugins
    classifier_has_dependents = "DependentPlugins" in classifier

    if classifier_has_dependents or labeler_has_dependents:
        dependent_plugins = []

        if classifier_has_dependents:
            print(f"DependentPlugins State machine generation for the Classifier plugin '{classifier_plugin_name}' in progress.")
            dependent_plugins += classifier["DependentPlugins"]

        if labeler_has_dependents:
            print(f"DependentPlugins State machine generation for the Labeler plugin '{labeler_plugin_name}' in progress.")
            dependent_plugins += labeler["DependentPlugins"]

        d_plugins_branch_list = get_plugin_state_definition_branch_list(dependent_plugins, None, plugin_definitions, True, shortened_profile)[0]

        # Dependent plugins run in parallel first; the Pass state then restores
        # the Event/Input shape from the first branch before the classifier chain.
        classifier_labeler_branch = {
            "StartAt": "ClassifierLabelerDependentPluginsTask",
            "States": {
                "ClassifierLabelerDependentPluginsTask": _parallel_state(d_plugins_branch_list, "ClassifierLabelerCollectDependentPluginsResult"),
                "ClassifierLabelerCollectDependentPluginsResult": _collect_first_branch_result("MultiChunkHelper"),
                **_classifier_chain_states(classifier_plugin_name, classifier_plugin_definition, multi_chunk_helper_task, plugin_output_handler_task, shortened_profile)
            }
        }

    else:
        classifier_labeler_branch = {
            "StartAt": "MultiChunkHelper",
            "States": _classifier_chain_states(classifier_plugin_name, classifier_plugin_definition, multi_chunk_helper_task, plugin_output_handler_task, shortened_profile)
        }

    # Labeler
    if labeler:
        print(f"State machine generation for the Labeler plugin '{labeler_plugin_name}' in progress.")

        labeler_plugin_definition = get_plugin_state_definition(labeler, "Labeler", plugin_definitions[labeler_plugin_name], shortened_profile)

        # GenerateOriginalClips is no longer terminal: the Labeler runs after it
        generate_original_clips = classifier_labeler_branch["States"]["GenerateOriginalClips"]
        generate_original_clips.pop("End", None)
        generate_original_clips["Next"] = labeler_plugin_name

        classifier_labeler_branch["States"][labeler_plugin_name] = labeler_plugin_definition

    else:
        print("Skipping state machine generation for Labeler as it is not included in the profile")

    classifier_labeler_optimizer_branch_list = [classifier_labeler_branch]

    # Optimizer
    if optimizer:
        optimizer_plugin_name = optimizer["Name"]
        print(f"State machine generation for the Optimizer plugin '{optimizer_plugin_name}' in progress.")
        optimizer_plugin_definition = get_plugin_state_definition(optimizer, "Optimizer", plugin_definitions[optimizer_plugin_name], shortened_profile)
        is_audio_media_type = plugin_definitions[optimizer_plugin_name]["SupportedMediaType"] == "Audio"

        # The Optimizer is no longer terminal: chain it to clip generation
        optimizer_plugin_definition.pop("End", None)
        optimizer_plugin_definition["Next"] = "GenerateOptimizedClips"

        # Optimizer DependentPlugins
        if "DependentPlugins" in optimizer:
            print(f"DependentPlugins State machine generation for the Optimizer plugin '{optimizer_plugin_name}' in progress.")

            # NOTE(review): this replaces the media-type flag derived from the
            # Optimizer itself with the one reported for its dependent plugins.
            # Kept as in the original implementation - confirm this is intended.
            d_plugins_branch_list, is_audio_media_type = get_plugin_state_definition_branch_list(optimizer["DependentPlugins"], None, plugin_definitions, False, shortened_profile)

            classifier_labeler_optimizer_branch_list.append({
                "StartAt": "OptimizerDependentPluginsTask",
                "States": {
                    "OptimizerDependentPluginsTask": _parallel_state(d_plugins_branch_list)
                }
            })

        if is_audio_media_type:
            # Audio: the Optimizer is invoked directly by function (instead of
            # through the lambda:invoke integration) once per audio track
            # inside a Map state.
            plugin_definition_parameters = optimizer_plugin_definition.pop("Parameters", None)
            optimizer_plugin_definition["Resource"] = plugin_definition_parameters["FunctionName"]
            optimizer_plugin_definition["ResultSelector"] = {
                "PluginName.$": "$.Output.PluginName",
                "PluginClass.$": "$.Output.PluginClass",
                "ExecutionType.$": "$.Output.ExecutionType",
                "DependentPlugins.$": "$.Output.DependentPlugins",
                "ModelEndpoint.$": "$.Output.ModelEndpoint",
                "Configuration.$": "$.Output.Configuration",
                "Results.$": "$.Output.Results"
            }
            optimizer_plugin_definition["ResultPath"] = "$.Output"
            optimizer_plugin_definition.pop("OutputPath", None)

            # The former Lambda payload becomes the per-item Map input, extended
            # with the track number of the current iteration.
            map_parameters = plugin_definition_parameters["Payload"]
            map_parameters["TrackNumber.$"] = "$$.Map.Item.Value"

            state_after_collect = "OptimizeSegmentTask"
            optimizer_states = {
                "OptimizeSegmentTask": {
                    "Type": "Map",
                    "ItemsPath": "$.Event.AudioTracks",
                    "Parameters": map_parameters,
                    "Iterator": {
                        "StartAt": optimizer_plugin_name,
                        "States": {
                            optimizer_plugin_name: optimizer_plugin_definition,
                            "GenerateOptimizedClips": _clip_generation_task(shortened_profile, False, per_track=True)
                        }
                    },
                    "End": True
                }
            }

        else:
            state_after_collect = optimizer_plugin_name
            optimizer_states = {
                optimizer_plugin_name: optimizer_plugin_definition,
                "GenerateOptimizedClips": _clip_generation_task(shortened_profile, False)
            }

        classifier_labeler_optimizer_branch = {
            "StartAt": "ClassifierLabelerOptimizerParallelTask",
            "States": {
                "ClassifierLabelerOptimizerParallelTask": _parallel_state(classifier_labeler_optimizer_branch_list, "CollectClassifierLabelerOptimizerParallelTaskResult"),
                "CollectClassifierLabelerOptimizerParallelTaskResult": _collect_first_branch_result(state_after_collect),
                **optimizer_states
            }
        }

    else:
        print("Skipping the State machine generation for Optimizer as it is not included in the profile")

        classifier_labeler_optimizer_branch = {
            "StartAt": "ClassifierLabelerOptimizerParallelTask",
            "States": {
                "ClassifierLabelerOptimizerParallelTask": _parallel_state(classifier_labeler_optimizer_branch_list)
            }
        }

    main_branch_list = [classifier_labeler_optimizer_branch]

    # Featurers
    if featurers:
        print("Featurers state machine generation in progress.")

        # The media-type flag returned alongside the branches is not needed here
        featurers_branch_list, _ = get_plugin_state_definition_branch_list(featurers, "Featurer", plugin_definitions, False, shortened_profile)

        main_branch_list.append({
            "StartAt": "FeaturersParallelTask",
            "States": {
                "FeaturersParallelTask": _parallel_state(featurers_branch_list)
            }
        })

    else:
        print("Skipping state machine generation for Featurers as it is not included in the profile")

    return {
        "Comment": f"AWS MRE Processing Pipeline for profile {profile_name}",
        "StartAt": "ProbeVideo",
        "States": {
            "ProbeVideo": {
                "Type": "Task",
                "Resource": "arn:aws:states:::lambda:invoke",
                "Parameters": {
                    "FunctionName": internal_lambda_arns["ProbeVideo"],
                    "Payload": _lambda_invoke_payload(shortened_profile)
                },
                "OutputPath": "$.Payload",
                "Retry": _lambda_retry_policy(include_mre_error=False),
                "Catch": _catch_all_to_error_handler(),
                "Next": "MainParallelTask"
            },
            "MainParallelTask": {
                "Type": "Parallel",
                "Branches": main_branch_list,
                "Catch": _catch_all_to_error_handler(),
                "End": True
            },
            "WorkflowErrorHandler": {
                "Type": "Task",
                "Resource": "arn:aws:states:::lambda:invoke",
                "Parameters": {
                    "FunctionName": internal_lambda_arns["WorkflowErrorHandler"],
                    "Payload": _lambda_invoke_payload(shortened_profile, {"Error.$": "$.Error"})
                },
                "OutputPath": "$.Payload",
                "Retry": _lambda_retry_policy(include_mre_error=False),
                "End": True
            }
        }
    }


def _lambda_retry_policy(include_mre_error):
    """Return a fresh Retry list for a Lambda task, optionally retrying on MREExecutionError."""
    error_names = [
        "Lambda.ServiceException",
        "Lambda.AWSLambdaException",
        "Lambda.SdkClientException",
        "Lambda.Unknown"
    ]
    if include_mre_error:
        error_names.append("MREExecutionError")

    return [
        {
            "ErrorEquals": error_names,
            "IntervalSeconds": 2,
            "MaxAttempts": 6,
            "BackoffRate": 2
        }
    ]


def _catch_all_to_error_handler():
    """Return a fresh Catch list that routes every error to WorkflowErrorHandler."""
    return [
        {
            "ErrorEquals": [
                "States.ALL"
            ],
            "Next": "WorkflowErrorHandler",
            "ResultPath": "$.Error"
        }
    ]


def _lambda_invoke_payload(shortened_profile, extra=None):
    """Return the Event/Input/Profile Lambda payload, followed by any extra entries."""
    payload = {
        "Event.$": "$.Event",
        "Input.$": "$.Input",
        "Profile": shortened_profile
    }
    if extra:
        payload.update(extra)
    return payload


def _clip_generation_task(shortened_profile, generate_original, per_track=False):
    """Return a terminal task that starts the clip generation state machine."""
    execution_input = {
        "GenerateOriginal": generate_original,
        "Event.$": "$.Event",
        "Input.$": "$.Input",
        "Segments.$": "$.Output.Results",
        "Profile": shortened_profile
    }
    if per_track:
        execution_input["TrackNumber.$"] = "$.TrackNumber"

    return {
        "Type": "Task",
        "Resource": "arn:aws:states:::states:startExecution",
        "Parameters": {
            "StateMachineArn": CLIP_GENERATION_STATE_MACHINE_ARN,
            "Input": execution_input
        },
        "ResultPath": None,
        "End": True
    }


def _parallel_state(branches, next_state=None):
    """Return a Parallel state over branches that is terminal unless next_state is given."""
    state = {
        "Type": "Parallel",
        "Branches": branches
    }
    if next_state is None:
        state["End"] = True
    else:
        state["Next"] = next_state
    return state


def _collect_first_branch_result(next_state):
    """Return a Pass state that keeps Event and Input from the first Parallel branch output."""
    return {
        "Type": "Pass",
        "Parameters": {
            "Event.$": "$[0].Event",
            "Input.$": "$[0].Input"
        },
        "Next": next_state
    }


def _classifier_chain_states(classifier_plugin_name, classifier_plugin_definition, multi_chunk_helper_task, plugin_output_handler_task, shortened_profile):
    """Return the states running from MultiChunkHelper through the Classifier to GenerateOriginalClips."""
    return {
        "MultiChunkHelper": multi_chunk_helper_task,
        "CheckMultiChunkStatus": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": "$.MultiChunk.IsCompleted",
                    "BooleanEquals": False,
                    "Next": "Wait"
                }
            ],
            "Default": classifier_plugin_name
        },
        "Wait": {
            "Type": "Wait",
            "SecondsPath": "$.MultiChunk.WaitSeconds",
            "Next": "MultiChunkHelper"
        },
        classifier_plugin_name: classifier_plugin_definition,
        "PluginOutputHandler": plugin_output_handler_task,
        "GenerateOriginalClips": _clip_generation_task(shortened_profile, True)
    }