def fetch_processed_products_from_firestore()

in tooling/enrichment/consolidate_results.py [0:0]


def fetch_processed_products_from_firestore():
    """Load every Firestore product document marked completed into a DataFrame.

    Queries ``FIRESTORE_COLLECTION`` in project ``PROJECT_ID`` for documents
    whose ``status`` field equals ``'completed'``. Each document becomes one
    row: the document ID converted to ``int``, a fixed set of top-level
    fields, and then the entries of the nested ``product_data`` mapping when
    it is present and non-empty. The nested ``id`` entry is dropped; on a
    name clash the nested value overwrites the top-level one.

    Any document that raises while being converted is reported on stdout and
    skipped. Progress and per-document debug output are printed as well.

    Returns:
        A ``pandas.DataFrame`` with one row per document converted without
        error. When there are none, an empty frame that has only the base
        columns.
    """
    # Top-level document fields copied into every row, in column order.
    base_fields = (
        'image_uri',
        'description',
        'completed_at',
        'status',
        'started_at',
        'updated_at',
    )

    client = firestore.Client(project=PROJECT_ID)
    products_ref = client.collection(FIRESTORE_COLLECTION)

    print("Fetching processed products from Firestore...")
    # Restrict the query to documents whose status is 'completed'.
    completed_only = firestore.FieldFilter('status', '==', 'completed')
    snapshots = products_ref.where(filter=completed_only).stream()

    rows = []
    for snapshot in snapshots:
        try:
            # Document IDs are strings; rows carry them as integers.
            product_id = int(snapshot.id)
            fields = snapshot.to_dict()
            print(f"\nDebug - Firestore document for product {product_id}:")
            print(f"Data: {fields}")

            nested = fields.get('product_data', {})

            row = {'id': product_id}
            for name in base_fields:
                row[name] = fields.get(name)

            if nested:
                # The row already has its id; discard the nested duplicate
                # before merging the remaining nested fields in.
                nested.pop('id', None)
                row.update(nested)

            rows.append(row)
        except Exception as err:
            # Best effort: report the bad document and move on to the next.
            print(f"Error processing document {snapshot.id}: {err!s}")

    print(f"\nFetched {len(rows)} processed products from Firestore")
    if not rows:
        # Keep the base columns so callers get a consistent empty schema.
        return pd.DataFrame(columns=['id', *base_fields])

    print("\nSample of first processed product:")
    print(rows[0])
    return pd.DataFrame(rows)