in source/controlplaneapi/runtime/app.py [0:0]
def create_profile():
    """
    Create a new processing profile. A processing profile is a logical grouping of:

        - One Classifier plugin

        - Zero or One Optimizer plugin

        - Zero or One Labeler plugin

        - Zero or More Featurer plugins

    Additionally, each of these plugins can optionally have one or more dependent plugins.

    Body:

    .. code-block:: python

        {
            "Name": string,
            "Description": string,
            "ContentGroups": list,
            "ChunkSize": number,
            "MaxSegmentLengthSeconds": number,
            "ProcessingFrameRate": number,
            "Classifier": {
                "Name": string,
                "ModelEndpoint": {
                    "Name": string,
                    "Version": string
                },
                "Configuration" : {
                    "configuration1": "value1",
                    ...
                },
                "DependentPlugins": [
                    {
                        "Name": string,
                        "ModelEndpoint": {
                            "Name": string,
                            "Version": string
                        },
                        "Configuration" : {
                            "configuration1": "value1",
                            ...
                        }
                    },
                    ...
                ]
            },
            "Optimizer": {
                "Name": string,
                "ModelEndpoint": {
                    "Name": string,
                    "Version": string
                },
                "Configuration" : {
                    "configuration1": "value1",
                    ...
                },
                "DependentPlugins": [
                    {
                        "Name": string,
                        "ModelEndpoint": {
                            "Name": string,
                            "Version": string
                        },
                        "Configuration" : {
                            "configuration1": "value1",
                            ...
                        }
                    },
                    ...
                ]
            },
            "Labeler": {
                "Name": string,
                "ModelEndpoint": {
                    "Name": string,
                    "Version": string
                },
                "Configuration" : {
                    "configuration1": "value1",
                    ...
                },
                "DependentPlugins": [
                    {
                        "Name": string,
                        "ModelEndpoint": {
                            "Name": string,
                            "Version": string
                        },
                        "Configuration" : {
                            "configuration1": "value1",
                            ...
                        }
                    },
                    ...
                ]
            },
            "Featurers": [
                {
                    "Name": string,
                    "ModelEndpoint": {
                        "Name": string,
                        "Version": string
                    },
                    "Configuration" : {
                        "configuration1": "value1",
                        ...
                    },
                    "DependentPlugins": [
                        {
                            "Name": string,
                            "ModelEndpoint": {
                                "Name": string,
                                "Version": string
                            },
                            "Configuration" : {
                                "configuration1": "value1",
                                ...
                            }
                        },
                        ...
                    ]
                },
                ...
            ]
        }

    Parameters:

        - Name: Name of the Profile
        - Description: Profile description
        - ContentGroups: List of Content Groups
        - ChunkSize: The size of a Video Chunk, expressed as the number of Frames in a Video Segment.
        - MaxSegmentLengthSeconds: Length of a video segment in Seconds
        - ProcessingFrameRate: The video Frames/Sec used by MRE to perform analysis
        - Classifier: The Dict which represents a Classifier Plugin. Refer to the Plugin API for details on Plugin Parameters.
        - Featurers: A List of Featurer plugins. Each Dict represents a Featurer Plugin. Refer to the Plugin API for details on Plugin Parameters.
        - Optimizer: The Dict which represents an Optimizer Plugin. Refer to the Plugin API for details on Plugin Parameters.
        - Labeler: The Dict which represents a Labeler Plugin. Refer to the Plugin API for details on Plugin Parameters.

    Returns:

        An empty dict on success.

    Raises:
        400 - BadRequestError (the request body fails schema validation, or is re-raised from a helper)
        404 - NotFoundError (re-raised as-is; presumably raised by the state definition helper - confirm)
        409 - ConflictError (a profile with the same Name already exists)
        500 - ChaliceViewError (any other unexpected failure, including an unparsable request body)
    """
    # Bound before the try block so that the generic exception handler below can
    # always inspect it, even when decoding/parsing the request body is what failed.
    profile = {}

    try:
        # parse_float=Decimal: keep fractional numbers as Decimal rather than float
        # before the item is written to DynamoDB.
        profile = json.loads(app.current_request.raw_body.decode(), parse_float=Decimal)

        validate(instance=profile, schema=API_SCHEMA["create_profile"])

        print("Got a valid profile schema")

        name = profile["Name"]

        print(f"Creating the processing profile '{name}'")

        # The helper receives a deep copy, so the profile that gets persisted is
        # not affected by anything the helper does to its argument.
        profile_copy = copy.deepcopy(profile)
        state_definition, plugin_definitions = profile_state_definition_helper(name, profile_copy)

        profile["StateDefinition"] = state_definition
        profile["Id"] = str(uuid.uuid4())
        profile["Created"] = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
        profile["LastModified"] = profile["Created"]
        profile["Enabled"] = True

        # All whitespace is removed from the profile name when building the
        # state machine name.
        sfn_name = f"aws-mre-{''.join(name.split())}-state-machine"

        print(f"Creating the StepFunction State Machine '{sfn_name}'")

        response = sfn_client.create_state_machine(
            name=sfn_name,
            definition=profile["StateDefinition"],
            roleArn=SFN_ROLE_ARN,
            type="STANDARD",
            tags=[
                {
                    "key": "Project",
                    "value": "MRE"
                },
                {
                    "key": "Profile",
                    "value": name
                }
            ],
            tracingConfiguration={
                "enabled": True
            }
        )

        profile["StateMachineArn"] = response["stateMachineArn"]

        # === Enrich profile by adding the latest version number of all the plugins provided in the profile ===
        # Classifier is always present; Optimizer, Labeler and Featurers are optional
        _set_latest_plugin_versions(profile["Classifier"], plugin_definitions)

        for optional_plugin in ("Optimizer", "Labeler"):
            if optional_plugin in profile:
                _set_latest_plugin_versions(profile[optional_plugin], plugin_definitions)

        if "Featurers" in profile:
            for featurer in profile["Featurers"]:
                _set_latest_plugin_versions(featurer, plugin_definitions)
        # === End of enrichment ===

        profile_table = ddb_resource.Table(PROFILE_TABLE_NAME)

        # The condition makes the write fail instead of overwriting an existing
        # profile that has the same Name.
        profile_table.put_item(
            Item=profile,
            ConditionExpression="attribute_not_exists(#Name)",
            ExpressionAttributeNames={
                "#Name": "Name"
            }
        )

    except NotFoundError as e:
        print(f"Got chalice NotFoundError: {str(e)}")
        raise

    except BadRequestError as e:
        print(f"Got chalice BadRequestError: {str(e)}")
        raise

    except ValidationError as e:
        print(f"Got jsonschema ValidationError: {str(e)}")
        raise BadRequestError(e.message) from e

    except ConflictError as e:
        print(f"Got chalice ConflictError: {str(e)}")
        raise

    except ClientError as e:
        print(f"Got DynamoDB ClientError: {str(e)}")
        if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
            raise ConflictError(f"Profile '{name}' already exists") from e
        else:
            raise

    except Exception as e:
        # Roll back the state machine if it was created before the failure.
        # 'profile' is guaranteed to be bound here (see the initialisation above).
        if "StateMachineArn" in profile:
            sfn_client.delete_state_machine(
                stateMachineArn=profile["StateMachineArn"]
            )

        print(f"Unable to create the processing profile: {str(e)}")
        raise ChaliceViewError(f"Unable to create the processing profile: {str(e)}")

    else:
        print(f"Successfully created the processing profile: {json.dumps(profile, cls=DecimalEncoder)}")

        return {}


def _set_latest_plugin_versions(plugin, plugin_definitions):
    """
    Set "Version" on the given plugin dict, and on each of its DependentPlugins
    (if any), to the "Latest" value found in plugin_definitions for that plugin's Name.

    The plugin dict is modified in place.
    """
    plugin["Version"] = plugin_definitions[plugin["Name"]]["Latest"]

    if "DependentPlugins" in plugin:
        for dependent_plugin in plugin["DependentPlugins"]:
            dependent_plugin["Version"] = plugin_definitions[dependent_plugin["Name"]]["Latest"]