def _create_single_metric()

in CommonLayerCode/datalake-library/python/datalake_library/octagon/metric.py [0:0]


    def _create_single_metric(self, metric_rec: MetricRecordInfo, value: int):
        """Create or increment one metric record, then process notifications.

        The record is looked up in ``self.metrics_table`` by its
        ``root``/``metric`` key:

        * If it exists, ``value`` is added to the stored ``value`` attribute and
          the stored ``version`` is incremented by 1. The write is conditional
          on ``version`` still matching what was just read; a failed condition
          is not handled here and propagates as whatever the table client
          raises.
        * If it does not exist, a new record is written with ``version`` 1 and,
          when ``self.metrics_ttl`` is positive, a ``ttl`` attribute.

        Afterwards the resulting value and version are passed to
        ``self._process_sns_notifications``.

        NOTE(review): ``self.metrics_table`` is presumably a boto3 DynamoDB
        Table resource, judging by the call shapes -- confirm.

        Args:
            metric_rec: Describes the metric; ``root``, ``metric`` and
                ``metric_type`` are read from it.
            value: Amount to add to an existing metric, or the initial value of
                a new one.

        Returns:
            Always ``True``.
        """
        self.logger.debug(f"create_single_metric() {metric_rec}")

        primary_key = {"root": metric_rec.root, "metric": metric_rec.metric}
        lookup = self.metrics_table.get_item(
            Key=primary_key,
            AttributesToGet=["root", "metric", "version"],
        )

        # Both timestamps are taken once, after the read, and reused by
        # whichever branch runs below.
        now_utc = get_timestamp_iso()
        today_local = get_local_date()

        if "Item" not in lookup:
            # No record yet: write a fresh one starting at version 1.
            record = {
                "root": metric_rec.root,
                "metric": metric_rec.metric,
                "type": metric_rec.metric_type,
                "creation_timestamp": now_utc,
                "last_updated_timestamp": now_utc,
                "last_updated_date": today_local,
                "last_pipeline_execution_id": self.client.pipeline_execution_id,
                "value": value,
                "version": 1,
            }
            if self.metrics_ttl > 0:
                record["ttl"] = get_ttl(self.metrics_ttl)

            self.metrics_table.put_item(Item=record)
            current_value, current_version = int(value), 1
        else:
            # Record exists: add to the counters, guarded by the version that
            # was just read so a concurrent change fails the condition.
            seen_version = lookup["Item"]["version"]

            response = self.metrics_table.update_item(
                Key=primary_key,
                UpdateExpression="ADD #V :INC, #X :X SET #T = :T, #P = :P, #D = :D",
                ExpressionAttributeNames={
                    "#V": "version",
                    "#T": "last_updated_timestamp",
                    "#D": "last_updated_date",
                    "#X": "value",
                    "#P": "last_pipeline_execution_id",
                },
                ExpressionAttributeValues={
                    ":X": value,
                    ":INC": 1,
                    ":T": now_utc,
                    ":D": today_local,
                    ":V": seen_version,
                    ":P": self.client.pipeline_execution_id,
                },
                ConditionExpression="#V = :V",
                ReturnValues="UPDATED_NEW",
            )
            updated_attrs = response["Attributes"]
            current_value = int(updated_attrs["value"])
            current_version = int(updated_attrs["version"])

        # Process threshold settings and send SNS notifications
        self._process_sns_notifications(metric_rec, current_value, current_version)

        return True