in source/dataplaneapi/runtime/app.py [0:0]
def store_plugin_result():
    """
    Store the result of a plugin in a DynamoDB table.

    Results of the "Optimizer" and "Labeler" plugin classes are appended to existing
    items (keyed by "Program#Event#Classifier" and "Start") through ``update_item``.
    Results of every other plugin class are written as new items through a batch writer.
    Optimizer and Classifier results are additionally forwarded to
    ``put_events_to_event_bridge``.

    Body:

    .. code-block:: python

        {
            "Program": string,
            "Event": string,
            "ProfileName": string,
            "ChunkSize": integer,
            "ProcessingFrameRate": integer,
            "Classifier": string,
            "ExecutionId": string,
            "AudioTrack": integer,
            "Filename": string,
            "ChunkNumber": integer,
            "PluginName": string,
            "PluginClass": string,
            "ModelEndpoint": string,
            "Configuration": object,
            "OutputAttributesNameList": list,
            "Location": object,
            "Results": list
        }

    Returns:

        An empty dict ({}) when all results were processed without error.

    Raises:
        400 - BadRequestError
        500 - ChaliceViewError
    """
    # Bound before the try block so the error handlers below can always format
    # their message, even when the request fails before these fields are read
    # (for example when the body is not valid JSON).
    program = event = plugin_name = None

    try:
        # Floats are parsed as Decimal before the items are handed to DynamoDB
        result = json.loads(app.current_request.raw_body.decode(), parse_float=Decimal)

        validate(instance=result, schema=API_SCHEMA["store_plugin_result"])

        print("Got a valid plugin result schema")

        program = result["Program"]
        event = result["Event"]
        plugin_name = result["PluginName"]
        plugin_class = result["PluginClass"]
        audio_track = str(result["AudioTrack"]) if "AudioTrack" in result else None
        results = result["Results"]

        print(
            f"Storing the result of program '{program}', event '{event}', plugin '{plugin_name}' in the DynamoDB table '{PLUGIN_RESULT_TABLE_NAME}'")
        print(f"Number of items to store: {len(results)}")

        plugin_result_table = ddb_resource.Table(PLUGIN_RESULT_TABLE_NAME)

        if plugin_class == "Optimizer":
            _update_segments_with_optimizer_results(plugin_result_table, result, audio_track)
        elif plugin_class == "Labeler":
            _update_segments_with_labeler_results(plugin_result_table, result)
        else:
            _put_plugin_result_items(plugin_result_table, result, audio_track)

    except ValidationError as e:
        print(f"Got jsonschema ValidationError: {str(e)}")
        raise BadRequestError(e.message) from e

    except ClientError as e:
        print(f"Got DynamoDB ClientError: {str(e)}")
        error = e.response['Error']['Message']
        message = f"Unable to store the result of program '{program}', event '{event}', plugin '{plugin_name}' in the DynamoDB table '{PLUGIN_RESULT_TABLE_NAME}': {str(error)}"
        print(message)
        raise ChaliceViewError(message) from e

    except Exception as e:
        message = f"Unable to store the result of program '{program}', event '{event}', plugin '{plugin_name}' in the DynamoDB table '{PLUGIN_RESULT_TABLE_NAME}': {str(e)}"
        print(message)
        raise ChaliceViewError(message) from e

    else:
        return {}


def _update_segments_with_optimizer_results(plugin_result_table, result, audio_track):
    """Append every Optimizer result to the existing item it refers to and publish it to EventBridge."""
    program = result["Program"]
    event = result["Event"]
    plugin_class = result["PluginClass"]
    classifier = result["Classifier"]
    # Without an explicit audio track the Optimizer values are stored under track "1"
    opto_audio_track = audio_track if audio_track is not None else "1"

    for item in result["Results"]:
        # An item is only written back when it carries at least one status code
        if "OptoStartCode" not in item and "OptoEndCode" not in item:
            continue

        set_clauses = []
        attribute_names = {}
        attribute_values = {}

        # NOTE(review): the Opto* attributes are handled independently of each other;
        # confirm none of them is meant to depend on the presence of its *Code attribute.
        for boundary in ("OptoStart", "OptoEnd"):
            candidates = (
                f"{boundary}Code",
                boundary,
                f"{boundary}Description",
                f"{boundary}DetectorResults",
            )
            for attribute in candidates:
                if attribute not in item:
                    continue

                attribute_names[f"#{attribute}"] = attribute
                if attribute == boundary:
                    # OptoStart / OptoEnd are set at a nested path under the audio
                    # track key, rounded to 3 decimal places
                    set_clauses.append(f"#{attribute}.#AudioTrack = :{attribute}")
                    attribute_names["#AudioTrack"] = opto_audio_track
                    attribute_values[f":{attribute}"] = round(item[attribute], 3)
                else:
                    set_clauses.append(f"#{attribute} = :{attribute}")
                    attribute_values[f":{attribute}"] = item[attribute]

        print(f"Updating existing segment having Start={item['Start']} with the Optimizer plugin result")

        plugin_result_table.update_item(
            Key={
                "PK": f"{program}#{event}#{classifier}",
                "Start": item["Start"]
            },
            UpdateExpression="SET " + ", ".join(set_clauses),
            ExpressionAttributeNames=attribute_names,
            ExpressionAttributeValues=attribute_values
        )

        # Enrich the item with request-level context before publishing it
        item["Program"] = program
        item["Event"] = event
        item["ProfileName"] = result["ProfileName"]
        item["PluginClass"] = result["PluginClass"]
        item["Classifier"] = classifier
        item["AudioTrack"] = opto_audio_track

        # Send the Optimization status to EventBridge
        put_events_to_event_bridge(plugin_class, item)


