def read_filtered_parquet_files()

in rawdataset/filter-yt-commons.py [0:0]


# Supported filter operators: each maps (column Series, value) -> boolean row mask.
_FILTER_OPS = {
    '==': lambda series, value: series == value,
    '!=': lambda series, value: series != value,
    '>': lambda series, value: series > value,
    '>=': lambda series, value: series >= value,
    '<': lambda series, value: series < value,
    '<=': lambda series, value: series <= value,
    'in': lambda series, value: series.isin(value),
    'not in': lambda series, value: ~series.isin(value),
}


def _apply_filters(df, filters):
    """Return the rows of df satisfying every (column, operator, value) filter (ANDed)."""
    for column, op, value in filters:
        df = df[_FILTER_OPS[op](df[column], value)]
    return df


def _normalize_source_language(df):
    """Populate 'source_language' from 'language_id_method' when only the latter exists."""
    if 'source_language' not in df.columns and 'language_id_method' in df.columns:
        # assign() returns a new frame, so we never write into a filtered slice
        # (which would trigger pandas' SettingWithCopyWarning).
        df = df.assign(source_language=df['language_id_method'])
    return df


def read_filtered_parquet_files(folder_path, fields, filters=None, *, min_word_count=50):
    """
    Reads specified fields from all Parquet files in a folder with filtering and combines them into a single DataFrame.

    For each '*.parquet' file directly inside folder_path (processed in sorted
    name order), the whole file is read, the user filters are applied, rows
    with word_count <= min_word_count are dropped when a 'word_count' column
    exists, and 'source_language' is filled from 'language_id_method' when
    only the latter is present. 'source_language' is always included in the
    output columns.

    Parameters:
    folder_path (str): The path to the folder containing Parquet files.
    fields (iterable of str): Fields to read from the Parquet files. The caller's
        object is not modified.
    filters (list): List of (column, operator, value) tuples, ANDed together,
        e.g. [('column_name', '==', value)]. Supported operators:
        '==', '!=', '>', '>=', '<', '<=', 'in', 'not in'.
    min_word_count (int or None): Keep only rows with word_count strictly greater
        than this value (default 50). None disables the word-count filter.

    Returns:
    pd.DataFrame: A DataFrame containing the selected fields from all filtered
        Parquet files. It is empty, with the selected columns, if the folder has
        no Parquet files.

    Raises:
    ValueError: If a filter uses an unsupported operator (checked before any file is read).
    KeyError: If a filter column or a selected field is missing from a file. This
        includes 'source_language' when a file has neither it nor 'language_id_method'.
    """
    # Work on a copy so the caller's list is never mutated.
    selected_fields = list(fields)
    if 'source_language' not in selected_fields:
        selected_fields.append('source_language')

    # Validate operators up front instead of silently skipping unknown ones.
    filters = list(filters or [])
    unsupported = [op for _, op, _ in filters if op not in _FILTER_OPS]
    if unsupported:
        raise ValueError(
            f"Unsupported filter operator(s) {unsupported!r}; "
            f"expected one of {sorted(_FILTER_OPS)}"
        )

    dataframes = []
    # os.listdir returns entries in arbitrary order; sorting makes the
    # combined row order reproducible across runs and filesystems.
    for file_name in sorted(os.listdir(folder_path)):
        if not file_name.endswith('.parquet'):
            continue
        file_path = os.path.join(folder_path, file_name)
        print(f"Processing file: {file_path}")

        # Read the entire Parquet file
        df = pq.read_table(file_path).to_pandas()
        df = _apply_filters(df, filters)

        if min_word_count is not None and 'word_count' in df.columns:
            df = df[df['word_count'] > min_word_count]

        df = _normalize_source_language(df)
        dataframes.append(df[selected_fields])

    # pd.concat([]) raises ValueError, so handle the no-files case explicitly.
    if not dataframes:
        return pd.DataFrame(columns=selected_fields)
    return pd.concat(dataframes, ignore_index=True)