in next_steps/readable_alerts_html/readable_alerts_html.py [0:0]
def lookup_anomaly_group_and_get_details(anomaly_detector_arn, alert_timestamp, alert_metric_name):
relevant_time_series = []
# Extract region name from detector ARN
region_name, _, _ = parse_anomaly_detector_arn(anomaly_detector_arn)
# Create L4M client
lookoutmetrics_client = boto3.client(
"lookoutmetrics", region_name=region_name)
# Find the anomaly group by checking metric names and timestamps
anomaly_group_id = None
next_token = None
while True:
params = {
"AnomalyDetectorArn": anomaly_detector_arn,
"SensitivityThreshold": 0,
"MaxResults": 100,
}
if next_token:
params["NextToken"] = next_token
response = lookoutmetrics_client.list_anomaly_group_summaries(**params)
for anomaly_group in response["AnomalyGroupSummaryList"]:
anomaly_group_metric_name = anomaly_group['PrimaryMetricName']
anomaly_group_start_time = parse_l4m_timestamp(
anomaly_group["StartTime"])
anomaly_group_end_time = parse_l4m_timestamp(
anomaly_group["EndTime"])
# Check if this AnomalyGroup matches
if alert_metric_name == anomaly_group_metric_name:
if anomaly_group_start_time <= alert_timestamp and alert_timestamp <= anomaly_group_end_time:
anomaly_group_id = anomaly_group["AnomalyGroupId"]
break
# Exit loop when found
if anomaly_group_id:
break
# Loop continues when NextToken is included in the response
if "NextToken" in response:
next_token = response["NextToken"]
continue
# Not found
break
# If anomaly group was found, gather all relevant time series
if anomaly_group_id:
next_token = None
while True:
params = {
"AnomalyDetectorArn": anomaly_detector_arn,
"AnomalyGroupId": anomaly_group_id,
"MetricName": alert_metric_name,
"MaxResults": 100,
}
if next_token:
params["NextToken"] = next_token
response = lookoutmetrics_client.list_anomaly_group_time_series(
**params)
# Gather relevant time series
for time_series in response["TimeSeriesList"]:
relevant_time_series.append(time_series["DimensionList"])
# Loop continues when NextToken is included in the response
if "NextToken" in response:
next_token = response["NextToken"]
continue
break
else:
raise KeyError("AnomalyGroup not found")
# Return anomaly group id, and list of relevant time series
return anomaly_group_id, relevant_time_series