in read_stall_retry/latency_per_time_gap.py
import os
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Optional

import fsspec
import gcsfs
import pandas as pd
from tqdm import tqdm

# process_csv, calculate_percentiles, get_system_memory, get_memory_usage,
# and logger are defined elsewhere in this module.


def analyze_metrics(path: str, percentiles_to_calculate: List[float], time_gap_minutes: int) -> Optional[pd.DataFrame]:
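    """Read latency CSVs from `path`, bucket the samples into fixed-width
    time windows, and return one row of percentile stats per window
    (or None if nothing was found or an error occurred)."""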
    try:
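        # Pick the filesystem implementation from the path scheme: gcsfs for
        # GCS buckets, the local fsspec filesystem otherwise.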
        if path.startswith("gs://"):
            fs = gcsfs.GCSFileSystem()
        else:
            fs = fsspec.filesystem("local")
        csv_files = list(fs.glob(path))
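        # Note: fsspec/gcsfs glob results come back without the scheme prefix,
        # so downstream reads must go through the same `fs` handle.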
        if not csv_files:
            logger.warning(f"No files found at {path}")
            return None
        logger.info(f"Total number of CSV files: {len(csv_files)}")
        total_mem, used_mem, free_mem = get_system_memory()
        logger.info(f"Total system memory: {total_mem:.2f} MiB, Used: {used_mem:.2f} MiB, Free: {free_mem:.2f} MiB")
        logger.info(f"Memory usage by process before loading CSV files: {get_memory_usage():.2f} MiB")
        results = []
        with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
            futures = [executor.submit(process_csv, file, fs) for file in csv_files]
            for future in tqdm(as_completed(futures), total=len(csv_files)):
                results.append(future.result())
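        # Bucket every latency sample into fixed-width time windows by
        # flooring its timestamp to the window boundary.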
        time_gap_str = f'{time_gap_minutes}min'  # 'min' = minutes; the old 'T' alias is deprecated in pandas 2.2+
        time_data = defaultdict(list)
        for df in results:
            if not df.empty:
                df['time_gap'] = df['Timestamp'].dt.floor(time_gap_str)
                for gap, group in df.groupby('time_gap'):
                    time_data[gap].extend(group['Overall Latency'].values)
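        # Build one output row per window: min, the requested percentiles
        # (as p<N> columns), and max.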
        processed_metrics = []
        for gap, latencies in time_data.items():
            if latencies:
                percentiles = calculate_percentiles(latencies, percentiles_to_calculate)
                metric_row = {'time': gap.strftime('%H:%M'), 'min': percentiles['min']}
                metric_row.update({f'p{p}': percentiles[f'p{p}'] for p in percentiles_to_calculate})
                metric_row['max'] = percentiles['max']
                processed_metrics.append(metric_row)
        result_df = pd.DataFrame(processed_metrics)
logger.info(f"Memory usage by process after loading CSV files: {get_memory_usage():.2f} MiB")
if result_df.empty:
logger.warning("No data to return.")
return None
return result_df
except Exception as e:
logger.error(f"Error in analyze_metrics: {e}")
return None
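

# A minimal usage sketch. The bucket path, percentile list, and window width
# below are hypothetical, chosen only to illustrate the call shape:
if __name__ == "__main__":
    result = analyze_metrics(
        path="gs://my-bucket/read_stall_metrics/*.csv",  # hypothetical path
        percentiles_to_calculate=[50, 90, 99],
        time_gap_minutes=5,
    )
    if result is not None:
        print(result.to_string(index=False))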