read_stall_retry/retry_count_vs_request_count.py

#!/usr/bin/env python3
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import logging
import os
import re
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

import fsspec
import gcsfs
import pandas as pd

# Initialize logger
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Compile regex
LOG_PATTERN = re.compile(r'\[(.*?)\] stalled read-req for object \((.*?)\) cancelled after')


def process_file(file_path, fs, aggregated_counts):
    """Processes a single log file and aggregates UUID counts."""
    try:
        with fs.open(file_path, 'r') as file:
            for line in file:
                match = LOG_PATTERN.search(line)
                if match:
                    uuid = match.group(1)
                    if uuid:
                        aggregated_counts[uuid] += 1
    except FileNotFoundError:
        logger.error(f"File not found: {file_path}")
    except Exception as e:
        logger.error(f"Error processing file {file_path}: {e}")


def process_files_optimized(file_pattern, output_file, num_workers=4):
    """Processes log files in parallel and aggregates retry counts."""
    try:
        if file_pattern.startswith("gs://"):
            fs = gcsfs.GCSFileSystem()
        else:
            fs = fsspec.filesystem("local")

        file_list = fs.glob(file_pattern)
        if not file_list:
            logger.warning(f"No files found matching pattern: {file_pattern}")
            return

        aggregated_counts = defaultdict(int)
        futures = []
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            for file_path in file_list:
                futures.append(executor.submit(process_file, file_path, fs, aggregated_counts))
            for future in as_completed(futures):
                try:
                    future.result()  # Check for exceptions in threads
                except Exception as e:
                    logger.error(f"Error in thread: {e}")

        frequency_counts = defaultdict(int)
        for count in aggregated_counts.values():
            frequency_counts[count] += 1

        frequency_counts_df = pd.DataFrame(
            list(frequency_counts.items()),
            columns=['retry_count', 'num_requests_with_that_retry_count']
        )
        frequency_counts_df.to_csv(output_file, index=False)
        logger.info(f"Results saved to '{output_file}'.")
    except Exception as e:
        logger.error(f"Error in processing files: {e}")


def parse_args():
    """Parses command-line arguments."""
    parser = argparse.ArgumentParser(description="Analyze logs and count retries per request.")
    parser.add_argument(
        "--logs-path",
        type=str,
        required=True,
        help="Path to the logs (GCS or local path with wildcards) to analyze."
    )
    parser.add_argument(
        "--output-file",
        type=str,
        default="retry_count_vs_request_count.csv",
        help="Output CSV file to save the results."
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=min(32, os.cpu_count() or 1),  # Default worker count: CPU count, capped at 32.
        help="Number of worker threads for parallel processing."
    )
    return parser.parse_args()


def main():
    """Main function to execute the script."""
    args = parse_args()
    main_start = time.time()
    process_files_optimized(args.logs_path, args.output_file, args.workers)
    logger.info(f"Total execution time: {time.time() - main_start:.2f} seconds")


if __name__ == "__main__":
    main()
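
# ---------------------------------------------------------------------------
# Illustrative usage note (not part of the original script). The bucket path
# and worker count below are hypothetical placeholders; the flag names match
# the argparse definitions above.
#
#   python3 retry_count_vs_request_count.py \
#       --logs-path "gs://my-bucket/gcsfuse-logs/*.log" \
#       --output-file retry_count_vs_request_count.csv \
#       --workers 16
#
# The script counts log lines matching LOG_PATTERN per request UUID; a
# matching line would look roughly like the following (reconstructed from the
# regex, surrounding fields omitted):
#
#   [<request-uuid>] stalled read-req for object (<object-name>) cancelled after ...
#
# The output CSV has two columns, retry_count and
# num_requests_with_that_retry_count: a row "3,42" would mean 42 request UUIDs
# each appeared in exactly 3 stalled-read cancellation log lines.
# ---------------------------------------------------------------------------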