ssiog/metrics_collector.py:
#!/usr/bin/env python3
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pandas as pd
import fsspec
import gcsfs
import argparse
import logging
import os
import psutil
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

# Configure the root logger to log at INFO level with a standard format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s')
logger = logging.getLogger(__name__)


def convert_bytes_to_mib(num_bytes):
    """Convert a byte count to mebibytes (MiB)."""
    return num_bytes / (1024 ** 2)


def get_system_memory():
    """Return (total, used, free) system memory in MiB."""
    mem = psutil.virtual_memory()
    return convert_bytes_to_mib(mem.total), convert_bytes_to_mib(mem.used), convert_bytes_to_mib(mem.free)


def get_memory_usage():
    """Return the resident set size (RSS) of the current process in MiB."""
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    return convert_bytes_to_mib(mem_info.rss)


def process_csv(file, fs):
    """Read one CSV file and return (first timestamp, last timestamp, DataFrame)."""
    with fs.open(file, 'r') as f:
        df = pd.read_csv(f)
    if not df.empty:
        # Use positional indexing so this also works for non-default indexes.
        return df['timestamp'].iloc[0], df['timestamp'].iloc[-1], df
    return None, None, df
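

# Note: each metrics CSV is expected to contain at least a 'timestamp' column
# (seconds since the epoch); main() below additionally summarizes a
# 'sample_lat' column. Both column names come from this script's own usage.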


def analyze_metrics(path, timestamp_filter=True):
    """
    Analyzes metrics from CSV files in a Google Cloud Storage bucket or on the local filesystem.

    Args:
        path (str): Glob pattern for the CSV files, e.g. "gs://my-bucket/path/to/files/*.csv" or "/local/*.csv".
        timestamp_filter (bool): If True, keep only rows whose timestamps fall within the time window common to all CSV files.

    Returns:
        A pandas DataFrame containing the combined latency data, or None if no files are found or no rows survive filtering.
    """
try:
        if path.startswith("gs://"):  # GCS path
            fs = gcsfs.GCSFileSystem()
        else:  # otherwise assume a local path
            fs = fsspec.filesystem("local")
# Find all CSV files in the path using glob-like pattern matching.
csv_files = list(fs.glob(path))
if not csv_files:
return None
logger.info(f"Total number of CSV files: {len(csv_files)}")
        system_memory = get_system_memory()
        logger.info(f"Total system memory: {system_memory[0]} MiB")
        logger.info(f"Used system memory: {system_memory[1]} MiB")
        logger.info(f"Free system memory: {system_memory[2]} MiB")
logger.info(f"Memory usage by process before loading CSV files: {get_memory_usage()} MiB")
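        # Read all CSV files concurrently; tqdm reports per-file progress.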
with ThreadPoolExecutor() as pool:
results = list(tqdm(pool.map(lambda file: process_csv(file, fs), csv_files), total=len(csv_files)))
start_timestamps = []
end_timestamps = []
all_data = []
for start, end, df in results:
if start is not None and end is not None:
start_timestamps.append(start)
end_timestamps.append(end)
all_data.append(df)
        # Guard against an empty list before concatenating (pd.concat raises on empty input).
        if not all_data:
            return None
        combined_df = pd.concat(all_data)
        logger.info(f"Memory usage by process after loading CSV files: {get_memory_usage()} MiB")
        # The window common to all files runs from the latest start to the earliest end.
        window_start = max(start_timestamps)
        window_end = min(end_timestamps)
        if timestamp_filter:
            # Drop rows recorded outside the common window [window_start, window_end].
            combined_df['timestamp'] = pd.to_datetime(combined_df['timestamp'], unit='s')
            combined_df = combined_df[(combined_df['timestamp'] >= pd.to_datetime(window_start, unit='s')) & (combined_df['timestamp'] <= pd.to_datetime(window_end, unit='s'))]
if combined_df.empty:
return None
return combined_df
except Exception as e:
        logger.exception(f"Error analyzing metrics: {e}")
return None
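

# A minimal usage sketch (hypothetical local path; any directory of metrics
# CSVs with a 'timestamp' column would do):
#
#   df = analyze_metrics("/tmp/ssiog-metrics/*.csv", timestamp_filter=True)
#   if df is not None:
#       print(df['sample_lat'].describe())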


def parse_args():
    parser = argparse.ArgumentParser(description="Analyze metrics from GCS or local CSV files")
parser.add_argument(
"--metrics-path",
type=str,
help="GCS or local path to metrics files",
default="gs://princer-ssiog-data-bkt-uc1/test_0_7_0-0/ssiog-training-n69qj/*.csv"
)
parser.add_argument(
"--timestamp-filter",
action="store_true",
help="Filter by common timestamps")
return parser.parse_args()
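

# Example invocation (hypothetical local path; --metrics-path also accepts a
# gs:// glob, as in the default above):
#
#   python3 metrics_collector.py --metrics-path "/tmp/ssiog-metrics/*.csv" --timestamp-filter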


# Entry point: parse CLI arguments, analyze the metrics, and print latency percentiles.
def main():
args = parse_args()
result_df = analyze_metrics(args.metrics_path, args.timestamp_filter)
    if result_df is not None:
        print(result_df['sample_lat'].describe(percentiles=[0.05, 0.1, 0.25, 0.5, 0.9, 0.99, 0.999, 0.9999, 0.99999, 0.999999, 0.9999999]))
    else:
        logger.error("No metrics found at the given path.")

if __name__ == "__main__":
main()