def consolidate_results()

in tooling/enrichment/consolidate_results.py
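
The function depends on two standard imports plus module-level helpers whose bodies are not shown here. A minimal sketch of the assumed module header (helper signatures are inferred from the call sites below, not confirmed by the source):

import time

import pandas as pd

# Assumed to be defined elsewhere in this module (signatures inferred from usage):
#   fetch_all_products_from_bigquery() -> pd.DataFrame
#   fetch_processed_products_from_firestore() -> pd.DataFrame
#   fetch_failed_products_from_firestore() -> pd.DataFrame
#   upload_to_gcs(df: pd.DataFrame, filename: str) -> None
#   write_to_bigquery(df: pd.DataFrame, table: str) -> None
#   ENRICHED_TABLE: str  # fully-qualified destination table id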


def consolidate_results():
    """Consolidate results from BigQuery and Firestore."""
    # Fetch all data
    bq_df = fetch_all_products_from_bigquery()
    processed_df = fetch_processed_products_from_firestore()
    failed_df = fetch_failed_products_from_firestore()
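    # Assumption: all three frames share an 'id' column used as the join key below;
    # the Firestore frames contain only the subset of products that completed or failed.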
    
    print("\nDebug - Processed DataFrame Info:")
    print(processed_df.info())
    if not processed_df.empty:
        print("\nSample of processed data:")
        print(processed_df.head())
    
    # Initialize merged DataFrame with BigQuery data
    merged_df = bq_df.copy()
    
    # Add processing-metadata columns with default values
    default_columns = {
        'image_uri': None,
        'description': None,
        'completed_at': None,
        'error_message': None,
        'retry_count': 0,
        'failed_at': None,
        'started_at': None,
        'updated_at': None,
        'processing_status': 'pending',
    }
    for column, default in default_columns.items():
        merged_df[column] = default
    
    # Update processed products if any exist
    if not processed_df.empty:
        print("\nUpdating processed products...")
        processed_columns = ['image_uri', 'description', 'completed_at', 'started_at', 'updated_at']
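        # Note: this per-row update builds a boolean mask over the whole catalog on
        # every iteration (O(processed x total)); fine for small batches. A vectorized,
        # merge-style alternative is sketched after the function.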
        
        for _, row in processed_df.iterrows():
            product_id = row['id']
            mask = merged_df['id'] == product_id
            print(f"\nUpdating product {product_id}:")
            print(f"Original data: {merged_df.loc[mask, processed_columns].to_dict('records')}")
            print(f"New data: {row[processed_columns].to_dict()}")
            
            # Update all relevant columns
            for col in processed_columns:
                if col in row and pd.notna(row[col]):
                    merged_df.loc[mask, col] = row[col]
            
            merged_df.loc[mask, 'processing_status'] = 'completed'
    
    # Update failed products if any exist
    if not failed_df.empty:
        print("\nUpdating failed products...")
        failed_columns = ['error_message', 'retry_count', 'failed_at', 'started_at', 'updated_at']
        
        for _, row in failed_df.iterrows():
            product_id = row['id']
            mask = merged_df['id'] == product_id
            print(f"\nUpdating failed product {product_id}:")
            print(f"Original data: {merged_df.loc[mask, failed_columns].to_dict('records')}")
            print(f"New data: {row[failed_columns].to_dict()}")
            
            # Update all relevant columns
            for col in failed_columns:
                if col in row and pd.notna(row[col]):
                    merged_df.loc[mask, col] = row[col]
            
            merged_df.loc[mask, 'processing_status'] = 'failed'
    
    # Print sample of final data
    print("\nSample of final merged data:")
    sample_completed = merged_df[merged_df['processing_status'] == 'completed'].head()
    if not sample_completed.empty:
        print("\nCompleted products sample:")
        display_columns = ['id', 'name', 'brand', 'image_uri', 'description', 'completed_at', 'processing_status']
        print(sample_completed[display_columns])
    
    # Generate timestamp for filename
    timestamp = int(time.time())
    filename = f"consolidated_products_{timestamp}.csv"
    
    # Upload to GCS
    upload_to_gcs(merged_df, filename)
    
    # Write to BigQuery
    write_to_bigquery(merged_df, ENRICHED_TABLE)
    
    # Print summary
    print("\nProcessing Summary:")
    print(f"Total Products: {len(merged_df)}")
    print(f"Completed: {len(merged_df[merged_df['processing_status'] == 'completed'])}")
    print(f"Failed: {len(merged_df[merged_df['processing_status'] == 'failed'])}")
    print(f"Pending: {len(merged_df[merged_df['processing_status'] == 'pending'])}")