in source/controlplaneapi/runtime/app.py [0:0]
def profile_state_definition_helper(name, profile):
plugins_list = []
d_plugins_list = []
batch_get_keys = []
internal_lambda_arns = {
"ProbeVideo": PROBE_VIDEO_LAMBDA_ARN,
"MultiChunkHelper": MULTI_CHUNK_HELPER_LAMBDA_ARN,
"PluginOutputHandler": PLUGIN_OUTPUT_HANDLER_LAMBDA_ARN,
"WorkflowErrorHandler": WORKFLOW_ERROR_HANDLER_LAMBDA_ARN
}
shortened_profile = {
"Name": name,
"ChunkSize": profile["ChunkSize"],
"MaxSegmentLengthSeconds": profile["MaxSegmentLengthSeconds"],
"ProcessingFrameRate": profile["ProcessingFrameRate"]
}
# Classifier
classifier = profile["Classifier"]
if "ModelEndpoint" in classifier and isinstance(classifier["ModelEndpoint"], dict):
classifier["ModelEndpoint"] = get_model_endpoint_from_ddb(classifier["ModelEndpoint"])
shortened_classifier = {
"Name": classifier["Name"],
"Configuration": classifier["Configuration"] if "Configuration" in classifier else {},
"DependentPlugins": []
}
plugins_list.append(classifier["Name"])
batch_get_keys.append({"Name": classifier["Name"], "Version": "v0"})
if "DependentPlugins" in classifier:
for index, d_plugin in enumerate(classifier["DependentPlugins"]):
if "ModelEndpoint" in d_plugin and isinstance(d_plugin["ModelEndpoint"], dict):
classifier["DependentPlugins"][index]["ModelEndpoint"] = get_model_endpoint_from_ddb(
d_plugin["ModelEndpoint"])
shortened_classifier["DependentPlugins"].append(
{
"Name": d_plugin["Name"],
"Configuration": d_plugin["Configuration"] if "Configuration" in d_plugin else {},
}
)
if d_plugin["Name"] not in d_plugins_list:
d_plugins_list.append(d_plugin["Name"])
batch_get_keys.append({"Name": d_plugin["Name"], "Version": "v0"})
shortened_profile["Classifier"] = shortened_classifier
# Optimizer
optimizer = {}
if "Optimizer" in profile and profile["Optimizer"]:
optimizer = profile["Optimizer"]
if "ModelEndpoint" in optimizer and isinstance(optimizer["ModelEndpoint"], dict):
optimizer["ModelEndpoint"] = get_model_endpoint_from_ddb(optimizer["ModelEndpoint"])
shortened_optimizer = {
"Name": optimizer["Name"],
"Configuration": optimizer["Configuration"] if "Configuration" in optimizer else {},
"DependentPlugins": []
}
plugins_list.append(optimizer["Name"])
batch_get_keys.append({"Name": optimizer["Name"], "Version": "v0"})
if "DependentPlugins" in optimizer:
for index, d_plugin in enumerate(optimizer["DependentPlugins"]):
if "ModelEndpoint" in d_plugin and isinstance(d_plugin["ModelEndpoint"], dict):
optimizer["DependentPlugins"][index]["ModelEndpoint"] = get_model_endpoint_from_ddb(
d_plugin["ModelEndpoint"])
shortened_optimizer["DependentPlugins"].append(
{
"Name": d_plugin["Name"],
"Configuration": d_plugin["Configuration"] if "Configuration" in d_plugin else {},
}
)
if d_plugin["Name"] not in d_plugins_list:
d_plugins_list.append(d_plugin["Name"])
batch_get_keys.append({"Name": d_plugin["Name"], "Version": "v0"})
shortened_profile["Optimizer"] = shortened_optimizer
# Labeler
labeler = {}
if "Labeler" in profile and profile["Labeler"]:
labeler = profile["Labeler"]
if "ModelEndpoint" in labeler and isinstance(labeler["ModelEndpoint"], dict):
labeler["ModelEndpoint"] = get_model_endpoint_from_ddb(labeler["ModelEndpoint"])
shortened_labeler = {
"Name": labeler["Name"],
"Configuration": labeler["Configuration"] if "Configuration" in labeler else {},
"DependentPlugins": []
}
plugins_list.append(labeler["Name"])
batch_get_keys.append({"Name": labeler["Name"], "Version": "v0"})
if "DependentPlugins" in labeler:
for index, d_plugin in enumerate(labeler["DependentPlugins"]):
if "ModelEndpoint" in d_plugin and isinstance(d_plugin["ModelEndpoint"], dict):
labeler["DependentPlugins"][index]["ModelEndpoint"] = get_model_endpoint_from_ddb(
d_plugin["ModelEndpoint"])
shortened_labeler["DependentPlugins"].append(
{
"Name": d_plugin["Name"],
"Configuration": d_plugin["Configuration"] if "Configuration" in d_plugin else {},
}
)
if d_plugin["Name"] not in d_plugins_list:
d_plugins_list.append(d_plugin["Name"])
batch_get_keys.append({"Name": d_plugin["Name"], "Version": "v0"})
shortened_profile["Labeler"] = shortened_labeler
# Featurers
featurers = []
if "Featurers" in profile and profile["Featurers"]:
shortened_profile["Featurers"] = []
featurers = profile["Featurers"]
for index, featurer in enumerate(featurers):
if featurer["Name"] in plugins_list:
raise ConflictError(
f"Unable to create profile '{name}': Provided list of Featurers contains duplicates")
if "ModelEndpoint" in featurer and isinstance(featurer["ModelEndpoint"], dict):
featurers[index]["ModelEndpoint"] = get_model_endpoint_from_ddb(featurer["ModelEndpoint"])
shortened_featurer = {
"Name": featurer["Name"],
"Configuration": featurer["Configuration"] if "Configuration" in featurer else {},
"DependentPlugins": []
}
plugins_list.append(featurer["Name"])
if featurer["Name"] not in d_plugins_list:
batch_get_keys.append({"Name": featurer["Name"], "Version": "v0"})
if "DependentPlugins" in featurer:
for d_index, d_plugin in enumerate(featurer["DependentPlugins"]):
if "ModelEndpoint" in d_plugin and isinstance(d_plugin["ModelEndpoint"], dict):
featurers[index]["DependentPlugins"][d_index]["ModelEndpoint"] = get_model_endpoint_from_ddb(
d_plugin["ModelEndpoint"])
shortened_featurer["DependentPlugins"].append(
{
"Name": d_plugin["Name"],
"Configuration": d_plugin["Configuration"] if "Configuration" in d_plugin else {},
}
)
if d_plugin["Name"] not in d_plugins_list:
d_plugins_list.append(d_plugin["Name"])
if d_plugin["Name"] not in plugins_list:
batch_get_keys.append({"Name": d_plugin["Name"], "Version": "v0"})
shortened_profile["Featurers"].append(shortened_featurer)
# Retrieve the state machine definition of all the plugins present in the request
response = ddb_resource.batch_get_item(
RequestItems={
PLUGIN_TABLE_NAME: {
"Keys": batch_get_keys,
"ConsistentRead": True,
"ProjectionExpression": "#Name, #Class, #ExecutionType, #SupportedMediaType, #StateDefinition, #DependentPlugins, #Enabled, #Latest",
"ExpressionAttributeNames": {
"#Name": "Name",
"#Class": "Class",
"#ExecutionType": "ExecutionType",
"#SupportedMediaType": "SupportedMediaType",
"#StateDefinition": "StateDefinition",
"#DependentPlugins": "DependentPlugins",
"#Enabled": "Enabled",
"#Latest": "Latest"
}
}
}
)
responses = response["Responses"][PLUGIN_TABLE_NAME]
while "UnprocessedKeys" in responses:
response = ddb_resource.batch_get_item(
RequestItems=responses["UnprocessedKeys"]
)
responses.extend(response["Responses"][PLUGIN_TABLE_NAME])
plugin_definitions = {}
for item in responses:
plugin_definitions[item["Name"]] = {
"Class": item["Class"],
"ExecutionType": item["ExecutionType"],
"SupportedMediaType": item["SupportedMediaType"],
"StateDefinition": item["StateDefinition"],
"DependentPlugins": item["DependentPlugins"] if "DependentPlugins" in item else [],
"Enabled": item["Enabled"],
"Latest": f"v{item['Latest']}"
}
# Check if any of the plugins present in the request does not exist or is disabled in the system
for plugin in plugins_list:
if plugin not in plugin_definitions:
raise NotFoundError(f"Unable to create profile '{name}': Plugin '{plugin}' not found in the system")
elif not plugin_definitions[plugin]["Enabled"]:
raise BadRequestError(f"Unable to create profile '{name}': Plugin '{plugin}' is disabled in the system")
else:
for d_plugin in plugin_definitions[plugin]["DependentPlugins"]:
if d_plugin not in d_plugins_list:
raise BadRequestError(
f"Unable to create profile '{name}': Required Dependent plugin '{d_plugin}' for plugin '{plugin}' not present in the request")
elif d_plugin not in plugin_definitions:
raise NotFoundError(
f"Unable to create profile '{name}': Dependent plugin '{d_plugin}' for plugin '{plugin}' not found in the system")
elif not plugin_definitions[d_plugin]["Enabled"]:
raise BadRequestError(
f"Unable to create profile '{name}': Dependent plugin '{d_plugin}' for plugin '{plugin}' is disabled in the system")
# Add SupportedMediaType to all the DependentPlugins in the shortened_profile
for index, d_plugin in enumerate(shortened_profile["Classifier"]["DependentPlugins"]):
shortened_profile["Classifier"]["DependentPlugins"][index]["SupportedMediaType"] = \
plugin_definitions[d_plugin["Name"]]["SupportedMediaType"]
if "Optimizer" in shortened_profile:
for index, d_plugin in enumerate(shortened_profile["Optimizer"]["DependentPlugins"]):
shortened_profile["Optimizer"]["DependentPlugins"][index]["SupportedMediaType"] = \
plugin_definitions[d_plugin["Name"]]["SupportedMediaType"]
if "Labeler" in shortened_profile:
for index, d_plugin in enumerate(shortened_profile["Labeler"]["DependentPlugins"]):
shortened_profile["Labeler"]["DependentPlugins"][index]["SupportedMediaType"] = \
plugin_definitions[d_plugin["Name"]]["SupportedMediaType"]
if "Featurers" in shortened_profile:
for p_index in range(len(shortened_profile["Featurers"])):
for c_index, d_plugin in enumerate(shortened_profile["Featurers"][p_index]["DependentPlugins"]):
shortened_profile["Featurers"][p_index]["DependentPlugins"][c_index]["SupportedMediaType"] = \
plugin_definitions[d_plugin["Name"]]["SupportedMediaType"]
return (json.dumps(
generate_profile_state_definition(name, classifier, optimizer, labeler, featurers, plugin_definitions,
shortened_profile, internal_lambda_arns)), plugin_definitions)