def store_plugin_result()

in source/dataplaneapi/runtime/app.py [0:0]


def store_plugin_result():
    """
    Store the result of a plugin in a DynamoDB table.

    Depending on "PluginClass" the results are handled differently:

    - "Optimizer": existing items are updated in place with the Opto* attributes
      found in each result and the item is then passed to put_events_to_event_bridge.
    - "Labeler": existing items are updated in place with the label attributes
      (only for results that carry a "LabelCode").
    - anything else: every result is enriched with the request level attributes
      and written as a new item through a batch writer. Results of the
      "Classifier" class are additionally passed to put_events_to_event_bridge.

    Body:

    .. code-block:: python

        {
            "Program": string,
            "Event": string,
            "ProfileName": string,
            "ChunkSize": integer,
            "ProcessingFrameRate": integer,
            "Classifier": string,
            "ExecutionId": string,
            "AudioTrack": integer,
            "Filename": string,
            "ChunkNumber": integer,
            "PluginName": string,
            "PluginClass": string,
            "ModelEndpoint": string,
            "Configuration": object,
            "OutputAttributesNameList": list,
            "Location": object,
            "Results": list
        }

    Returns:

        An empty dictionary when the results have been stored successfully.

    Raises:
        400 - BadRequestError (the body does not match the request schema)
        500 - ChaliceViewError (any other failure, including DynamoDB errors)
    """
    # Bind the names used by the error handlers up front. Without this, a failure
    # that happens before they are extracted from the body (e.g. malformed JSON)
    # would raise UnboundLocalError inside the handler and hide the real error.
    program = event = plugin_name = None

    try:
        # parse_float=Decimal: floats are parsed as Decimal before being handed to DynamoDB
        result = json.loads(app.current_request.raw_body.decode(), parse_float=Decimal)

        validate(instance=result, schema=API_SCHEMA["store_plugin_result"])

        print("Got a valid plugin result schema")

        program = result["Program"]
        event = result["Event"]
        plugin_name = result["PluginName"]
        plugin_class = result["PluginClass"]
        audio_track = str(result["AudioTrack"]) if "AudioTrack" in result else None
        results = result["Results"]

        print(
            f"Storing the result of program '{program}', event '{event}', plugin '{plugin_name}' in the DynamoDB table '{PLUGIN_RESULT_TABLE_NAME}'")
        print(f"Number of items to store: {len(results)}")

        plugin_result_table = ddb_resource.Table(PLUGIN_RESULT_TABLE_NAME)

        # If the plugin class is Optimizer, append the results to existing items in DynamoDB
        if plugin_class == "Optimizer":
            classifier = result["Classifier"]
            # The audio track is used as a map key below; fall back to track "1" when absent
            opto_audio_track = audio_track if audio_track is not None else "1"

            for item in results:
                is_update_required = False
                update_expression = []
                expression_attribute_names = {}
                expression_attribute_values = {}

                # The start and end boundaries are handled identically, start first
                for boundary in ("Start", "End"):
                    code_attribute = f"Opto{boundary}Code"

                    if code_attribute not in item:
                        continue

                    is_update_required = True
                    update_expression.append(f"#{code_attribute} = :{code_attribute}")
                    expression_attribute_names[f"#{code_attribute}"] = code_attribute
                    expression_attribute_values[f":{code_attribute}"] = item[code_attribute]

                    time_attribute = f"Opto{boundary}"

                    if time_attribute not in item:
                        continue

                    # The optimized timestamp is stored per audio track inside a map attribute
                    update_expression.append(f"#{time_attribute}.#AudioTrack = :{time_attribute}")
                    expression_attribute_names[f"#{time_attribute}"] = time_attribute
                    expression_attribute_names["#AudioTrack"] = opto_audio_track
                    expression_attribute_values[f":{time_attribute}"] = round(item[time_attribute], 3)

                    # Optional companions are only considered when the timestamp itself is present
                    for suffix in ("Description", "DetectorResults"):
                        extra_attribute = f"{time_attribute}{suffix}"

                        if extra_attribute in item:
                            update_expression.append(f"#{extra_attribute} = :{extra_attribute}")
                            expression_attribute_names[f"#{extra_attribute}"] = extra_attribute
                            expression_attribute_values[f":{extra_attribute}"] = item[extra_attribute]

                if is_update_required:
                    print(f"Updating existing segment having Start={item['Start']} with the Optimizer plugin result")

                    # NOTE(review): the key uses item["Start"] exactly as received, whereas the
                    # default branch below stores Start rounded to 3 decimals - confirm that
                    # callers send the already rounded value.
                    plugin_result_table.update_item(
                        Key={
                            "PK": f"{program}#{event}#{classifier}",
                            "Start": item["Start"]
                        },
                        UpdateExpression="SET " + ", ".join(update_expression),
                        ExpressionAttributeNames=expression_attribute_names,
                        ExpressionAttributeValues=expression_attribute_values
                    )

                    item["Program"] = program
                    item["Event"] = event
                    item["ProfileName"] = result["ProfileName"]
                    item["PluginClass"] = result["PluginClass"]
                    item["Classifier"] = classifier
                    item["AudioTrack"] = opto_audio_track

                    # Send the Optimization status to EventBridge
                    put_events_to_event_bridge(plugin_class, item)

        # If the plugin class is Labeler, append the results to existing items in DynamoDB
        elif plugin_class == "Labeler":
            classifier = result["Classifier"]

            for item in results:
                # Results without a LabelCode carry nothing to persist
                if "LabelCode" not in item:
                    continue

                update_expression = ["#LabelCode = :LabelCode"]
                expression_attribute_names = {"#LabelCode": "LabelCode"}
                expression_attribute_values = {":LabelCode": item["LabelCode"]}

                if "Label" in item:
                    update_expression.append("#Label = :Label")
                    expression_attribute_names["#Label"] = "Label"
                    expression_attribute_values[":Label"] = item["Label"]

                # Persist every declared output attribute present in the result.
                # "Label" is skipped because it has already been handled above.
                for index, output_attribute in enumerate(result.get("OutputAttributesNameList", [])):
                    if output_attribute in item and output_attribute != "Label":
                        update_expression.append(f"#OutAttr{index} = :OutAttr{index}")
                        expression_attribute_names[f"#OutAttr{index}"] = output_attribute
                        expression_attribute_values[f":OutAttr{index}"] = item[output_attribute]

                print(f"Updating existing segment having Start={item['Start']} with the Labeler plugin result")

                plugin_result_table.update_item(
                    Key={
                        "PK": f"{program}#{event}#{classifier}",
                        "Start": item["Start"]
                    },
                    UpdateExpression="SET " + ", ".join(update_expression),
                    ExpressionAttributeNames=expression_attribute_names,
                    ExpressionAttributeValues=expression_attribute_values
                )

        else:
            # The partition key carries the audio track only when one was provided
            if audio_track is not None:
                pk = f"{program}#{event}#{plugin_name}#{audio_track}"
            else:
                pk = f"{program}#{event}#{plugin_name}"

            with plugin_result_table.batch_writer() as batch:
                for item in results:
                    if plugin_class == "Classifier":
                        # Seed the attributes that the Optimizer and Labeler branches update later
                        if "OptoStartCode" not in item:
                            item["OptoStartCode"] = "Not Attempted"
                            item["OptoStart"] = {}
                            item["OriginalClipStatus"] = {}
                            item["OriginalClipLocation"] = {}
                            item["OptimizedClipStatus"] = {}
                            item["OptimizedClipLocation"] = {}

                        if "End" in item and "OptoEndCode" not in item:
                            item["OptoEndCode"] = "Not Attempted"
                            item["OptoEnd"] = {}
                            item["LabelCode"] = "Not Attempted"
                            item["Label"] = ""

                    item["PK"] = pk
                    item["Start"] = round(item["Start"], 3)
                    # A result without an End is stored with End equal to its Start
                    item["End"] = round(item["End"], 3) if "End" in item else item["Start"]
                    item["ProgramEvent"] = f"{program}#{event}"
                    item["Program"] = program
                    item["Event"] = event
                    item["ProfileName"] = result["ProfileName"]
                    item["ChunkSize"] = result["ChunkSize"]
                    item["ProcessingFrameRate"] = result["ProcessingFrameRate"]
                    item["ExecutionId"] = result["ExecutionId"]
                    item["PluginName"] = plugin_name
                    item["Filename"] = result["Filename"]
                    item["ChunkNumber"] = result["ChunkNumber"]
                    item["PluginClass"] = result["PluginClass"]
                    item["ModelEndpoint"] = result.get("ModelEndpoint", "")
                    item["Configuration"] = result.get("Configuration", {})
                    item["Location"] = result["Location"]

                    if audio_track is not None:
                        item["AudioTrack"] = audio_track

                    batch.put_item(
                        Item=item
                    )

                    # Send the Segmentation status to EventBridge
                    if plugin_class == "Classifier":
                        put_events_to_event_bridge(plugin_class, item)

    except ValidationError as e:
        print(f"Got jsonschema ValidationError: {str(e)}")
        raise BadRequestError(e.message) from e

    except ClientError as e:
        print(f"Got DynamoDB ClientError: {str(e)}")
        error = e.response['Error']['Message']
        message = f"Unable to store the result of program '{program}', event '{event}', plugin '{plugin_name}' in the DynamoDB table '{PLUGIN_RESULT_TABLE_NAME}': {str(error)}"
        print(message)
        raise ChaliceViewError(message) from e

    except Exception as e:
        message = f"Unable to store the result of program '{program}', event '{event}', plugin '{plugin_name}' in the DynamoDB table '{PLUGIN_RESULT_TABLE_NAME}': {str(e)}"
        print(message)
        raise ChaliceViewError(message) from e

    else:
        return {}