in tooling/enrichment/consolidate_results.py [0:0]
def fetch_processed_products_from_firestore():
    """Load every Firestore document whose status is 'completed' into a DataFrame.

    Each document becomes one row. The document ID is converted to ``int``
    and stored under ``id``; a fixed set of top-level fields is copied over,
    and any fields found under the nested ``product_data`` mapping are then
    merged on top (its own ``id`` entry is discarded first, other colliding
    keys overwrite the top-level values).

    Documents that raise while being processed (for example a non-numeric
    document ID) are reported on stdout and skipped.

    Returns:
        A ``pandas.DataFrame`` with one row per successfully processed
        document. When nothing was fetched, an empty frame carrying only the
        base columns is returned so callers still see the expected schema.
    """
    # Top-level fields copied verbatim from each document, in column order.
    base_fields = (
        'image_uri',
        'description',
        'completed_at',
        'status',
        'started_at',
        'updated_at',
    )

    client = firestore.Client(project=PROJECT_ID)
    products_ref = client.collection(FIRESTORE_COLLECTION)

    print("Fetching processed products from Firestore...")

    # Restrict the query to documents that finished processing.
    completed_filter = firestore.FieldFilter('status', '==', 'completed')
    snapshots = products_ref.where(filter=completed_filter).stream()

    rows = []
    for snapshot in snapshots:
        try:
            # Document IDs are strings in Firestore; rows are keyed by int.
            numeric_id = int(snapshot.id)
            payload = snapshot.to_dict()

            print(f"\nDebug - Firestore document for product {numeric_id}:")
            print(f"Data: {payload}")

            # Optional nested mapping holding additional product fields.
            nested = payload.get('product_data', {})

            row = {'id': numeric_id}
            for field in base_fields:
                row[field] = payload.get(field)

            if nested:
                # The row already carries the ID taken from the document
                # name, so drop the nested copy before merging.
                nested.pop('id', None)
                row.update(nested)

            rows.append(row)
        except Exception as e:
            # Best effort: report the failing document and move on.
            print(f"Error processing document {snapshot.id}: {str(e)}")

    print(f"\nFetched {len(rows)} processed products from Firestore")

    if not rows:
        # Keep the base schema even when there is no data.
        return pd.DataFrame(columns=['id', *base_fields])

    print("\nSample of first processed product:")
    print(rows[0])
    return pd.DataFrame(rows)