def load()

in aiops/MicroAgents/loader/nezha.py [0:0]


    def load(self):
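        """Walk the Nezha dataset rooted at ``self.filename``.

        Builds lazy queries for the logs, metrics, and traces under each
        dated folder (``rca_data`` days are flagged anomalous,
        ``construct_data`` days normal), loads the per-day fault labels and
        the per-service root-cause files, then collects everything into
        ``self.df`` (logs), ``self.df_trace``, ``self.df_label``,
        ``self.df_rca_hip``/``self.df_rca_ts``, and the
        ``df_metric_<group>`` attributes.
        """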
        log_queries = []
        metric_queries = {
            'src_dst': [],
            'front_srv': [],
            'dep': [],
            'default': []
        }
        trace_queries = []
        label_queries = []
        label_dfs = []  # per-day label frames, concatenated after the walk
        date_pattern = re.compile(r'\d{4}-\d{2}-\d{2}')  # Matches 'YYYY-MM-DD' date folder names

        # Iterate over all subdirectories in the base directory
        for subdir, dirs, files in os.walk(self.filename):
            # Check if the current subdir is within rca_data or construct_data
            if "rca_data" in subdir:
                ano_folder = True
            elif "construct_data" in subdir:
                ano_folder = False
            else:
                continue  # Skip if it's neither of those folders
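            # Telemetry for each day lives in a folder named YYYY-MM-DD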
            match = date_pattern.match(os.path.basename(subdir))
            if match:
                date_str = match.group()  # Extract the matched date string
                # Logs
                log_path = os.path.join(subdir, "log")
                if os.path.exists(log_path):
                    self.load_log(log_path, date_str, log_queries, ano_folder)
                # Metrics
                metric_path = os.path.join(subdir, "metric")
                if os.path.exists(metric_path):
                    self.load_metric(metric_path, date_str, metric_queries, ano_folder)
                # Traces
                trace_path = os.path.join(subdir, "trace")
                if os.path.exists(trace_path):
                    self.load_trace(trace_path, date_str, trace_queries, ano_folder)
                # Labels
                label_file = os.path.join(subdir, f"{date_str}-fault_list.json")
                if os.path.exists(label_file):
                    label_df = self.load_label(label_file, date_str)
                    if label_df is not None:
                        label_dfs.append(label_df)
            # Root-cause (RCA) files per service; note these are checked on
            # every directory os.walk visits, not only the dated folders
            # Hipster
            rca_hip_file = os.path.join(subdir, "root_cause_hipster.json")
            if os.path.exists(rca_hip_file):
                self.df_rca_hip = self.load_rca(rca_hip_file)
            # TrainTicket
            rca_ts_file = os.path.join(subdir, "root_cause_ts.json")
            if os.path.exists(rca_ts_file):
                self.df_rca_ts = self.load_rca(rca_ts_file)


        self.df_label = pl.concat(label_dfs) if label_dfs else pl.DataFrame()
        # Execute the lazy queries gathered above and materialize DataFrames
        # Collect logs
        dataframes = pl.collect_all(log_queries)
        self.df = pl.concat(dataframes).rename({"Log": "raw_m_message"})
        # Collect traces
        dataframes = pl.collect_all(trace_queries)
        self.df_trace = pl.concat(dataframes)
        # Collect metrics
        for group, queries in metric_queries.items():
            if queries:
                try:
                    dataframes = pl.collect_all(queries)
                    if dataframes:
                        # Standardize column order based on the first DataFrame
                        reference_columns = dataframes[0].columns
                        standardized_dfs = [df.select(reference_columns) for df in dataframes]
                        df = pl.concat(standardized_dfs)
                        # Store each metric group as an attribute, e.g. self.df_metric_src_dst
                        setattr(self, f'df_metric_{group}', df)
                except pl.exceptions.ShapeError as e:
                    print(f"Error concatenating group '{group}': {e}")
                    # Debugging: print out column names for each DataFrame in the group
                    for q in queries:
                        collected_df = q.collect()
                        print(f"Columns in {group}: {collected_df.columns}")
                    raise
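
The metric branch leans on two polars idioms: pl.collect_all() executes a batch of LazyFrame query plans together, and select(reference_columns) re-orders columns so a vertical pl.concat() does not fail on column order. Below is a minimal, self-contained sketch of that pattern; the toy frames and the TimeUnix/Value column names are hypothetical stand-ins, not the actual schemas of the Nezha metric files.

    import polars as pl

    # Hypothetical stand-ins for the scanned metric files: same columns,
    # different column order, which is exactly what trips vertical concat
    q1 = pl.DataFrame({"TimeUnix": [1, 2], "Value": [0.1, 0.2]}).lazy()
    q2 = pl.DataFrame({"Value": [0.3], "TimeUnix": [3]}).lazy()

    dataframes = pl.collect_all([q1, q2])       # run both query plans in one batch
    reference_columns = dataframes[0].columns   # adopt the first frame's column order
    standardized = [df.select(reference_columns) for df in dataframes]
    combined = pl.concat(standardized)          # columns now line up for vertical concat
    print(combined)

In load() itself, the same trick lets metric files whose columns arrive in different orders be stacked per group without raising the ShapeError that the except branch reports.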