in source/controlplaneapi/runtime/app.py [0:0]
def register_plugin():
"""
Register a new plugin or publish a new version of an existing plugin with updated
attribute values.
Plugins can be one of the following types:
- Sync: Contains all the required processing logic within the plugin to achieve the end result
- SyncModel: Depends on a Machine Learning model to help with achieving the end result
Body:
.. code-block:: python
{
"Name": string,
"Description": string,
"Class": ["Classifier"|"Optimizer"|"Featurer"|"Labeler"]
"ExecutionType": ["Sync"|"SyncModel"],
"SupportedMediaType": ["Video"|"Audio"],
"ContentGroups": list,
"ExecuteLambdaQualifiedARN": arn,
"ModelEndpoints": [
{
"Name": string,
"Version": string
},
...
],
"Configuration" : {
"configuration1": "value1",
...
},
"OutputAttributes" : {
"attribute1": {
"Description": string
},
...
},
"DependentPlugins": list
}
Parameters:
- Name: Name of the Plugin
- Description: Description of the Plugin
- Class: One of "Classifier"|"Optimizer"|"Featurer"|"Labeler"
- ExecutionType: One of "Sync"|"SyncModel". SyncModel indicates that the Plugin has a ML Model dependency.
- SupportedMediaType: One of "Video"|"Audio". Whether Plugin operates on Video or Audio source
- ContentGroups: List of Content Group supported by the Plugin
- ExecuteLambdaQualifiedARN: ARN of the Lambda function that encapsulates the Plugin implementation
- ModelEndpoints: List of Dicts which contains the MRE Models used by the Plugin. Required only when the ExecutionType is SyncModel.
- Configuration: Configuration values which impact the Plugin behavior. For example, MlModelConfidenceScore: 60
- OutputAttributes: List of dict that have the name of the attributes the Plugin Outputs. These attributes can be configured to create Replays within MRE.
- DependentPlugins: A list of Plugin names on which this Plugin depends on. MRE executes the dependent plugins before executing this plugin.
Returns:
A dict containing the Id and Version of the registered plugin
.. code-block:: python
{
"Id": string,
"Version": string
}
Raises:
400 - BadRequestError
404 - NotFoundError
500 - ChaliceViewError
"""
try:
plugin = json.loads(app.current_request.raw_body.decode(), parse_float=Decimal)
validate(instance=plugin, schema=API_SCHEMA["register_plugin"])
print("Got a valid plugin schema")
name = plugin["Name"]
execution_type = plugin["ExecutionType"]
if execution_type == "SyncModel":
if "ModelEndpoints" not in plugin:
raise BadRequestError("Missing required key 'ModelEndpoints' in the input")
else:
model_table = ddb_resource.Table(MODEL_TABLE_NAME)
for model_endpoint in plugin["ModelEndpoints"]:
model_name = model_endpoint["Name"]
model_version = model_endpoint["Version"]
response = model_table.get_item(
Key={
"Name": model_name,
"Version": model_version
},
ConsistentRead=True
)
if "Item" not in response:
raise NotFoundError(f"Model endpoint '{model_name}' with version '{model_version}' not found")
elif not response["Item"]["Enabled"]:
raise BadRequestError(
f"Model endpoint '{model_name}' with version '{model_version}' is disabled in the system")
plugin_table = ddb_resource.Table(PLUGIN_TABLE_NAME)
# Check if all the DependentPlugins are already registered and enabled in the system
if "DependentPlugins" in plugin:
dependent_plugins = plugin["DependentPlugins"]
for d_plugin in dependent_plugins:
if d_plugin == name:
raise BadRequestError(f"Plugin '{d_plugin}' cannot be a dependent of itself")
response = plugin_table.get_item(
Key={
"Name": d_plugin,
"Version": "v0"
},
ConsistentRead=True
)
if "Item" not in response:
raise NotFoundError(f"Dependent plugin '{d_plugin}' not found")
elif not response["Item"]["Enabled"]:
raise BadRequestError(f"Dependent plugin '{d_plugin}' is disabled in the system")
else:
dependent_plugins = []
output_attributes = plugin["OutputAttributes"] if "OutputAttributes" in plugin else {}
response = plugin_table.get_item(
Key={
"Name": name,
"Version": "v0"
},
ConsistentRead=True
)
if "Item" not in response:
print(f"Registering a new plugin '{name}'")
plugin["Id"] = str(uuid.uuid4())
latest_version = 0
higher_version = 1
else:
print(f"Publishing a new version of the plugin '{name}'")
plugin["Id"] = response["Item"]["Id"]
latest_version = response["Item"]["Latest"]
higher_version = int(latest_version) + 1
plugin["Created"] = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
plugin["Enabled"] = True
plugin["FrameworkVersion"] = FRAMEWORK_VERSION
state_definition = generate_plugin_state_definition(execution_type)
state_definition_str = json.dumps(state_definition)
state_definition_str = state_definition_str.replace("%%PLUGIN_NAME%%", name)
state_definition_str = state_definition_str.replace("%%PLUGIN_CLASS%%", plugin["Class"])
state_definition_str = state_definition_str.replace("%%PLUGIN_EXECUTION_TYPE%%", execution_type)
state_definition_str = state_definition_str.replace("%%PLUGIN_EXECUTE_LAMBDA_ARN%%",
plugin["ExecuteLambdaQualifiedARN"])
state_definition_str = state_definition_str.replace("\"%%PLUGIN_DEPENDENT_PLUGINS%%\"",
json.dumps(dependent_plugins))
state_definition_str = state_definition_str.replace("\"%%PLUGIN_OUTPUT_ATTRIBUTES%%\"",
json.dumps(output_attributes))
plugin["StateDefinition"] = state_definition_str
print(f"Plugin State Definition: {state_definition_str}")
# Serialize Python object to DynamoDB object
serialized_plugin = {k: serializer.serialize(v) for k, v in plugin.items()}
ddb_client.transact_write_items(
TransactItems=[
{
"Update": {
"TableName": PLUGIN_TABLE_NAME,
"Key": {
"Name": {"S": name},
"Version": {"S": "v0"}
},
"ConditionExpression": "attribute_not_exists(#Latest) OR #Latest = :Latest",
"UpdateExpression": "SET #Latest = :Higher_version, #Id = :Id, #Class = :Class, #Description = :Description, #ContentGroups = :ContentGroups, #ExecutionType = :ExecutionType, #SupportedMediaType = :SupportedMediaType, #ExecuteLambda = :ExecuteLambda, #StateDefinition = :StateDefinition, #ModelEndpoints = :ModelEndpoints, #Configuration = :Configuration, #OutputAttributes = :OutputAttributes, #DependentPlugins = :DependentPlugins, #Created = :Created, #Enabled = :Enabled, #FrameworkVersion = :FrameworkVersion",
"ExpressionAttributeNames": {
"#Latest": "Latest",
"#Id": "Id",
"#Class": "Class",
"#Description": "Description",
"#ContentGroups": "ContentGroups",
"#ExecutionType": "ExecutionType",
"#SupportedMediaType": "SupportedMediaType",
"#ExecuteLambda": "ExecuteLambdaQualifiedARN",
"#StateDefinition": "StateDefinition",
"#ModelEndpoints": "ModelEndpoints",
"#Configuration": "Configuration",
"#OutputAttributes": "OutputAttributes",
"#DependentPlugins": "DependentPlugins",
"#Created": "Created",
"#Enabled": "Enabled",
"#FrameworkVersion": "FrameworkVersion"
},
"ExpressionAttributeValues": {
":Latest": {"N": str(latest_version)},
":Higher_version": {"N": str(higher_version)},
":Id": serialized_plugin["Id"],
":Class": serialized_plugin["Class"],
":Description": serialized_plugin[
"Description"] if "Description" in serialized_plugin else {"S": ""},
":ContentGroups": serialized_plugin["ContentGroups"],
":ExecutionType": serialized_plugin["ExecutionType"],
":SupportedMediaType": serialized_plugin["SupportedMediaType"],
":ExecuteLambda": serialized_plugin["ExecuteLambdaQualifiedARN"],
":StateDefinition": serialized_plugin["StateDefinition"],
":ModelEndpoints": serialized_plugin[
"ModelEndpoints"] if execution_type == "SyncModel" else {"L": []},
":Configuration": serialized_plugin[
"Configuration"] if "Configuration" in serialized_plugin else {"M": {}},
":OutputAttributes": serialized_plugin[
"OutputAttributes"] if "OutputAttributes" in serialized_plugin else {"M": {}},
":DependentPlugins": serialized_plugin[
"DependentPlugins"] if "DependentPlugins" in serialized_plugin else {"L": []},
":Created": serialized_plugin["Created"],
":Enabled": serialized_plugin["Enabled"],
":FrameworkVersion": serialized_plugin["FrameworkVersion"]
}
}
},
{
"Put": {
"TableName": PLUGIN_TABLE_NAME,
"Item": {
"Name": {"S": name},
"Version": {"S": "v" + str(higher_version)},
"Id": serialized_plugin["Id"],
"Class": serialized_plugin["Class"],
"Description": serialized_plugin["Description"] if "Description" in serialized_plugin else {
"S": ""},
"ContentGroups": serialized_plugin["ContentGroups"],
"ExecutionType": serialized_plugin["ExecutionType"],
"SupportedMediaType": serialized_plugin["SupportedMediaType"],
"ExecuteLambdaQualifiedARN": serialized_plugin["ExecuteLambdaQualifiedARN"],
"StateDefinition": serialized_plugin["StateDefinition"],
"ModelEndpoints": serialized_plugin[
"ModelEndpoints"] if execution_type == "SyncModel" else {"L": []},
"Configuration": serialized_plugin[
"Configuration"] if "Configuration" in serialized_plugin else {"M": {}},
"OutputAttributes": serialized_plugin[
"OutputAttributes"] if "OutputAttributes" in serialized_plugin else {"M": {}},
"Created": serialized_plugin["Created"],
"Enabled": serialized_plugin["Enabled"],
"DependentPlugins": serialized_plugin[
"DependentPlugins"] if "DependentPlugins" in serialized_plugin else {"L": []},
"FrameworkVersion": serialized_plugin["FrameworkVersion"]
}
}
}
]
)
except BadRequestError as e:
print(f"Got chalice BadRequestError: {str(e)}")
raise
except ValidationError as e:
print(f"Got jsonschema ValidationError: {str(e)}")
raise BadRequestError(e.message)
except NotFoundError as e:
print(f"Got chalice NotFoundError: {str(e)}")
raise
except Exception as e:
print(f"Unable to register or publish a new version of the plugin: {str(e)}")
raise ChaliceViewError(f"Unable to register or publish a new version of the plugin: {str(e)}")
else:
print(
f"Successfully registered or published a new version of the plugin: {json.dumps(plugin, cls=DecimalEncoder)}")
return {
"Id": plugin["Id"],
"Version": "v" + str(higher_version)
}