tooling/enrichment/consolidate_results.py
#
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
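
"""Consolidate product enrichment results.

Joins the source product catalog in BigQuery with per-product processing
status documents from Firestore (completed and permanently failed), exports
the merged dataset to Cloud Storage as a CSV, and loads it into the enriched
BigQuery table.
"""
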
import os
import time

import pandas as pd
from dotenv import load_dotenv
from google.cloud import bigquery, firestore, storage

# Load environment variables
load_dotenv()
# Get environment variables
PROJECT_ID = os.getenv('project_id')
DATASET = os.getenv('bq_dataset')
TABLE = os.getenv('bq_table')
ENRICHED_TABLE = os.getenv('bq_enriched_table')
FIRESTORE_COLLECTION = os.getenv('firestore_collection')
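# Note: upload_to_gcs() additionally reads the 'psearch_img_bucket' environment
# variable to determine the export bucket.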


def fetch_all_products_from_bigquery():
"""Fetch all products from BigQuery."""
client = bigquery.Client(project=PROJECT_ID)
query = f"""
SELECT *
FROM `{PROJECT_ID}.{DATASET}.{TABLE}`
ORDER BY id
"""
print("Fetching all products from BigQuery...")
df = client.query(query).to_dataframe()
print(f"Fetched {len(df)} products from BigQuery")
return df


def fetch_processed_products_from_firestore():
"""Fetch all successfully processed products from Firestore."""
db = firestore.Client(project=PROJECT_ID)
collection = db.collection(FIRESTORE_COLLECTION)
print("Fetching processed products from Firestore...")
# Query only completed products
docs = collection.where(filter=firestore.FieldFilter('status', '==', 'completed')).stream()
processed_products = []
for doc in docs:
try:
product_id = int(doc.id) # Convert string ID to int
data = doc.to_dict()
print(f"\nDebug - Firestore document for product {product_id}:")
print(f"Data: {data}")
# Extract data from the nested product_data if it exists
product_data = data.get('product_data', {})
# Create base product info
product_info = {
'id': product_id,
'image_uri': data.get('image_uri'),
'description': data.get('description'),
'completed_at': data.get('completed_at'),
'status': data.get('status'),
'started_at': data.get('started_at'),
'updated_at': data.get('updated_at')
}
# Add all product_data fields except 'id' which we already have
if product_data:
product_data.pop('id', None) # Remove id from product_data if it exists
product_info.update(product_data)
processed_products.append(product_info)
except Exception as e:
print(f"Error processing document {doc.id}: {str(e)}")
continue
print(f"\nFetched {len(processed_products)} processed products from Firestore")
if processed_products:
print("\nSample of first processed product:")
print(processed_products[0])
    if not processed_products:
        return pd.DataFrame(columns=['id', 'image_uri', 'description', 'completed_at',
                                     'status', 'started_at', 'updated_at'])
    return pd.DataFrame(processed_products)


def fetch_failed_products_from_firestore():
"""Fetch all permanently failed products from Firestore."""
db = firestore.Client(project=PROJECT_ID)
collection = db.collection(FIRESTORE_COLLECTION)
print("\nFetching failed products from Firestore...")
# Query permanently failed products
docs = collection.where(filter=firestore.FieldFilter('status', '==', 'permanently_failed')).stream()
failed_products = []
for doc in docs:
try:
product_id = int(doc.id)
data = doc.to_dict()
product_data = data.get('product_data', {})
# Create base product info
product_info = {
'id': product_id,
'error_message': data.get('error_message'),
'retry_count': data.get('retry_count'),
'failed_at': data.get('failed_at'),
'status': data.get('status'),
'started_at': data.get('started_at'),
'updated_at': data.get('updated_at')
}
# Add all product_data fields except 'id'
if product_data:
product_data.pop('id', None) # Remove id from product_data if it exists
product_info.update(product_data)
failed_products.append(product_info)
except Exception as e:
print(f"Error processing document {doc.id}: {str(e)}")
continue
print(f"Fetched {len(failed_products)} failed products from Firestore")
    if not failed_products:
        return pd.DataFrame(columns=['id', 'error_message', 'retry_count', 'failed_at',
                                     'status', 'started_at', 'updated_at'])
    return pd.DataFrame(failed_products)


def upload_to_gcs(df, filename):
"""Upload DataFrame as CSV to Google Cloud Storage."""
    bucket_name = os.getenv('psearch_img_bucket')
    storage_client = storage.Client(project=PROJECT_ID)
    bucket = storage_client.bucket(bucket_name)
    # Save DataFrame to a temporary CSV file
    temp_filename = f"temp_{filename}"
    df.to_csv(temp_filename, index=False)
    # Upload to GCS
    blob = bucket.blob(f"exports/{filename}")
    blob.upload_from_filename(temp_filename)
    # Clean up temporary file
    os.remove(temp_filename)
    print(f"Uploaded consolidated results to gs://{bucket_name}/exports/{filename}")


def write_to_bigquery(df, table_name):
"""Write DataFrame to BigQuery table."""
client = bigquery.Client(project=PROJECT_ID)
table_id = f"{PROJECT_ID}.{DATASET}.{table_name}"
print(f"Writing {len(df)} rows to BigQuery table: {table_id}")
# Configure the job
job_config = bigquery.LoadJobConfig(
write_disposition="WRITE_TRUNCATE", # Overwrite the table if it exists
)
job = client.load_table_from_dataframe(
df, table_id, job_config=job_config
)
# Wait for the job to complete
job.result()
print(f"✓ Successfully wrote {len(df)} rows to {table_id}")


def consolidate_results():
"""Consolidate results from BigQuery and Firestore."""
# Fetch all data
bq_df = fetch_all_products_from_bigquery()
processed_df = fetch_processed_products_from_firestore()
failed_df = fetch_failed_products_from_firestore()
print("\nDebug - Processed DataFrame Info:")
print(processed_df.info())
if not processed_df.empty:
print("\nSample of processed data:")
print(processed_df.head())
# Initialize merged DataFrame with BigQuery data
merged_df = bq_df.copy()
# Add columns for processing data with default values
merged_df['image_uri'] = None
merged_df['description'] = None
merged_df['completed_at'] = None
merged_df['error_message'] = None
merged_df['retry_count'] = 0
merged_df['failed_at'] = None
merged_df['started_at'] = None
merged_df['updated_at'] = None
merged_df['processing_status'] = 'pending'
# Update processed products if any exist
if not processed_df.empty:
print("\nUpdating processed products...")
processed_columns = ['image_uri', 'description', 'completed_at', 'started_at', 'updated_at']
for _, row in processed_df.iterrows():
product_id = row['id']
mask = merged_df['id'] == product_id
print(f"\nUpdating product {product_id}:")
print(f"Original data: {merged_df.loc[mask, processed_columns].to_dict('records')}")
print(f"New data: {row[processed_columns].to_dict()}")
# Update all relevant columns
for col in processed_columns:
if col in row and pd.notna(row[col]):
merged_df.loc[mask, col] = row[col]
merged_df.loc[mask, 'processing_status'] = 'completed'
# Update failed products if any exist
if not failed_df.empty:
print("\nUpdating failed products...")
failed_columns = ['error_message', 'retry_count', 'failed_at', 'started_at', 'updated_at']
for _, row in failed_df.iterrows():
product_id = row['id']
mask = merged_df['id'] == product_id
print(f"\nUpdating failed product {product_id}:")
print(f"Original data: {merged_df.loc[mask, failed_columns].to_dict('records')}")
print(f"New data: {row[failed_columns].to_dict()}")
# Update all relevant columns
for col in failed_columns:
if col in row and pd.notna(row[col]):
merged_df.loc[mask, col] = row[col]
merged_df.loc[mask, 'processing_status'] = 'failed'
# Print sample of final data
print("\nSample of final merged data:")
sample_completed = merged_df[merged_df['processing_status'] == 'completed'].head()
if not sample_completed.empty:
print("\nCompleted products sample:")
display_columns = ['id', 'name', 'brand', 'image_uri', 'description', 'completed_at', 'processing_status']
print(sample_completed[display_columns])
# Generate timestamp for filename
timestamp = int(time.time())
filename = f"consolidated_products_{timestamp}.csv"
# Upload to GCS
upload_to_gcs(merged_df, filename)
# Write to BigQuery
write_to_bigquery(merged_df, ENRICHED_TABLE)
# Print summary
print("\nProcessing Summary:")
print(f"Total Products: {len(merged_df)}")
print(f"Completed: {len(merged_df[merged_df['processing_status'] == 'completed'])}")
print(f"Failed: {len(merged_df[merged_df['processing_status'] == 'failed'])}")
print(f"Pending: {len(merged_df[merged_df['processing_status'] == 'pending'])}")


if __name__ == "__main__":
consolidate_results()