in aiops/MicroAgents/layers/data_layer/log.py [0:0]
def anomaly_detection(self, recent_df, history_df, time_col='m_timestamp', log_col='m_message', window_size=None, k=3, summary=False, **kwargs):
    """Detect anomalous log behavior in ``recent_df`` relative to ``history_df``.

    Two kinds of anomalies are reported:
      * "boom" templates: templates whose per-window count in the recent data
        is flagged as increased by the k-sigma detector against history;
      * content-abnormal logs: logs matching a *new* template (negative
        template id) that ``self.log_expert`` flags as abnormal.

    Args:
        recent_df: recent logs (polars DataFrame) to inspect.
        history_df: historical baseline logs (polars DataFrame).
        time_col: name of the timestamp column.
        log_col: name of the log-message column.
        window_size: counting-window spec for the recent data, e.g.
            ``{'minutes': 1}``; defaults to ``{'minutes': 1}`` when None.
        k: sigma multiplier passed to the k-sigma detector.
        summary: when True, compress the report text via ``self.log_summary``.
        **kwargs: ignored; accepted for interface compatibility.

    Returns:
        tuple: ``(res_text, abnormal_template)`` — a human-readable report
        string and the list of log templates deemed abnormal.
    """
    # Avoid the mutable-default-argument pitfall: resolve the default here.
    if window_size is None:
        window_size = {'minutes': 1}
    # recent_df = recent_df.filter(pl.col(log_col).str.lengths()<=1000)
    recent_df = self.parse(recent_df, log_col)
    history_df = self.parse(history_df, log_col)
    counted_recent_df = self.get_counts(recent_df, time_col, log_col, window_size)
    # NOTE(review): history counting uses a fixed 2-minute window regardless
    # of the `window_size` argument — confirm this is intentional.
    counted_history_df = self.get_counts(history_df, time_col, log_col, window_size={'minutes': 2})
    res = {'new_template': [], 'boom_template': []}
    for template_id in counted_recent_df['log_template_id'].unique():
        counted_recent_df_template = counted_recent_df.filter(pl.col('log_template_id') == template_id)
        counted_history_df_template = counted_history_df.filter(pl.col('log_template_id') == template_id)
        if counted_history_df_template.shape[0] == 0:
            # Template never appears in history: record it as brand-new.
            res['new_template'].append((counted_recent_df_template[log_col][0], template_id))
        else:
            res_template, _ = ksigma_anomaly_detection(counted_recent_df_template, counted_history_df_template, metric_columns=['count_per_window'], k=k)
            if len(res_template['increased_metric']) > 0:
                # Count spiked beyond k-sigma of the historical baseline.
                res['boom_template'].append((counted_recent_df_template[log_col][0], template_id))
    res_text = ""
    if res['boom_template']:
        res_text += f"There's been an explosion of logs, which may be abnormal: \n"
        for i, l in enumerate(res['boom_template']):
            res_text += f"\t Log {i+1}. {l[0]}\n"
    # Content-level check: only new templates (negative ids) go to the LLM expert.
    recent_df = recent_df.filter(pl.col('log_template_id') < 0)
    template_id_index = recent_df.columns.index('log_template_id')
    log_message_idx = recent_df.columns.index(log_col)
    raw_log_message_idx = recent_df.columns.index('raw_m_message')
    # One representative row per new template.
    recent_df = recent_df.unique(subset=['log_template_id'], keep='last', maintain_order=True)
    content_abnormal_logs = []
    for row in recent_df.iter_rows():
        template_id = row[template_id_index]
        log_message = row[log_message_idx]
        raw_log_message = row[raw_log_message_idx]  # currently unused; kept for parity with schema
        if template_id < 0:
            # New templates are stored in the "new" miner keyed by -template_id.
            log_template = self.template_miner_new.drain.id_to_cluster.get(-template_id).get_template()
            llm_res = self.log_expert.detect(template_id, log_template, log_message)
            if llm_res['is_abnormal']:
                content_abnormal_logs.append((log_message, log_template))
    if len(content_abnormal_logs) > 0:
        res_text += f"There are logs with abnormal content: \n"
        for i, log in enumerate(content_abnormal_logs):
            res_text += f"\t Log {i+1}. {log[0]}\n"
    if summary and res_text:
        res_text = self.log_summary.summary(res_text)
    # Resolve templates for everything flagged, choosing the miner by id sign.
    abnormal_template = []
    for log, template_id in res['boom_template']:
        if template_id >= 0:
            log_template = self.template_miner.drain.id_to_cluster.get(template_id).get_template()
        elif template_id < 0:
            log_template = self.template_miner_new.drain.id_to_cluster.get(-template_id).get_template()
        abnormal_template.append(log_template)
    for log_message, log_template in content_abnormal_logs:
        abnormal_template.append(log_template)
    return res_text, abnormal_template