in CommonLayerCode/datalake-library/python/datalake_library/octagon/metric.py [0:0]
def _process_sns_notifications(self, metric_rec: MetricRecordInfo, new_metric_value: int, new_metric_version):
    """Send SNS notifications for the metric configurations that ``metric_rec`` triggers.

    Walks ``self.client.config.metric_info`` and, for every configuration whose
    ``metric`` / ``metric_type`` equal the record's ``root`` / ``metric_type``,
    publishes a JSON message when both of the following hold:

    * ``_check_metric_threshold`` accepts ``new_metric_value`` for the
      configuration's ``evaluation`` and ``threshold``;
    * the configuration's ``notify`` is ``METRIC_ALWAYS``, or it is
      ``METRIC_ONCE`` and ``_is_notification_sent(metric_rec)`` is false.

    The topic is ``self.client.sns_topic`` when ``self.client.is_sns_set()`` is
    true, otherwise the configuration's own ``sns_topic`` when it is not the
    empty string. After a message is sent, the notification details are stored
    through ``_update_notification_info``.

    Args:
        metric_rec: Metric record; its ``root``, ``metric`` and ``metric_type``
            are matched against the configuration and copied into the message.
        new_metric_value: Value compared against the configured threshold and
            reported in the message.
        new_metric_version: Forwarded as ``version`` to
            ``_update_notification_info``.

    Returns:
        Always ``True``.
    """
    for metric_config_info in self.client.config.metric_info:
        # Skip configurations declared for a different metric or metric type.
        if not (
            metric_rec.root == metric_config_info.metric
            and metric_rec.metric_type == metric_config_info.metric_type
        ):
            continue

        # The threshold is evaluated before the notification policy, so the
        # "already sent" lookup below only runs for values that cross it.
        if not self._check_metric_threshold(
            new_metric_value, metric_config_info.evaluation, metric_config_info.threshold
        ):
            continue

        notify = metric_config_info.notify
        should_notify = notify == METRIC_ALWAYS or (
            notify == METRIC_ONCE and not self._is_notification_sent(metric_rec)
        )
        if not should_notify:
            continue

        message = {
            "root": metric_rec.root,
            "metric": metric_rec.metric,
            "type": metric_rec.metric_type,
            "threshold": metric_config_info.threshold,
            "value": new_metric_value,
        }
        message_str = json.dumps(message)

        # Get Global ARN if defined, then local metric ARN, then pass sending to SNS
        if self.client.is_sns_set():
            topic = self.client.sns_topic
        elif metric_config_info.sns_topic != "":
            topic = metric_config_info.sns_topic
        else:
            # Logger.warn is a deprecated alias; Logger.warning is the supported call.
            self.logger.warning("SNS ARN is not defined neither globally nor in the metrics")
            # NOTE(review): returning here also skips any remaining configurations,
            # even ones that may define their own topic - confirm this is intended.
            return True

        topic_arn = self._get_topic_arn(topic)
        sns_result = self._send_sns_message(message_str, topic_arn)
        # The send result is expected to expose the published message id under "MessageId".
        sns_message_id = sns_result["MessageId"]

        self._update_notification_info(
            metric_info=metric_rec,
            frequency=notify,
            threshold=metric_config_info.threshold,
            sns_topic_arn=topic_arn,
            sns_message_id=sns_message_id,
            version=new_metric_version,
        )
    return True