in aiops/MicroAgents/loader/nezha.py [0:0]
def add_labels_to_metrics(self):
    """Attach fault-injection labels to ``self.df_metric_default``.

    Every metric row is inner-joined with the labels in ``self.df_label``
    that target the same pod (``inject_pod`` == ``PodName``). The metric's
    timestamp ``t`` is then compared with the label's window columns
    ``m_timestamp``, ``m_timestamp+1``, ``m_timestamp+3`` and
    ``m_timestamp+4``:

    * ``m_timestamp+1 <= t <= m_timestamp+3``: full anomaly,
      ``ano_ratio = 1.0``
    * ``m_timestamp < t < m_timestamp+1``: ramp-up,
      ``ano_ratio = (t - m_timestamp) / 1 minute``
    * ``m_timestamp+3 < t < m_timestamp+4``: ramp-down,
      ``ano_ratio = (m_timestamp+4 - t) / 1 minute``

    Presumably the ``+N`` columns are ``m_timestamp`` shifted by N minutes;
    that is not visible here, so confirm against the label loader.

    Rows outside every window are not labelled. The labelled rows are
    left-joined back onto ``self.df_metric_default`` by ``row_nr``, which
    gains these columns:

    * ``is_full_anomaly``: ``False`` for unlabelled rows.
    * ``ano_ratio``: ``0.0`` for unlabelled rows.
    * ``inject_type``: the label's fault type, null for unlabelled rows.
    * ``metric_anomaly``: ``True`` when ``inject_type`` is ``cpu_consumed``,
      ``network_delay`` or ``cpu_contention``, otherwise ``False``
      (including unlabelled rows).

    NOTE(review): a metric row that falls inside the windows of several
    labels for the same pod appears once per matching label after the final
    left join. Confirm that label windows on one pod never overlap.
    """
    # Fault types whose effect is expected to show up in the metrics.
    metric_fault_types = ["cpu_consumed", "network_delay", "cpu_contention"]

    # Pair each metric row with every label injected into the same pod.
    # Where column names clash, the metric frame's columns get the "_metric"
    # suffix. So "m_timestamp_metric" is the sample time and "m_timestamp"
    # is the label's.
    df_metrics_labels = (
        self.df_label.lazy()
        .join(
            self.df_metric_default.lazy(),
            left_on="inject_pod",
            right_on="PodName",
            how="inner",
            suffix="_metric",
        )
        .collect(streaming=True)
    )

    sample_ts = pl.col("m_timestamp_metric")
    in_full_window = (sample_ts >= pl.col("m_timestamp+1")) & (
        sample_ts <= pl.col("m_timestamp+3")
    )
    in_ramp_up = (sample_ts > pl.col("m_timestamp")) & (
        sample_ts < pl.col("m_timestamp+1")
    )
    in_ramp_down = (sample_ts > pl.col("m_timestamp+3")) & (
        sample_ts < pl.col("m_timestamp+4")
    )
    one_minute = pl.duration(minutes=1)

    # The three windows are disjoint, so one when/then chain replaces three
    # separate filters plus vstacks. Rows outside every window get a null
    # ratio and are dropped by the filter.
    df_anomalies = (
        df_metrics_labels.with_columns([
            # when/otherwise (rather than the bare comparison) turns a null
            # comparison into False instead of null.
            pl.when(in_full_window)
            .then(True)
            .otherwise(False)
            .alias("is_full_anomaly"),
            pl.when(in_full_window)
            .then(pl.lit(1.0))
            .when(in_ramp_up)
            .then((sample_ts - pl.col("m_timestamp")) / one_minute)
            .when(in_ramp_down)
            .then((pl.col("m_timestamp+4") - sample_ts) / one_minute)
            .otherwise(None)
            .alias("ano_ratio"),
        ])
        .filter(pl.col("ano_ratio").is_not_null())
        .select(["row_nr", "is_full_anomaly", "ano_ratio", "inject_type"])
    )

    # The left join keeps every metric row. Rows without a label come back
    # with nulls, which are replaced by neutral values below.
    self.df_metric_default = self.df_metric_default.join(
        df_anomalies,
        on="row_nr",
        how="left",
    ).with_columns([
        pl.col("is_full_anomaly").fill_null(False),
        pl.col("ano_ratio").fill_null(0.0),
        # inject_type is null for unlabelled rows. Comparing a null gives
        # null (Kleene logic), so fill explicitly to keep metric_anomaly a
        # plain True/False column.
        pl.col("inject_type")
        .is_in(metric_fault_types)
        .fill_null(False)
        .alias("metric_anomaly"),
    ])