def _update_segments_with_labeler_results(plugin_result_table, result):
    """Append every Labeler result to the existing item it refers to."""
    program = result["Program"]
    event = result["Event"]
    classifier = result["Classifier"]
    output_attribute_names = result.get("OutputAttributesNameList", [])

    for item in result["Results"]:
        set_clauses = []
        attribute_names = {}
        attribute_values = {}

        for attribute in ("LabelCode", "Label"):
            if attribute in item:
                set_clauses.append(f"#{attribute} = :{attribute}")
                attribute_names[f"#{attribute}"] = attribute
                attribute_values[f":{attribute}"] = item[attribute]

        for index, output_attribute in enumerate(output_attribute_names):
            # "Label" already has its own clause above, so it is not added again here
            if output_attribute in item and output_attribute != "Label":
                set_clauses.append(f"#OutAttr{index} = :OutAttr{index}")
                attribute_names[f"#OutAttr{index}"] = output_attribute
                attribute_values[f":OutAttr{index}"] = item[output_attribute]

        if not set_clauses:
            # A bare "SET " expression with empty attribute maps is rejected by
            # DynamoDB, so an item with nothing to store is skipped instead of
            # failing the whole request.
            print(f"Skipping segment having Start={item['Start']} as it has no Labeler attributes to update")
            continue

        print(f"Updating existing segment having Start={item['Start']} with the Labeler plugin result")

        plugin_result_table.update_item(
            Key={
                "PK": f"{program}#{event}#{classifier}",
                "Start": item["Start"]
            },
            UpdateExpression="SET " + ", ".join(set_clauses),
            ExpressionAttributeNames=attribute_names,
            ExpressionAttributeValues=attribute_values
        )


def _put_plugin_result_items(plugin_result_table, result, audio_track):
    """Write every result as a new item through a batch writer; Classifier items are also published to EventBridge."""
    program = result["Program"]
    event = result["Event"]
    plugin_name = result["PluginName"]
    plugin_class = result["PluginClass"]
    is_classifier = plugin_class == "Classifier"

    # The audio track, when given, becomes part of the partition key
    if audio_track is not None:
        pk = f"{program}#{event}#{plugin_name}#{audio_track}"
    else:
        pk = f"{program}#{event}#{plugin_name}"

    with plugin_result_table.batch_writer() as batch:
        for item in result["Results"]:
            if is_classifier:
                # Seed the attributes that the Optimizer / Labeler branches update later
                if "OptoStartCode" not in item:
                    item["OptoStartCode"] = "Not Attempted"
                    item["OptoStart"] = {}
                    item["OriginalClipStatus"] = {}
                    item["OriginalClipLocation"] = {}
                    item["OptimizedClipStatus"] = {}
                    item["OptimizedClipLocation"] = {}

                if "End" in item and "OptoEndCode" not in item:
                    item["OptoEndCode"] = "Not Attempted"
                    item["OptoEnd"] = {}
                    item["LabelCode"] = "Not Attempted"
                    item["Label"] = ""

            item["PK"] = pk
            item["Start"] = round(item["Start"], 3)
            # An item without "End" gets its (rounded) "Start" as "End"
            item["End"] = round(item["End"], 3) if "End" in item else item["Start"]
            item["ProgramEvent"] = f"{program}#{event}"
            item["Program"] = program
            item["Event"] = event
            item["ProfileName"] = result["ProfileName"]
            item["ChunkSize"] = result["ChunkSize"]
            item["ProcessingFrameRate"] = result["ProcessingFrameRate"]
            item["ExecutionId"] = result["ExecutionId"]
            item["PluginName"] = plugin_name
            item["Filename"] = result["Filename"]
            item["ChunkNumber"] = result["ChunkNumber"]
            item["PluginClass"] = result["PluginClass"]
            item["ModelEndpoint"] = result.get("ModelEndpoint", "")
            item["Configuration"] = result.get("Configuration", {})
            item["Location"] = result["Location"]

            if audio_track is not None:
                item["AudioTrack"] = audio_track

            batch.put_item(
                Item=item
            )

            # Send the Segmentation status to EventBridge
            if is_classifier:
                put_events_to_event_bridge(plugin_class, item)