in tooling/enrichment/consolidate_results.py [0:0]
def fetch_failed_products_from_firestore():
"""Fetch all permanently failed products from Firestore."""
db = firestore.Client(project=PROJECT_ID)
collection = db.collection(FIRESTORE_COLLECTION)
print("\nFetching failed products from Firestore...")
# Query permanently failed products
docs = collection.where(filter=firestore.FieldFilter('status', '==', 'permanently_failed')).stream()
failed_products = []
for doc in docs:
try:
product_id = int(doc.id)
data = doc.to_dict()
product_data = data.get('product_data', {})
# Create base product info
product_info = {
'id': product_id,
'error_message': data.get('error_message'),
'retry_count': data.get('retry_count'),
'failed_at': data.get('failed_at'),
'status': data.get('status'),
'started_at': data.get('started_at'),
'updated_at': data.get('updated_at')
}
# Add all product_data fields except 'id'
if product_data:
product_data.pop('id', None) # Remove id from product_data if it exists
product_info.update(product_data)
failed_products.append(product_info)
except Exception as e:
print(f"Error processing document {doc.id}: {str(e)}")
continue
print(f"Fetched {len(failed_products)} failed products from Firestore")
return pd.DataFrame(failed_products) if failed_products else pd.DataFrame(columns=['id', 'error_message', 'retry_count', 'failed_at', 'status', 'started_at', 'updated_at'])