read_stall_retry/latency_per_time_gap.py

#!/usr/bin/env python3
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pandas as pd
import fsspec
import gcsfs
import argparse
import logging
import os
import psutil
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import numpy as np
from collections import defaultdict
from typing import List, Dict, Optional

# Constants
DEFAULT_PERCENTILES = [10, 50, 90, 99, 99.9, 99.99, 99.999, 99.9999, 99.99999]
DEFAULT_TIME_GAP = 5

# Initialize the global logger with basic INFO level log.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s')
logger = logging.getLogger(__name__)


def convert_bytes_to_mib(bytes: int) -> float:
    return bytes / (1024 ** 2)


def get_system_memory() -> tuple[float, float, float]:
    mem = psutil.virtual_memory()
    return (convert_bytes_to_mib(mem.total), convert_bytes_to_mib(mem.used),
            convert_bytes_to_mib(mem.free))


def get_memory_usage() -> float:
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    return convert_bytes_to_mib(mem_info.rss)


def calculate_percentiles(latencies: List[float],
                          percentiles_to_calculate: List[float]) -> Dict[str, float]:
    percentiles = {
        'min': np.min(latencies),
        'max': np.max(latencies)
    }
    for p in percentiles_to_calculate:
        percentiles[f'p{p}'] = np.percentile(latencies, p)
    return percentiles


def process_csv(file: str, fs) -> pd.DataFrame:
    # Read a single CSV; return an empty DataFrame if the file is unreadable
    # or lacks the expected 'Overall Latency' column so the caller can skip it.
    try:
        with fs.open(file, 'r') as f:
            df = pd.read_csv(f)
        if 'Overall Latency' not in df.columns:
            logger.warning(
                f"File {file} does not contain 'Overall Latency' column. Skipping file.")
            return pd.DataFrame()
        if not df.empty:
            df['Timestamp'] = pd.to_datetime(df['Timestamp'], unit='s')
            return df
        else:
            return pd.DataFrame()
    except Exception as e:
        logger.error(f"Error processing file {file}: {e}")
        return pd.DataFrame()


def analyze_metrics(path: str, percentiles_to_calculate: List[float],
                    time_gap_minutes: int) -> Optional[pd.DataFrame]:
    # Load all CSVs under `path` in parallel, bucket latencies into fixed
    # time windows, and compute the requested percentiles per window.
    try:
        if path.startswith("gs://"):
            fs = gcsfs.GCSFileSystem()
        else:
            fs = fsspec.filesystem("local")

        csv_files = list(fs.glob(path))
        if not csv_files:
            logger.warning(f"No files found at {path}")
            return None
        logger.info(f"Total number of CSV files: {len(csv_files)}")

        total_mem, used_mem, free_mem = get_system_memory()
        logger.info(f"Total system memory: {total_mem:.2f} MiB, "
                    f"Used: {used_mem:.2f} MiB, Free: {free_mem:.2f} MiB")
        logger.info(f"Memory usage by process before loading CSV files: "
                    f"{get_memory_usage():.2f} MiB")

        results = []
        with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
            futures = [executor.submit(process_csv, file, fs) for file in csv_files]
            for future in tqdm(as_completed(futures), total=len(csv_files)):
                results.append(future.result())

        # 'T' is the pandas offset alias for minutes.
        time_gap_str = f'{time_gap_minutes}T'
        time_data = defaultdict(list)
        for df in results:
            if not df.empty:
                df['time_gap'] = df['Timestamp'].dt.floor(time_gap_str)
                for gap, group in df.groupby('time_gap'):
                    time_data[gap].extend(group['Overall Latency'].values)

        processed_metrics = []
        for gap, latencies in time_data.items():
            if latencies:
                percentiles = calculate_percentiles(latencies, percentiles_to_calculate)
                metric_row = {'time': gap.strftime('%H:%M'), 'min': percentiles['min']}
                metric_row.update(
                    {f'p{p}': percentiles[f'p{p}'] for p in percentiles_to_calculate})
                metric_row['max'] = percentiles['max']
                processed_metrics.append(metric_row)

        result_df = pd.DataFrame(processed_metrics)
        logger.info(f"Memory usage by process after loading CSV files: "
                    f"{get_memory_usage():.2f} MiB")

        if result_df.empty:
            logger.warning("No data to return.")
            return None
        return result_df
    except Exception as e:
        logger.error(f"Error in analyze_metrics: {e}")
        return None


def parse_args():
    parser = argparse.ArgumentParser(
        description="Analyze metrics from GCS with configurable time gaps.")
    parser.add_argument(
        "--metrics-path",
        type=str,
        default="gs://vipinydv-metrics/slowenvironment-readstall-genericread-1byte/*.csv",
        help="GCS or local path to metrics files."
    )
    parser.add_argument(
        "--output-file",
        type=str,
        default="latency_per_timegap.csv",
        help="Path to save the processed CSV output."
    )
    parser.add_argument(
        "--percentiles",
        type=str,
        default=",".join(map(str, DEFAULT_PERCENTILES)),
        help="Comma-separated list of percentiles to calculate."
    )
    parser.add_argument(
        "--time-gap",
        type=int,
        default=DEFAULT_TIME_GAP,
        help="Time gap in minutes for aggregation."
    )
    return parser.parse_args()


def main():
    args = parse_args()
    percentiles_list = [float(p) for p in args.percentiles.split(',')]
    result_df = analyze_metrics(args.metrics_path, percentiles_list, args.time_gap)
    if result_df is not None:
        output_file = args.output_file
        result_df.to_csv(output_file, index=False)
        logger.info(f"Results have been saved to {output_file}")


if __name__ == "__main__":
    main()