in source/controlplaneapi/runtime/chalicelib/__init__.py [0:0]
def generate_profile_state_definition(profile_name, classifier, optimizer, labeler, featurers, plugin_definitions, shortened_profile, internal_lambda_arns):
print(f"Generating state machine definition for profile '{profile_name}'")
main_branch_list = []
classifier_labeler_optimizer_branch_list = []
is_labeler_present = False
is_labeler_dependent_present = False
# Check if Labeler (and its dependencies) is present in the profile
if labeler:
is_labeler_present = True
labeler_plugin_name = labeler["Name"]
if "DependentPlugins" in labeler:
is_labeler_dependent_present = True
# Classifier
classifier_plugin_name = classifier["Name"]
print(f"State machine generation for the Classifier plugin '{classifier_plugin_name}' in progress.")
classifier_plugin_definition = get_plugin_state_definition(classifier, "Classifier", plugin_definitions[classifier_plugin_name], shortened_profile)
# Remove the 'End' key and add the 'Next' key in the state definition
classifier_plugin_definition.pop("End", None)
classifier_plugin_definition["Next"] = "PluginOutputHandler"
multi_chunk_helper_task = {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": internal_lambda_arns["MultiChunkHelper"],
"Payload": {
"Event.$": "$.Event",
"Input.$": "$.Input",
"Profile": shortened_profile,
"MultiChunk": {
"PluginClass": "Classifier",
"WaitFactor": 5
}
}
},
"OutputPath": "$.Payload",
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.Unknown",
"MREExecutionError"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"Next": "CheckMultiChunkStatus"
}
plugin_output_handler_task = {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": internal_lambda_arns["PluginOutputHandler"],
"Payload": {
"Event.$": "$.Event",
"Input.$": "$.Input",
"Profile": shortened_profile,
"Output.$": "$.Output"
}
},
"ResultPath": None,
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.Unknown",
"MREExecutionError"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"Next": "GenerateOriginalClips"
}
# Classifier and/or Labeler DependentPlugins
if "DependentPlugins" in classifier or is_labeler_dependent_present:
dependent_plugins = []
if "DependentPlugins" in classifier:
print(f"DependentPlugins State machine generation for the Classifier plugin '{classifier_plugin_name}' in progress.")
dependent_plugins += classifier["DependentPlugins"]
if is_labeler_dependent_present:
print(f"DependentPlugins State machine generation for the Labeler plugin '{labeler_plugin_name}' in progress.")
dependent_plugins += labeler["DependentPlugins"]
d_plugins_branch_list = get_plugin_state_definition_branch_list(dependent_plugins, None, plugin_definitions, True, shortened_profile)[0]
classifier_labeler_branch = {
"StartAt": "ClassifierLabelerDependentPluginsTask",
"States": {
"ClassifierLabelerDependentPluginsTask": {
"Type": "Parallel",
"Branches": d_plugins_branch_list,
"Next": "ClassifierLabelerCollectDependentPluginsResult"
},
"ClassifierLabelerCollectDependentPluginsResult": {
"Type": "Pass",
"Parameters": {
"Event.$": "$[0].Event",
"Input.$": "$[0].Input"
},
"Next": "MultiChunkHelper"
},
"MultiChunkHelper": multi_chunk_helper_task,
"CheckMultiChunkStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.MultiChunk.IsCompleted",
"BooleanEquals": False,
"Next": "Wait"
}
],
"Default": classifier_plugin_name
},
"Wait": {
"Type": "Wait",
"SecondsPath": "$.MultiChunk.WaitSeconds",
"Next": "MultiChunkHelper"
},
classifier_plugin_name: classifier_plugin_definition,
"PluginOutputHandler": plugin_output_handler_task,
"GenerateOriginalClips": {
"Type": "Task",
"Resource": "arn:aws:states:::states:startExecution",
"Parameters": {
"StateMachineArn": CLIP_GENERATION_STATE_MACHINE_ARN,
"Input": {
"GenerateOriginal": True,
"Event.$": "$.Event",
"Input.$": "$.Input",
"Segments.$": "$.Output.Results",
"Profile": shortened_profile
}
},
"ResultPath": None,
"End": True
}
}
}
else:
classifier_labeler_branch = {
"StartAt": "MultiChunkHelper",
"States": {
"MultiChunkHelper": multi_chunk_helper_task,
"CheckMultiChunkStatus": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.MultiChunk.IsCompleted",
"BooleanEquals": False,
"Next": "Wait"
}
],
"Default": classifier_plugin_name
},
"Wait": {
"Type": "Wait",
"SecondsPath": "$.MultiChunk.WaitSeconds",
"Next": "MultiChunkHelper"
},
classifier_plugin_name: classifier_plugin_definition,
"PluginOutputHandler": plugin_output_handler_task,
"GenerateOriginalClips": {
"Type": "Task",
"Resource": "arn:aws:states:::states:startExecution",
"Parameters": {
"StateMachineArn": CLIP_GENERATION_STATE_MACHINE_ARN,
"Input": {
"GenerateOriginal": True,
"Event.$": "$.Event",
"Input.$": "$.Input",
"Segments.$": "$.Output.Results",
"Profile": shortened_profile
}
},
"ResultPath": None,
"End": True
}
}
}
if is_labeler_present:
print(f"State machine generation for the Labeler plugin '{labeler_plugin_name}' in progress.")
labeler_plugin_definition = get_plugin_state_definition(labeler, "Labeler", plugin_definitions[labeler_plugin_name], shortened_profile)
# Remove the 'End' key and add the 'Next' key in the GenerateOriginalClips state definition
classifier_labeler_branch["States"]["GenerateOriginalClips"].pop("End", None)
classifier_labeler_branch["States"]["GenerateOriginalClips"]["Next"] = labeler_plugin_name
classifier_labeler_branch["States"][labeler_plugin_name] = labeler_plugin_definition
else:
print("Skipping state machine generation for Labeler as it is not included in the profile")
classifier_labeler_optimizer_branch_list.append(classifier_labeler_branch)
# Optimizer
if optimizer:
optimizer_plugin_name = optimizer["Name"]
print(f"State machine generation for the Optimizer plugin '{optimizer_plugin_name}' in progress.")
optimizer_plugin_definition = get_plugin_state_definition(optimizer, "Optimizer", plugin_definitions[optimizer_plugin_name], shortened_profile)
is_audio_media_type = True if plugin_definitions[optimizer_plugin_name]["SupportedMediaType"] == "Audio" else False
# Remove the 'End' key and add the 'Next' key in the state definition
optimizer_plugin_definition.pop("End", None)
optimizer_plugin_definition["Next"] = "GenerateOptimizedClips"
# Optimizer DependentPlugins
if "DependentPlugins" in optimizer:
print(f"DependentPlugins State machine generation for the Optimizer plugin '{optimizer_plugin_name}' in progress.")
d_plugins_branch_list, is_audio_media_type = get_plugin_state_definition_branch_list(optimizer["DependentPlugins"], None, plugin_definitions, False, shortened_profile)
optimizer_d_branch = {
"StartAt": "OptimizerDependentPluginsTask",
"States": {
"OptimizerDependentPluginsTask": {
"Type": "Parallel",
"Branches": d_plugins_branch_list,
"End": True
}
}
}
classifier_labeler_optimizer_branch_list.append(optimizer_d_branch)
if is_audio_media_type:
plugin_definition_parameters = optimizer_plugin_definition.pop("Parameters", None)
plugin_lambda_function = plugin_definition_parameters["FunctionName"]
optimizer_plugin_definition["Resource"] = plugin_lambda_function
optimizer_plugin_definition["ResultSelector"] = {
"PluginName.$": "$.Output.PluginName",
"PluginClass.$": "$.Output.PluginClass",
"ExecutionType.$": "$.Output.ExecutionType",
"DependentPlugins.$": "$.Output.DependentPlugins",
"ModelEndpoint.$": "$.Output.ModelEndpoint",
"Configuration.$": "$.Output.Configuration",
"Results.$": "$.Output.Results"
}
optimizer_plugin_definition["ResultPath"] = "$.Output"
optimizer_plugin_definition.pop("OutputPath", None)
map_parameters = plugin_definition_parameters["Payload"]
map_parameters["TrackNumber.$"] = "$$.Map.Item.Value"
optimize_segment_task = {
"Type": "Map",
"ItemsPath": "$.Event.AudioTracks",
"Parameters": map_parameters,
"Iterator": {
"StartAt": optimizer_plugin_name,
"States": {
optimizer_plugin_name: optimizer_plugin_definition,
"GenerateOptimizedClips": {
"Type": "Task",
"Resource": "arn:aws:states:::states:startExecution",
"Parameters": {
"StateMachineArn": CLIP_GENERATION_STATE_MACHINE_ARN,
"Input": {
"GenerateOriginal": False,
"Event.$": "$.Event",
"Input.$": "$.Input",
"Segments.$": "$.Output.Results",
"Profile": shortened_profile,
"TrackNumber.$": "$.TrackNumber"
}
},
"ResultPath": None,
"End": True
}
}
},
"End": True
}
classifier_labeler_optimizer_branch = {
"StartAt": "ClassifierLabelerOptimizerParallelTask",
"States": {
"ClassifierLabelerOptimizerParallelTask": {
"Type": "Parallel",
"Branches": classifier_labeler_optimizer_branch_list,
"Next": "CollectClassifierLabelerOptimizerParallelTaskResult"
},
"CollectClassifierLabelerOptimizerParallelTaskResult": {
"Type": "Pass",
"Parameters": {
"Event.$": "$[0].Event",
"Input.$": "$[0].Input"
},
"Next": "OptimizeSegmentTask"
},
"OptimizeSegmentTask": optimize_segment_task
}
}
else:
classifier_labeler_optimizer_branch = {
"StartAt": "ClassifierLabelerOptimizerParallelTask",
"States": {
"ClassifierLabelerOptimizerParallelTask": {
"Type": "Parallel",
"Branches": classifier_labeler_optimizer_branch_list,
"Next": "CollectClassifierLabelerOptimizerParallelTaskResult"
},
"CollectClassifierLabelerOptimizerParallelTaskResult": {
"Type": "Pass",
"Parameters": {
"Event.$": "$[0].Event",
"Input.$": "$[0].Input"
},
"Next": optimizer_plugin_name
},
optimizer_plugin_name: optimizer_plugin_definition,
"GenerateOptimizedClips": {
"Type": "Task",
"Resource": "arn:aws:states:::states:startExecution",
"Parameters": {
"StateMachineArn": CLIP_GENERATION_STATE_MACHINE_ARN,
"Input": {
"GenerateOriginal": False,
"Event.$": "$.Event",
"Input.$": "$.Input",
"Segments.$": "$.Output.Results",
"Profile": shortened_profile
}
},
"ResultPath": None,
"End": True
}
}
}
else:
print("Skipping the State machine generation for Optimizer as it is not included in the profile")
classifier_labeler_optimizer_branch = {
"StartAt": "ClassifierLabelerOptimizerParallelTask",
"States": {
"ClassifierLabelerOptimizerParallelTask": {
"Type": "Parallel",
"Branches": classifier_labeler_optimizer_branch_list,
"End": True
}
}
}
main_branch_list.append(classifier_labeler_optimizer_branch)
# Featurers
if featurers:
print("Featurers state machine generation in progress.")
featurers_branch_list, is_audio_media_type = get_plugin_state_definition_branch_list(featurers, "Featurer", plugin_definitions, False, shortened_profile)
featurers_branch = {
"StartAt": "FeaturersParallelTask",
"States": {
"FeaturersParallelTask": {
"Type": "Parallel",
"Branches": featurers_branch_list,
"End": True
}
}
}
main_branch_list.append(featurers_branch)
else:
print("Skipping state machine generation for Featurers as it is not included in the profile")
main_state_definition = {
"Comment": f"AWS MRE Processing Pipeline for profile {profile_name}",
"StartAt": "ProbeVideo",
"States": {
"ProbeVideo": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": internal_lambda_arns["ProbeVideo"],
"Payload": {
"Event.$": "$.Event",
"Input.$": "$.Input",
"Profile": shortened_profile
}
},
"OutputPath": "$.Payload",
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.Unknown"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "WorkflowErrorHandler",
"ResultPath": "$.Error"
}
],
"Next": "MainParallelTask"
},
"MainParallelTask": {
"Type": "Parallel",
"Branches": main_branch_list,
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "WorkflowErrorHandler",
"ResultPath": "$.Error"
}
],
"End": True
},
"WorkflowErrorHandler": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": internal_lambda_arns["WorkflowErrorHandler"],
"Payload": {
"Event.$": "$.Event",
"Input.$": "$.Input",
"Profile": shortened_profile,
"Error.$": "$.Error"
}
},
"OutputPath": "$.Payload",
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.Unknown"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"End": True
}
}
}
return main_state_definition