# tooling/enrichment/consolidate_results.py
import time

import pandas as pd
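
# The fetch/upload/write helpers and the ENRICHED_TABLE constant used by
# consolidate_results() are assumed to be defined elsewhere in this module.
# The two sketches below are illustrative only, not the project's real
# implementations: they show one plausible shape for the Firestore fetch and
# the GCS upload using the google-cloud-firestore and google-cloud-storage
# client libraries; the collection and bucket names are placeholders.

def fetch_processed_products_from_firestore() -> pd.DataFrame:
    """Minimal sketch: load every document from an assumed 'processed_products' collection."""
    from google.cloud import firestore

    client = firestore.Client()
    rows = []
    for doc in client.collection("processed_products").stream():
        record = doc.to_dict()
        record["id"] = doc.id  # keep the document ID so rows can be matched to BigQuery
        rows.append(record)
    return pd.DataFrame(rows)


def upload_to_gcs(df: pd.DataFrame, filename: str) -> None:
    """Minimal sketch: serialize the DataFrame to CSV and upload it to an assumed bucket."""
    from google.cloud import storage

    bucket = storage.Client().bucket("enrichment-results")  # placeholder bucket name
    bucket.blob(filename).upload_from_string(df.to_csv(index=False), content_type="text/csv")
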
def consolidate_results():
    """Consolidate results from BigQuery and Firestore."""
    # Fetch all data
    bq_df = fetch_all_products_from_bigquery()
    processed_df = fetch_processed_products_from_firestore()
    failed_df = fetch_failed_products_from_firestore()

    print("\nDebug - Processed DataFrame Info:")
    processed_df.info()  # DataFrame.info() prints directly and returns None
    if not processed_df.empty:
        print("\nSample of processed data:")
        print(processed_df.head())
    # Initialize merged DataFrame with BigQuery data
    merged_df = bq_df.copy()

    # Add columns for processing data with default values
    merged_df['image_uri'] = None
    merged_df['description'] = None
    merged_df['completed_at'] = None
    merged_df['error_message'] = None
    merged_df['retry_count'] = 0
    merged_df['failed_at'] = None
    merged_df['started_at'] = None
    merged_df['updated_at'] = None
    merged_df['processing_status'] = 'pending'
    # Update processed products if any exist
    if not processed_df.empty:
        print("\nUpdating processed products...")
        processed_columns = ['image_uri', 'description', 'completed_at', 'started_at', 'updated_at']
        for _, row in processed_df.iterrows():
            product_id = row['id']
            mask = merged_df['id'] == product_id
            print(f"\nUpdating product {product_id}:")
            print(f"Original data: {merged_df.loc[mask, processed_columns].to_dict('records')}")
            print(f"New data: {row[processed_columns].to_dict()}")
            # Update all relevant columns
            for col in processed_columns:
                if col in row and pd.notna(row[col]):
                    merged_df.loc[mask, col] = row[col]
            merged_df.loc[mask, 'processing_status'] = 'completed'
    # Update failed products if any exist
    if not failed_df.empty:
        print("\nUpdating failed products...")
        failed_columns = ['error_message', 'retry_count', 'failed_at', 'started_at', 'updated_at']
        for _, row in failed_df.iterrows():
            product_id = row['id']
            mask = merged_df['id'] == product_id
            print(f"\nUpdating failed product {product_id}:")
            print(f"Original data: {merged_df.loc[mask, failed_columns].to_dict('records')}")
            print(f"New data: {row[failed_columns].to_dict()}")
            # Update all relevant columns
            for col in failed_columns:
                if col in row and pd.notna(row[col]):
                    merged_df.loc[mask, col] = row[col]
            merged_df.loc[mask, 'processing_status'] = 'failed'
    # Print sample of final data
    print("\nSample of final merged data:")
    sample_completed = merged_df[merged_df['processing_status'] == 'completed'].head()
    if not sample_completed.empty:
        print("\nCompleted products sample:")
        display_columns = ['id', 'name', 'brand', 'image_uri', 'description', 'completed_at', 'processing_status']
        print(sample_completed[display_columns])
    # Generate timestamp for filename
    timestamp = int(time.time())
    filename = f"consolidated_products_{timestamp}.csv"

    # Upload to GCS
    upload_to_gcs(merged_df, filename)

    # Write to BigQuery
    write_to_bigquery(merged_df, ENRICHED_TABLE)

    # Print summary
    print("\nProcessing Summary:")
    print(f"Total Products: {len(merged_df)}")
    print(f"Completed: {len(merged_df[merged_df['processing_status'] == 'completed'])}")
    print(f"Failed: {len(merged_df[merged_df['processing_status'] == 'failed'])}")
    print(f"Pending: {len(merged_df[merged_df['processing_status'] == 'pending'])}")