# analyze_metrics() — excerpt from read_stall_retry/metrics_collector.py

def analyze_metrics(path: str, timestamp_filter: bool = True) -> Optional[pd.DataFrame]:
    """Analyzes metrics from CSV files in a GCS bucket or local filesystem.

    Args:
        path: Glob pattern for the CSV files — either a ``gs://`` URI or a
            local filesystem path.
        timestamp_filter: When True, restrict the combined data to the time
            window covered by *every* file (latest per-file start timestamp
            through earliest per-file end timestamp).

    Returns:
        The combined (and optionally time-filtered) DataFrame, or None when
        no files are found, no data loads, the filtered result is empty, or
        any error occurs.
    """
    try:
        # Choose the filesystem implementation from the path scheme.
        if path.startswith("gs://"):
            fs = gcsfs.GCSFileSystem()
        else:
            fs = fsspec.filesystem("local")

        csv_files = list(fs.glob(path))
        if not csv_files:
            logger.warning(f"No CSV files found at {path}")
            return None

        logger.info(f"Total number of CSV files: {len(csv_files)}")
        total_mem, used_mem, free_mem = get_system_memory()
        logger.info(f"Total system memory: {total_mem:.2f} MiB, Used: {used_mem:.2f} MiB, Free: {free_mem:.2f} MiB")
        logger.info(f"Memory usage by process before loading CSV files: {get_memory_usage():.2f} MiB")

        # Load all files in parallel; each worker returns (start_ts, end_ts, df).
        results = []
        with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
            futures = [executor.submit(process_csv, file, fs) for file in csv_files]
            for future in tqdm(as_completed(futures), total=len(csv_files)):
                results.append(future.result())

        start_timestamps = []
        end_timestamps = []
        all_data = []
        for start, end, df in results:
            if start is not None and end is not None:
                start_timestamps.append(start)
                end_timestamps.append(end)
            # Skip files that yielded no DataFrame so pd.concat below
            # cannot fail on None entries.
            if df is not None:
                all_data.append(df)

        # pd.concat raises ValueError on an empty list — guard explicitly.
        if not all_data:
            logger.warning("No data could be loaded from the CSV files.")
            return None

        combined_df = pd.concat(all_data)
        logger.info(f"Memory usage by process after loading CSV files: {get_memory_usage():.2f} MiB")

        if timestamp_filter:
            # Timestamps are only required when filtering is requested;
            # previously their absence aborted even with filtering disabled.
            if not start_timestamps or not end_timestamps:
                logger.warning("No valid timestamps found.")
                return None

            # Keep only the window every file covers: the latest start and
            # the earliest end across all files (the overlap interval).
            window_start = max(start_timestamps)
            window_end = min(end_timestamps)

            combined_df['Timestamp'] = pd.to_datetime(combined_df['Timestamp'], unit='s')
            combined_df = combined_df[
                (combined_df['Timestamp'] >= pd.to_datetime(window_start, unit='s')) &
                (combined_df['Timestamp'] <= pd.to_datetime(window_end, unit='s'))
            ]

        if combined_df.empty:
            logger.warning("No data remains after timestamp filtering.")
            return None

        return combined_df

    except Exception as e:
        # logger.exception records the traceback that logger.error dropped,
        # making failures inside the worker pool diagnosable.
        logger.exception(f"Error in analyze_metrics: {e}")
        return None