in ssiog/metrics_collector.py [0:0]
import logging
from concurrent.futures import ThreadPoolExecutor

import fsspec
import gcsfs
import pandas as pd
from tqdm import tqdm

logger = logging.getLogger(__name__)

# process_csv, get_system_memory, and get_memory_usage are assumed to be
# defined elsewhere in this module.


def analyze_metrics(path, timestamp_filter=True):
    """
    Analyzes metrics from CSV files in a Google Cloud Storage bucket or on the
    local filesystem.

    Args:
        path (str): Glob pattern for the CSV files, e.g.
            "gs://my-bucket/path/to/files/*.csv" or "/local/*.csv".
        timestamp_filter (bool): If True, keep only the entries recorded within
            the time window common to all the CSV files.

    Returns:
        A pandas DataFrame containing the combined latency data, or None if no
        files are found or no rows survive the filtering.
    """
    try:
        if path.startswith("gs://"):  # GCS path
            fs = gcsfs.GCSFileSystem()
        else:  # otherwise assume a local path
            fs = fsspec.filesystem("local")
        # Find all CSV files in the path using glob-like pattern matching.
        csv_files = list(fs.glob(path))
        if not csv_files:
            return None
        logger.info(f"Total number of CSV files: {len(csv_files)}")
        total_mib, used_mib, free_mib = get_system_memory()
        logger.info(f"Total system memory: {total_mib} MiB")
        logger.info(f"Used system memory: {used_mib} MiB")
        logger.info(f"Free system memory: {free_mib} MiB")
        logger.info(f"Memory usage by process before loading CSV files: {get_memory_usage()} MiB")
        # Parse the CSV files concurrently; each worker returns a
        # (first_timestamp, last_timestamp, dataframe) tuple for one file.
        with ThreadPoolExecutor() as pool:
            results = list(tqdm(pool.map(lambda file: process_csv(file, fs), csv_files), total=len(csv_files)))
        start_timestamps = []
        end_timestamps = []
        all_data = []
        for start, end, df in results:
            if start is not None and end is not None:
                start_timestamps.append(start)
                end_timestamps.append(end)
                all_data.append(df)
        # Check before concatenating: pd.concat raises on an empty list.
        if not start_timestamps or not end_timestamps:
            return None
        combined_df = pd.concat(all_data)
        logger.info(f"Memory usage by process after loading CSV files: {get_memory_usage()} MiB")
        # The window common to every file runs from the latest per-file start
        # to the earliest per-file end.
        common_start = max(start_timestamps)
        common_end = min(end_timestamps)
        # Drop the rows recorded outside the common window.
        if timestamp_filter:
            combined_df['timestamp'] = pd.to_datetime(combined_df['timestamp'], unit='s')
            combined_df = combined_df[
                (combined_df['timestamp'] >= pd.to_datetime(common_start, unit='s'))
                & (combined_df['timestamp'] <= pd.to_datetime(common_end, unit='s'))
            ]
        if combined_df.empty:
            return None
        return combined_df
    except Exception as e:
        logger.error(f"Error in analyzing metrics: {e}")
        return None
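

# For reference, a minimal sketch of the process_csv helper this function
# relies on. This is an assumption inferred from how the result tuple is
# consumed above, not the module's actual implementation: it reads one CSV
# through the given filesystem and returns (first_timestamp, last_timestamp,
# dataframe), with None timestamps when the file yields no usable rows.
def process_csv(file, fs):
    try:
        with fs.open(file, "r") as f:
            df = pd.read_csv(f)
        if df.empty or "timestamp" not in df.columns:
            return None, None, df
        return df["timestamp"].min(), df["timestamp"].max(), df
    except Exception as e:
        logger.error(f"Failed to process {file}: {e}")
        return None, None, pd.DataFrame()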
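

# Hedged sketch of the memory helpers referenced above, assuming they report
# values in MiB via psutil; the real implementations may differ.
import os

import psutil


def get_system_memory():
    # Returns (total, used, free) system memory in MiB.
    vm = psutil.virtual_memory()
    mib = 1024 * 1024
    return vm.total // mib, vm.used // mib, vm.free // mib


def get_memory_usage():
    # Resident set size of the current process in MiB.
    return psutil.Process(os.getpid()).memory_info().rss // (1024 * 1024)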
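

# Example invocation; the bucket name below is a placeholder.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    df = analyze_metrics("gs://my-bucket/metrics/*.csv")
    if df is None:
        logger.warning("No overlapping metrics found.")
    else:
        logger.info(f"Loaded {len(df)} rows within the common time window.")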