def analyze_metrics()

in ssiog/metrics_per_minute_collector.py [0:0]


def analyze_metrics(path, timestamp_filter=True):
    """
    Analyzes metrics from CSV files in a Google Cloud Storage bucket or local filesystem.
    """

    try:
        if path.startswith("gs://"):  # if GCS path
            fs = gcsfs.GCSFileSystem()
        else:  # otherwise assume it's a local path
            fs = fsspec.filesystem("local")
        
        # Find all CSV files in the path using glob-like pattern matching.
        csv_files = list(fs.glob(path))
        if not csv_files:
            return None
        
        logger.info(f"Total number of CSV files: {len(csv_files)}")
        system_memory = get_system_memory()
        logger.info(f"Total system memory: {system_memory[0]} MiB")
        logger.info(f"Used system memory: {system_memory[1]} MiB")
        logger.info(f"Free system memory: {system_memory[2]} MiB")
        logger.info(f"Memory usage by process before loading CSV files: {get_memory_usage()} MiB")

        # Read and parse all CSV files concurrently.
        with ThreadPoolExecutor() as pool:
            results = list(tqdm(pool.map(lambda file: process_csv(file, fs), csv_files), total=len(csv_files)))
        
        # Initialize a dictionary to store latency data per minute
        minute_data = defaultdict(list)
        start_timestamps = []
        end_timestamps = []

        # Process data
        for df in results:
            if not df.empty:
                # Truncate each timestamp to the start of its minute (drop seconds and microseconds).
                df['minute'] = df['timestamp'].dt.floor('min')  # 'min' replaces the deprecated 'T' alias
                for minute, group in df.groupby('minute'):
                    minute_data[minute].extend(group['sample_lat'].values)

        # Process per-minute latency data
        processed_metrics = []
        for minute, latencies in minute_data.items():
            if latencies:
                percentiles = calculate_percentiles(latencies)
                processed_metrics.append({
                    'time': minute.strftime('%H:%M'),
                    'min': percentiles['min'],
                    'p90': percentiles['p90'],
                    'p99': percentiles['p99'],
                    'p99.9': percentiles['p99.9'],
                    'p99.99': percentiles['p99.99'],
                    'p99.999': percentiles['p99.999'],
                    'p99.9999': percentiles['p99.9999'],
                    'p99.99999': percentiles['p99.99999'],
                    'max': percentiles['max']
                })

        # Convert processed metrics into a DataFrame
        result_df = pd.DataFrame(processed_metrics)
        
        logger.info(f"Memory usage by process after loading CSV files: {get_memory_usage()} MiB")
        
        if result_df.empty:
            return None

        return result_df
    
    except Exception as e:
        logger.error(f"Error in analyzing metrics: {e}")
        return None
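
The helper process_csv is not shown in this excerpt. A minimal sketch of what it plausibly does, assuming it returns a DataFrame with at least a datetime-typed 'timestamp' column and a 'sample_lat' column (as consumed above), and that `logger` is the module logger:

import pandas as pd

def process_csv(file, fs):
    """Sketch only: read one CSV through the given fsspec filesystem."""
    try:
        with fs.open(file, "r") as f:
            df = pd.read_csv(f)
        # analyze_metrics expects 'timestamp' to be datetime-typed so .dt.floor('min') works.
        df["timestamp"] = pd.to_datetime(df["timestamp"])
        return df
    except Exception as e:
        logger.warning(f"Skipping {file}: {e}")
        return pd.DataFrame()  # empty frames are filtered out by the caller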
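
Likewise, calculate_percentiles is only referenced here. A plausible sketch, under the assumption that it returns exactly the keys read by analyze_metrics and is only called with non-empty latency lists:

import numpy as np

def calculate_percentiles(latencies):
    """Sketch only: per-minute latency percentiles keyed as analyze_metrics expects."""
    arr = np.asarray(latencies, dtype=float)
    return {
        'min': float(arr.min()),
        'p90': float(np.percentile(arr, 90)),
        'p99': float(np.percentile(arr, 99)),
        'p99.9': float(np.percentile(arr, 99.9)),
        'p99.99': float(np.percentile(arr, 99.99)),
        'p99.999': float(np.percentile(arr, 99.999)),
        'p99.9999': float(np.percentile(arr, 99.9999)),
        'p99.99999': float(np.percentile(arr, 99.99999)),
        'max': float(arr.max()),
    }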
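
A minimal invocation sketch; the bucket name and glob pattern below are hypothetical:

if __name__ == "__main__":
    # Hypothetical location of the per-sample metrics CSVs.
    df = analyze_metrics("gs://my-bucket/ssiog/metrics/*.csv")
    if df is None:
        print("No metrics found (or analysis failed).")
    else:
        print(df.to_string(index=False))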