in read_stall_retry/retry_count_vs_request_count.py [0:0]
import logging
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

import fsspec
import gcsfs
import pandas as pd

logger = logging.getLogger(__name__)


def process_files_optimized(file_pattern, output_file, num_workers=4):
    """Processes log files in parallel and aggregates retry counts per request."""
    try:
        # Pick the filesystem implementation based on the path scheme.
        if file_pattern.startswith("gs://"):
            fs = gcsfs.GCSFileSystem()
        else:
            fs = fsspec.filesystem("local")

        file_list = fs.glob(file_pattern)
        if not file_list:
            logger.warning(f"No files found matching pattern: {file_pattern}")
            return

        # Maps request identifier -> total retry count; updated by worker threads.
        aggregated_counts = defaultdict(int)
        futures = []
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            for file_path in file_list:
                # process_file (defined elsewhere in this module) parses one log
                # file and folds its retry counts into the shared aggregated_counts.
                futures.append(executor.submit(process_file, file_path, fs, aggregated_counts))
            for future in as_completed(futures):
                try:
                    future.result()  # Surface any exception raised inside a worker thread.
                except Exception as e:
                    logger.error(f"Error in thread: {e}")

        # Invert the mapping: how many requests experienced each retry count.
        frequency_counts = defaultdict(int)
        for count in aggregated_counts.values():
            frequency_counts[count] += 1

        frequency_counts_df = pd.DataFrame(
            list(frequency_counts.items()),
            columns=['retry_count', 'num_requests_with_that_retry_count'],
        )
        frequency_counts_df.to_csv(output_file, index=False)
        logger.info(f"Results saved to '{output_file}'.")
    except Exception as e:
        logger.error(f"Error in processing files: {e}")
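
# Example invocation (a sketch; the bucket path, log naming scheme, and output
# filename below are hypothetical and not part of this excerpt):
#
#   process_files_optimized(
#       file_pattern="gs://my-bucket/read_stall_logs/*.log",
#       output_file="retry_count_vs_request_count.csv",
#       num_workers=8,
#   )
#
# The resulting CSV has one row per distinct retry count, pairing
# 'retry_count' with 'num_requests_with_that_retry_count'.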