in CommonLayerCode/datalake-library/python/datalake_library/octagon/metric.py [0:0]
def _create_single_metric(self, metric_rec: MetricRecordInfo, value: int):
self.logger.debug(f"create_single_metric() {metric_rec}")
result = self.metrics_table.get_item(
Key={"root": metric_rec.root, "metric": metric_rec.metric}, AttributesToGet=["root", "metric", "version"]
)
utc_time_iso = get_timestamp_iso()
local_date_iso = get_local_date()
metric_found = "Item" in result
new_metric_value = None
if metric_found: # Update existing metrics
version = result["Item"]["version"]
expr_names = {
"#V": "version",
"#T": "last_updated_timestamp",
"#D": "last_updated_date",
"#X": "value",
"#P": "last_pipeline_execution_id",
}
expr_values = {
":X": value,
":INC": 1,
":T": utc_time_iso,
":D": local_date_iso,
":V": version,
":P": self.client.pipeline_execution_id,
}
update_expr = "ADD #V :INC, #X :X SET #T = :T, #P = :P, #D = :D"
result = self.metrics_table.update_item(
Key={"root": metric_rec.root, "metric": metric_rec.metric},
UpdateExpression=update_expr,
ExpressionAttributeValues=expr_values,
ExpressionAttributeNames=expr_names,
ConditionExpression="#V = :V",
ReturnValues="UPDATED_NEW",
)
# self.logger.debug(result)
new_metric_value = int(result["Attributes"]["value"])
new_metric_version = int(result["Attributes"]["version"])
else: # New metric
item = {}
item["root"] = metric_rec.root
item["metric"] = metric_rec.metric
item["type"] = metric_rec.metric_type
item["creation_timestamp"] = utc_time_iso
item["last_updated_timestamp"] = utc_time_iso
item["last_updated_date"] = local_date_iso
item["last_pipeline_execution_id"] = self.client.pipeline_execution_id
item["value"] = value
item["version"] = 1
if self.metrics_ttl > 0:
item["ttl"] = get_ttl(self.metrics_ttl)
self.metrics_table.put_item(Item=item)
new_metric_value = int(value)
new_metric_version = 1
# Process threshold settings and send SNS notifications
self._process_sns_notifications(metric_rec, new_metric_value, new_metric_version)
return True