def add_labels_to_metrics()

in aiops/MicroAgents/loader/nezha.py [0:0]


    def add_labels_to_metrics(self):
        """Annotate ``self.df_metric_default`` with fault-injection labels.

        Every metric sample of a pod that appears as ``inject_pod`` in
        ``self.df_label`` is compared against that label's window columns
        ``m_timestamp``, ``m_timestamp+1``, ``m_timestamp+3`` and
        ``m_timestamp+4``:

        * ``m_timestamp+1 <= t <= m_timestamp+3``: full anomaly, ratio 1.0.
        * ``m_timestamp < t < m_timestamp+1``: ramp-up, ratio
          ``(t - m_timestamp) / 1 minute``.
        * ``m_timestamp+3 < t < m_timestamp+4``: ramp-down, ratio
          ``(m_timestamp+4 - t) / 1 minute``.

        NOTE(review): the 1-minute divisor presumes the ``+k`` columns are
        ``m_timestamp`` shifted by k minutes -- confirm against the loader.

        ``self.df_metric_default`` is replaced (not returned) with four extra
        columns:

        * ``is_full_anomaly`` (bool): False for unlabelled rows.
        * ``ano_ratio`` (float): 0.0 for unlabelled rows.
        * ``inject_type``: null for unlabelled rows.
        * ``metric_anomaly`` (bool): True when ``inject_type`` is one of
          ``cpu_consumed``, ``network_delay`` or ``cpu_contention``;
          False otherwise, including unlabelled rows.

        Assumes ``row_nr`` is a unique row id of ``self.df_metric_default``
        and that ``self.df_label`` has no ``row_nr`` column of its own.
        TODO confirm.
        """
        metric_ts = pl.col("m_timestamp_metric")
        one_minute = pl.duration(minutes=1)

        # The three windows are pairwise disjoint, so each joined row falls
        # into at most one of them.
        is_full = (metric_ts >= pl.col("m_timestamp+1")) & (metric_ts <= pl.col("m_timestamp+3"))
        is_early = (metric_ts > pl.col("m_timestamp")) & (metric_ts < pl.col("m_timestamp+1"))
        is_late = (metric_ts > pl.col("m_timestamp+3")) & (metric_ts < pl.col("m_timestamp+4"))

        df_anomalies = (
            self.df_label.lazy()
            .join(
                self.df_metric_default.lazy(),
                left_on="inject_pod",
                right_on="PodName",
                how="inner",
                # The metric frame's clashing ``m_timestamp`` becomes
                # ``m_timestamp_metric``.
                suffix="_metric",
            )
            # Drop label/metric pairs outside every window inside the lazy
            # plan, so the full per-pod cross product is never materialised.
            .filter(is_full | is_early | is_late)
            .with_columns([
                is_full.fill_null(False).alias("is_full_anomaly"),
                pl.when(is_full)
                .then(pl.lit(1.0))
                .when(is_early)
                .then((metric_ts - pl.col("m_timestamp")) / one_minute)
                .when(is_late)
                .then((pl.col("m_timestamp+4") - metric_ts) / one_minute)
                .alias("ano_ratio"),
            ])
            .select(["row_nr", "is_full_anomaly", "ano_ratio", "inject_type"])
            .collect(streaming=True)
        )

        # A metric row can match several labels, e.g. the same pod injected
        # twice within a few minutes, or duplicated label rows. Without this
        # step the left join below would duplicate rows of
        # df_metric_default. Keep the strongest match (highest ratio) per
        # row; inject_type breaks ties deterministically.
        df_anomalies = df_anomalies.sort(
            ["ano_ratio", "inject_type"], descending=[True, False]
        ).unique(subset="row_nr", keep="first", maintain_order=True)

        metric_fault_types = ["cpu_consumed", "network_delay", "cpu_contention"]

        self.df_metric_default = self.df_metric_default.join(
            df_anomalies,
            on="row_nr",
            how="left",
        ).with_columns([
            pl.col("is_full_anomaly").fill_null(False),
            pl.col("ano_ratio").fill_null(0.0),
            # Unlabelled rows have a null inject_type. Comparing against a
            # null yields null, so force False to keep the flag two-valued.
            pl.col("inject_type")
            .is_in(metric_fault_types)
            .fill_null(False)
            .alias("metric_anomaly"),
        ])