in aiops/MicroAgents/loader/nezha.py
# Requires module-level imports: os, re, and polars (as pl)
def load(self):
log_queries = []
metric_queries = {
'src_dst': [],
'front_srv': [],
'dep': [],
'default': []
}
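# Note: metric queries are kept in separate groups so that each schema family
# can be collected and concatenated on its own (see the per-group handling
# below); mixing them in a single concat would fail on column mismatches.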
trace_queries = []
label_dfs = []  # per-day label DataFrames, concatenated once after the walk
date_pattern = re.compile(r'\d{4}-\d{2}-\d{2}')  # matches dates of the form 'YYYY-MM-DD'
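# e.g. date_pattern.match('2022-08-22').group() -> '2022-08-22' (date value is illustrative)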
# Iterate over all subdirectories in the base directory
for subdir, dirs, files in os.walk(self.filename):
# Determine whether this subdir holds anomalous runs (rca_data) or normal runs (construct_data)
if "rca_data" in subdir:
ano_folder = True
elif "construct_data" in subdir:
ano_folder = False
else:
continue # Skip if it's neither of those folders
match = date_pattern.match(os.path.basename(subdir))
if match:
date_str = match.group() # Extract the matched date string
# Logs
log_path = os.path.join(subdir, "log")
if os.path.exists(log_path):
self.load_log(log_path, date_str, log_queries, ano_folder)
# Metrics
metric_path = os.path.join(subdir, "metric")
if os.path.exists(metric_path):
self.load_metric(metric_path, date_str, metric_queries, ano_folder)
# Traces
trace_path = os.path.join(subdir, "trace")
if os.path.exists(trace_path):
self.load_trace(trace_path, date_str, trace_queries, ano_folder)
# Labels
label_file = os.path.join(subdir, f"{date_str}-fault_list.json")
if os.path.exists(label_file):
label_df = self.load_label(label_file, date_str)
if label_df is not None:
label_dfs.append(label_df)
# RCAs per service
# Hipster (Online Boutique)
rca_hip_file = os.path.join(subdir, "root_cause_hipster.json")
if os.path.exists(rca_hip_file):
self.df_rca_hip = self.load_rca(rca_hip_file)
# TrainTicket
rca_ts_file = os.path.join(subdir, "root_cause_ts.json")
if os.path.exists(rca_ts_file):
self.df_rca_ts = self.load_rca(rca_ts_file)
# Concatenate the per-day labels; guard against the no-labels case, since pl.concat() rejects an empty list
self.df_label = pl.concat(label_dfs) if label_dfs else pl.DataFrame()
# Materialize the LazyFrame queries accumulated above
# Collect logs
dataframes = pl.collect_all(log_queries)
self.df = pl.concat(dataframes).rename({"Log": "raw_m_message"})
# Collect traces
dataframes = pl.collect_all(trace_queries)
self.df_trace = pl.concat(dataframes)
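# Note: pl.collect_all() executes a list of LazyFrames in one call, letting
# polars run them in parallel and share common work across scans. A minimal
# standalone sketch (file names are illustrative only):
#
#     q1 = pl.scan_csv("day1_logs.csv")
#     q2 = pl.scan_csv("day2_logs.csv")
#     df1, df2 = pl.collect_all([q1, q2])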
# Collect metrics
for group, queries in metric_queries.items():
if queries:
try:
dataframes = pl.collect_all(queries)
if dataframes:
# Standardize column order based on the first DataFrame
reference_columns = dataframes[0].columns
standardized_dfs = [df.select(reference_columns) for df in dataframes]
df = pl.concat(standardized_dfs)
# Expose each group as an attribute: df_metric_src_dst, df_metric_front_srv, df_metric_dep, df_metric_default
setattr(self, f'df_metric_{group}', df)
except pl.exceptions.ShapeError as e:
print(f"Error concatenating group '{group}': {e}")
# Debugging: print out column names for each DataFrame in the group
for q in queries:
collected_df = q.collect()
print(f"Columns in {group}: {collected_df.columns}")
raise
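# Usage sketch (illustrative): assumes this method lives on a loader class whose
# constructor stores the dataset root in self.filename. The class name
# `NezhaLoader` and the constructor argument are assumptions, not confirmed API.
#
#     loader = NezhaLoader(filename="path/to/nezha")
#     loader.load()
#     loader.df              # parsed logs (with 'raw_m_message')
#     loader.df_trace        # parsed traces
#     loader.df_label        # fault labels aggregated across days
#     loader.df_metric_dep   # one of the four metric groups, when present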