def analyze_metrics()

in read_stall_retry/latency_per_time_gap.py [0:0]


def analyze_metrics(path: str, percentiles_to_calculate: List[float], time_gap_minutes: int) -> Optional[pd.DataFrame]:
    """Compute latency percentiles per fixed-width time window across CSV files.

    Every file matching ``path`` is loaded through ``process_csv`` on a thread
    pool. The 'Overall Latency' values of all non-empty frames are bucketed by
    their 'Timestamp' floored to ``time_gap_minutes`` minutes, and
    ``calculate_percentiles`` is applied to each bucket.

    Args:
        path: Glob pattern for the input files. A ``gs://`` prefix selects a
            GCS filesystem; anything else is globbed on the local filesystem.
        percentiles_to_calculate: Percentiles to report; each value ``p``
            becomes a ``p{p}`` column in the result.
        time_gap_minutes: Width of each aggregation window, in minutes.

    Returns:
        A DataFrame with one row per time window, ordered chronologically,
        with columns ``time`` (window start formatted as ``HH:MM``), ``min``,
        one ``p{p}`` column per requested percentile, and ``max``. Returns
        ``None`` when no files match, when no latency data is found, or when
        any exception is raised (the error is logged, not propagated).
    """
    try:
        if path.startswith("gs://"):
            fs = gcsfs.GCSFileSystem()
        else:
            fs = fsspec.filesystem("local")

        csv_files = list(fs.glob(path))
        if not csv_files:
            logger.warning(f"No files found at {path}")
            return None

        logger.info(f"Total number of CSV files: {len(csv_files)}")
        total_mem, used_mem, free_mem = get_system_memory()
        logger.info(f"Total system memory: {total_mem:.2f} MiB, Used: {used_mem:.2f} MiB, Free: {free_mem:.2f} MiB")
        logger.info(f"Memory usage by process before loading CSV files: {get_memory_usage():.2f} MiB")

        results = []
        with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
            futures = [executor.submit(process_csv, file, fs) for file in csv_files]
            # as_completed yields in completion order, so `results` has no
            # stable order; the aggregation below must not depend on it.
            for future in tqdm(as_completed(futures), total=len(csv_files)):
                results.append(future.result())

        # 'min' is the supported minute alias; the older 'T' spelling is
        # deprecated in recent pandas releases.
        time_gap_str = f'{time_gap_minutes}min'
        time_data = defaultdict(list)

        for df in results:
            # NOTE(review): process_csv is defined elsewhere; the None check is
            # defensive in case it returns None for an unreadable file - confirm.
            if df is None or df.empty:
                continue
            # Group on the floored timestamps directly instead of writing a
            # scratch 'time_gap' column into the worker's DataFrame.
            window_start = df['Timestamp'].dt.floor(time_gap_str)
            for gap, latencies in df.groupby(window_start)['Overall Latency']:
                time_data[gap].extend(latencies.values)

        processed_metrics = []
        # Sort the windows so rows come out chronologically and reproducibly,
        # rather than in whatever order the worker threads happened to finish.
        for gap in sorted(time_data):
            latencies = time_data[gap]
            if latencies:
                percentiles = calculate_percentiles(latencies, percentiles_to_calculate)
                metric_row = {'time': gap.strftime('%H:%M'), 'min': percentiles['min']}
                metric_row.update({f'p{p}': percentiles[f'p{p}'] for p in percentiles_to_calculate})
                metric_row['max'] = percentiles['max']
                processed_metrics.append(metric_row)

        result_df = pd.DataFrame(processed_metrics)

        logger.info(f"Memory usage by process after loading CSV files: {get_memory_usage():.2f} MiB")

        if result_df.empty:
            logger.warning("No data to return.")
            return None

        return result_df

    except Exception as e:
        # Deliberate best-effort boundary: callers receive None on any failure.
        logger.error(f"Error in analyze_metrics: {e}")
        return None