def process_files_optimized()

in read_stall_retry/retry_count_vs_request_count.py [0:0]


# Imports this snippet relies on (defined elsewhere in the module; listed here
# so the excerpt is self-contained).
import logging
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

import fsspec
import gcsfs
import pandas as pd

logger = logging.getLogger(__name__)


def process_files_optimized(file_pattern, output_file, num_workers=4):
    """Process log files in parallel, aggregate per-request retry counts,
    and write a retry-count frequency table to output_file."""
    try:
        # Pick the filesystem backend: GCS for gs:// patterns, local otherwise.
        if file_pattern.startswith("gs://"):
            fs = gcsfs.GCSFileSystem()
        else:
            fs = fsspec.filesystem("local")

        file_list = fs.glob(file_pattern)

        if not file_list:
            logger.warning(f"No files found matching pattern: {file_pattern}")
            return

        # Shared map of request -> retry count; process_file is expected to
        # update it in place. Increments on a shared dict are not atomic, so
        # the worker should synchronize its updates (e.g. with a lock).
        aggregated_counts = defaultdict(int)
        futures = []

        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            for file_path in file_list:
                futures.append(executor.submit(process_file, file_path, fs, aggregated_counts))

            for future in as_completed(futures):
                try:
                    future.result()  # Check for exceptions in threads
                except Exception as e:
                    logger.error(f"Error in thread: {e}")

        # Invert the aggregation: for each retry count, how many requests hit it.
        frequency_counts = defaultdict(int)
        for count in aggregated_counts.values():
            frequency_counts[count] += 1

        frequency_counts_df = pd.DataFrame(
            list(frequency_counts.items()),
            columns=['retry_count', 'num_requests_with_that_retry_count'],
        )
        frequency_counts_df.to_csv(output_file, index=False)
        logger.info(f"Results saved to '{output_file}'.")

    except Exception as e:
        logger.error(f"Error while processing files: {e}")