def anomaly_detection()

in aiops/MicroAgents/layers/data_layer/log.py [0:0]


    def anomaly_detection(self, recent_df, history_df, time_col='m_timestamp', log_col='m_message', window_size=None, k=3, summary=False, history_window_size=None, **kwargs):
        """Detect anomalous logs in ``recent_df`` against a historical baseline.

        Two detection passes are performed:
          1. Volume anomalies: per-template counts in recent windows are
             compared to history windows via a k-sigma test; templates with
             no history at all are recorded as new.
          2. Content anomalies: logs matched to new (negative-id) templates
             are sent to the LLM-based ``self.log_expert`` for inspection.

        Args:
            recent_df: polars DataFrame of the recent logs to inspect.
            history_df: polars DataFrame of baseline logs.
            time_col: name of the timestamp column.
            log_col: name of the log-message column.
            window_size: timedelta kwargs for the recent counting window;
                defaults to ``{'minutes': 1}``.
            k: k-sigma threshold for the volume anomaly test.
            summary: when True, condense the report via ``self.log_summary``.
            history_window_size: timedelta kwargs for the history counting
                window; defaults to ``{'minutes': 2}`` (the previously
                hard-coded value, kept for backward compatibility).
                NOTE(review): a history window unequal to ``window_size``
                makes per-window counts not directly comparable in the
                k-sigma test — confirm this asymmetry is intended.
            **kwargs: ignored; accepted for interface compatibility.

        Returns:
            tuple[str, list]: a human-readable report string (possibly
            summarized) and the list of anomalous log templates.
        """
        # Use None sentinels so the defaults are not shared mutable dicts.
        if window_size is None:
            window_size = {'minutes': 1}
        if history_window_size is None:
            history_window_size = {'minutes': 2}

        def _resolve_template(template_id):
            # New templates live in the incremental miner under the negated
            # id; established templates under the positive id.
            if template_id >= 0:
                miner, key = self.template_miner, template_id
            else:
                miner, key = self.template_miner_new, -template_id
            # NOTE(review): .get() yields None for unknown ids, which would
            # raise AttributeError here — presumably ids are always present.
            return miner.drain.id_to_cluster.get(key).get_template()

        # recent_df = recent_df.filter(pl.col(log_col).str.lengths()<=1000)
        recent_df = self.parse(recent_df, log_col)
        history_df = self.parse(history_df, log_col)

        counted_recent_df = self.get_counts(recent_df, time_col, log_col, window_size)
        counted_history_df = self.get_counts(history_df, time_col, log_col, window_size=history_window_size)

        # --- Pass 1: per-template volume anomalies -----------------------
        res = {'new_template': [], 'boom_template': []}
        for template_id in counted_recent_df['log_template_id'].unique():
            counted_recent_df_template = counted_recent_df.filter(pl.col('log_template_id') == template_id)
            counted_history_df_template = counted_history_df.filter(pl.col('log_template_id') == template_id)
            if counted_history_df_template.shape[0] == 0:
                # No baseline at all: brand-new template.
                res['new_template'].append((counted_recent_df_template[log_col][0], template_id))
            else:
                res_template, _ = ksigma_anomaly_detection(counted_recent_df_template, counted_history_df_template, metric_columns=['count_per_window'], k=k)
                if len(res_template['increased_metric']) > 0:
                    res['boom_template'].append((counted_recent_df_template[log_col][0], template_id))

        # Build the report with a list + join instead of quadratic "+=".
        report_parts = []
        if res['boom_template']:
            report_parts.append("There's been an explosion of logs, which may be abnormal: \n")
            for i, l in enumerate(res['boom_template']):
                report_parts.append(f"\t Log {i+1}. {l[0]}\n")

        # --- Pass 2: content anomalies on new (negative-id) templates ----
        recent_df = recent_df.filter(pl.col('log_template_id') < 0)
        template_id_index = recent_df.columns.index('log_template_id')
        log_message_idx = recent_df.columns.index(log_col)
        recent_df = recent_df.unique(subset=['log_template_id'], keep='last', maintain_order=True)
        content_abnormal_logs = []
        for row in recent_df.iter_rows():
            template_id = row[template_id_index]
            log_message = row[log_message_idx]
            if template_id < 0:  # defensive; the filter above guarantees this
                log_template = _resolve_template(template_id)
                llm_res = self.log_expert.detect(template_id, log_template, log_message)
                if llm_res['is_abnormal']:
                    content_abnormal_logs.append((log_message, log_template))
        if content_abnormal_logs:
            report_parts.append("There are logs with abnormal content: \n")
            for i, log in enumerate(content_abnormal_logs):
                report_parts.append(f"\t Log {i+1}. {log[0]}\n")

        res_text = "".join(report_parts)
        if summary and res_text:
            res_text = self.log_summary.summary(res_text)

        # Collect every anomalous template: boom templates first, then the
        # content-abnormal ones, preserving the original report order.
        abnormal_template = [_resolve_template(template_id) for _, template_id in res['boom_template']]
        abnormal_template.extend(log_template for _, log_template in content_abnormal_logs)

        return res_text, abnormal_template