tooling/enrichment/firestore_client.py (94 lines of code) (raw):

#
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime

from google.api_core.exceptions import FailedPrecondition
from google.cloud import firestore


class FirestoreClient:
    """Track product-processing state in a single Firestore collection.

    The collection holds one document per product, keyed by the stringified
    product ID, plus one bookkeeping document (``processing_status``) that
    stores the last processed ID.
    """

    # ID of the bookkeeping document that stores the last processed ID.
    STATUS_DOCUMENT = 'processing_status'
    # A product whose retry_count reaches this value is no longer retried.
    MAX_RETRIES = 3

    def __init__(self, project_id, collection_name):
        """Create a Firestore client bound to one collection.

        Args:
            project_id: Project passed to ``firestore.Client``.
            collection_name: Name of the collection used for all documents.
        """
        self.db = firestore.Client(project=project_id)
        self.collection = self.db.collection(collection_name)

    def get_last_processed_id(self):
        """Get the last processed ID from Firestore.

        Returns:
            The stored ``last_processed_id``, or 0 when the status document
            or that field is missing.
        """
        doc = self.collection.document(self.STATUS_DOCUMENT).get()
        if doc.exists:
            return doc.to_dict().get('last_processed_id', 0)
        return 0

    def update_last_processed_id(self, last_id):
        """Update the last processed ID in Firestore.

        The status document is overwritten with the new ID and a server-side
        timestamp.

        Args:
            last_id: Value to store as ``last_processed_id``.
        """
        doc_ref = self.collection.document(self.STATUS_DOCUMENT)
        doc_ref.set({
            'last_processed_id': last_id,
            'last_updated': firestore.SERVER_TIMESTAMP,
        })

    def is_product_processed(self, product_id):
        """Check whether a product needs no further processing.

        Args:
            product_id: Product identifier; stringified to form the
                document ID.

        Returns:
            True if the product's document has status ``'completed'`` or its
            ``retry_count`` has reached ``MAX_RETRIES``; False otherwise,
            including when no document exists.
        """
        doc = self.collection.document(str(product_id)).get()
        if not doc.exists:
            return False

        data = doc.to_dict()
        # Done means: completed, or failed too many times to retry again.
        return (
            data.get('status') == 'completed'
            or data.get('retry_count', 0) >= self.MAX_RETRIES
        )

    def get_failed_products(self):
        """Get the failed products that haven't exceeded the retry limit.

        Returns:
            A list of dicts, one per document with status ``'failed'`` and
            ``retry_count`` below ``MAX_RETRIES``. Each dict holds the
            document's fields plus ``'id'`` (the document ID converted to
            ``int``). Returns an empty list if Firestore rejects the query
            with ``FailedPrecondition``.

        Raises:
            ValueError: If a matching document's ID is not an integer string.
        """
        try:
            # Query on status only; retry_count is filtered in memory below.
            failed_docs = (
                self.collection
                .where(filter=firestore.FieldFilter('status', '==', 'failed'))
                .stream()
            )

            # stream() is lazy, so the loop stays inside the try block: the
            # query error surfaces while iterating.
            failed_products = []
            for doc in failed_docs:
                data = doc.to_dict()
                if data.get('retry_count', 0) < self.MAX_RETRIES:
                    failed_products.append({'id': int(doc.id), **data})
            return failed_products

        except FailedPrecondition as e:
            # Report the collection and project this client actually uses
            # rather than hard-coded names.
            collection_id = self.collection.id
            project = self.db.project
            print("\nFirestore index error. Please create the following indexes:")
            print(f"1. Collection: {collection_id}")
            print("2. Fields to index:")
            print("   - status (Ascending)")
            print("   - retry_count (Ascending)")
            print("   - __name__ (Ascending)")
            print("\nYou can create the index using the Firebase Console or using the following command:")
            print(
                "gcloud firestore indexes composite create "
                f"--collection-group={collection_id} "
                "--field-config field=status,order=ASCENDING "
                "field=retry_count,order=ASCENDING "
                "field=__name__,order=ASCENDING"
            )
            print("\nOr visit the following URL to create the index:")
            print(f"https://console.firebase.google.com/project/{project}/firestore/indexes")
            # Show the underlying error instead of discarding it.
            print(f"\nError details: {e}")
            return []

    def start_product_processing(self, product_id, product_data):
        """Mark a product as being processed.

        The product's document is overwritten. ``retry_count`` is 0 for a
        product seen for the first time, otherwise the previously stored
        count plus one.

        Args:
            product_id: Product identifier; stringified to form the
                document ID.
            product_data: Stored verbatim under ``product_data``.
        """
        doc_ref = self.collection.document(str(product_id))
        doc = doc_ref.get()

        retry_count = 0
        if doc.exists:
            # An existing document means this is a repeat attempt.
            retry_count = doc.to_dict().get('retry_count', 0) + 1

        doc_ref.set({
            'status': 'processing',
            'product_data': product_data,
            'started_at': firestore.SERVER_TIMESTAMP,
            'updated_at': firestore.SERVER_TIMESTAMP,
            'retry_count': retry_count,
        })

    def complete_product_processing(self, product_id, image_uri, description):
        """Mark a product as completed with generated data.

        Uses ``update()``, so the product's document must already exist;
        Firestore raises ``NotFound`` otherwise.

        Args:
            product_id: Product identifier; stringified to form the
                document ID.
            image_uri: Stored under ``image_uri``.
            description: Stored under ``description``.
        """
        doc_ref = self.collection.document(str(product_id))
        doc_ref.update({
            'status': 'completed',
            'image_uri': image_uri,
            'description': description,
            'completed_at': firestore.SERVER_TIMESTAMP,
            'updated_at': firestore.SERVER_TIMESTAMP,
        })

    def mark_product_failed(self, product_id, error_message):
        """Mark a product as failed with error details.

        If the document exists and its ``retry_count`` has reached
        ``MAX_RETRIES``, the status is ``'permanently_failed'`` instead of
        ``'failed'``. If the document does not exist it is created with
        status ``'failed'``.

        Args:
            product_id: Product identifier; stringified to form the
                document ID.
            error_message: Stored under ``error_message``.
        """
        doc_ref = self.collection.document(str(product_id))
        doc = doc_ref.get()

        update_data = {
            'status': 'failed',
            'error_message': error_message,
            'failed_at': firestore.SERVER_TIMESTAMP,
            'updated_at': firestore.SERVER_TIMESTAMP,
        }

        if doc.exists:
            retry_count = doc.to_dict().get('retry_count', 0)
            update_data['retry_count'] = retry_count

            # Retry budget exhausted: mark as permanently failed.
            if retry_count >= self.MAX_RETRIES:
                update_data['status'] = 'permanently_failed'

        # update() raises NotFound for a missing document, which would make
        # the exists check above pointless. A merging set() writes the same
        # fields to an existing document and creates it when absent.
        doc_ref.set(update_data, merge=True)