in ssiog/metrics_per_minute_collector.py [0:0]
import logging
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

import fsspec
import gcsfs
import pandas as pd
from tqdm import tqdm

logger = logging.getLogger(__name__)

# Helpers such as process_csv, calculate_percentiles, get_system_memory, and
# get_memory_usage are defined elsewhere in this module.


def analyze_metrics(path, timestamp_filter=True):
    """
    Analyze metrics from CSV files in a Google Cloud Storage bucket or on the
    local filesystem.

    Args:
        path: Glob pattern for the CSV files, e.g. "gs://bucket/prefix/*.csv"
            or a local pattern such as "/var/metrics/*.csv".
        timestamp_filter: Unused in this function.

    Returns:
        A DataFrame of per-minute latency percentiles, or None if no files
        matched or an error occurred.
    """
    try:
        if path.startswith("gs://"):  # GCS path
            fs = gcsfs.GCSFileSystem()
        else:  # otherwise assume a local path
            fs = fsspec.filesystem("local")

        # Find all CSV files in the path using glob-like pattern matching.
        csv_files = list(fs.glob(path))
        if not csv_files:
            return None
        logger.info(f"Total number of CSV files: {len(csv_files)}")

        system_memory = get_system_memory()
        logger.info(f"Total system memory: {system_memory[0]} MiB")
        logger.info(f"Used system memory: {system_memory[1]} MiB")
        logger.info(f"Free system memory: {system_memory[2]} MiB")
        logger.info(f"Memory usage by process before loading CSV files: {get_memory_usage()} MiB")

        # Load the CSV files concurrently; each worker returns a DataFrame.
        with ThreadPoolExecutor() as pool:
            results = list(tqdm(pool.map(lambda file: process_csv(file, fs), csv_files), total=len(csv_files)))

        # Collect latency samples per minute across all files.
        minute_data = defaultdict(list)
        for df in results:
            if not df.empty:
                # Floor each timestamp to the start of its minute
                # ('min' replaces the deprecated 'T' alias in pandas).
                df['minute'] = df['timestamp'].dt.floor('min')
                for minute, group in df.groupby('minute'):
                    minute_data[minute].extend(group['sample_lat'].values)

        # Compute latency percentiles for each minute.
        processed_metrics = []
        for minute, latencies in minute_data.items():
            if latencies:
                percentiles = calculate_percentiles(latencies)
                processed_metrics.append({
                    'time': minute.strftime('%H:%M'),
                    'min': percentiles['min'],
                    'p90': percentiles['p90'],
                    'p99': percentiles['p99'],
                    'p99.9': percentiles['p99.9'],
                    'p99.99': percentiles['p99.99'],
                    'p99.999': percentiles['p99.999'],
                    'p99.9999': percentiles['p99.9999'],
                    'p99.99999': percentiles['p99.99999'],
                    'max': percentiles['max']
                })

        # Convert the per-minute metrics into a DataFrame.
        result_df = pd.DataFrame(processed_metrics)
        logger.info(f"Memory usage by process after loading CSV files: {get_memory_usage()} MiB")
        if result_df.empty:
            return None
        return result_df
    except Exception as e:
        logger.error(f"Error in analyzing metrics: {e}")
        return None
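

# A minimal sketch of the calculate_percentiles helper referenced above, under
# the assumption that it maps a list of latency samples to the min/p90/.../max
# keys analyze_metrics reads. The real implementation lives elsewhere in this
# module; this version (and its name) is illustrative only.
import numpy as np


def calculate_percentiles_sketch(latencies):
    """Illustrative only: build the percentile dict analyze_metrics expects."""
    arr = np.asarray(latencies, dtype=float)
    result = {'min': float(arr.min()), 'max': float(arr.max())}
    # Keys match those read in analyze_metrics above.
    for key, p in [('p90', 90), ('p99', 99), ('p99.9', 99.9),
                   ('p99.99', 99.99), ('p99.999', 99.999),
                   ('p99.9999', 99.9999), ('p99.99999', 99.99999)]:
        result[key] = float(np.percentile(arr, p))
    return result


# Example usage (illustrative: the bucket name and glob pattern below are
# assumptions, not values from this module):
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    metrics = analyze_metrics("gs://my-bucket/metrics/*.csv")
    if metrics is not None:
        print(metrics.to_string(index=False))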