def analyze_metrics()

in ssiog/metrics_collector.py


# Imports used below; the helpers process_csv, get_system_memory,
# get_memory_usage, and the module-level logger are defined elsewhere
# in ssiog/metrics_collector.py.
from concurrent.futures import ThreadPoolExecutor

import fsspec
import gcsfs
import pandas as pd
from tqdm import tqdm


def analyze_metrics(path, timestamp_filter=True):
    """
    Analyzes metrics from CSV files in a Google Cloud Storage bucket or on
    the local filesystem.

    Args:
        path (str): Glob pattern for the CSV files, e.g.
            "gs://my-bucket/path/to/files/*.csv" or "/local/*.csv".
        timestamp_filter (bool): If True, keep only the rows recorded in the
            time window common to all CSV files.

    Returns:
        A pandas DataFrame containing the combined latency data, or None if
        no files are found or no rows remain after filtering.
    """
    try:
        if path.startswith("gs://"): # if gcs path
            fs = gcsfs.GCSFileSystem()
        else: # otherwise assume it a local path
            fs = fsspec.filesystem("local")
        
        # Find all CSV files in the path using glob-like pattern matching.
        csv_files = list(fs.glob(path))
        if not csv_files:
            return None
        
        logger.info(f"Total number of CSV files: {len(csv_files)}")
        system_memory = get_system_memory()
        logger.info(f"Total system memory: {system_memory[0]} MiB")
        logger.info(f"Used system memory: {system_memory[1]} MiB")
        logger.info(f"Free system memory: {system_memory[2]} MiB")
        logger.info(f"Memory usage by process before loading CSV files: {get_memory_usage()} MiB")

        # Load the files in parallel; reading CSVs is I/O-bound, so a thread
        # pool speeds this up despite the GIL.
        with ThreadPoolExecutor() as pool:
            results = list(tqdm(pool.map(lambda file: process_csv(file, fs), csv_files), total=len(csv_files)))
            
        start_timestamps = []
        end_timestamps = []        
        all_data = []
        for start, end, df in results:
            if start is not None and end is not None:
                start_timestamps.append(start)
                end_timestamps.append(end)
            all_data.append(df)
        
        combined_df = pd.concat(all_data)    
        logger.info(f"Memory usage by process after loading CSV files: {get_memory_usage()} MiB")    
        
        # Without at least one (start, end) pair there is no common window.
        if not start_timestamps or not end_timestamps:
            return None

        # The common window starts at the latest per-file start and ends at
        # the earliest per-file end, so every file covers the whole window.
        window_start = max(start_timestamps)
        window_end = min(end_timestamps)

        # Drop rows recorded outside the window common to all CSV files.
        if timestamp_filter:
            combined_df['timestamp'] = pd.to_datetime(combined_df['timestamp'], unit='s')
            combined_df = combined_df[
                (combined_df['timestamp'] >= pd.to_datetime(window_start, unit='s'))
                & (combined_df['timestamp'] <= pd.to_datetime(window_end, unit='s'))
            ]
        
        if combined_df.empty:
            return None
    
        return combined_df
    
    except Exception as e:
        logger.error(f"Error in analyzing metrics: {e}")
        return None
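
For context, analyze_metrics relies on a process_csv(file, fs) helper that returns a (start, end, df) tuple, where start and end are the file's first and last timestamps in epoch seconds (or None when unavailable) and df holds the file's rows with a 'timestamp' column. The body below is a hypothetical sketch under those assumptions; the real helper is defined elsewhere in ssiog/metrics_collector.py.


def process_csv(file, fs):
    # Hypothetical sketch, not the actual helper: read one CSV via the
    # provided filesystem and report the time span it covers.
    with fs.open(file, "r") as f:
        df = pd.read_csv(f)
    if df.empty or "timestamp" not in df.columns:
        return None, None, df
    return df["timestamp"].min(), df["timestamp"].max(), df


A usage sketch (the paths are illustrative):


# Combine latency metrics from GCS, keeping only the time window that
# every CSV file covers; returns None if nothing matches or survives.
df = analyze_metrics("gs://my-bucket/metrics/*.csv")
if df is not None:
    print(df["timestamp"].min(), df["timestamp"].max())

# Local files, keeping all rows rather than only the common window.
df_all = analyze_metrics("/tmp/metrics/*.csv", timestamp_filter=False)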