in src/psearch/ingestion/main.py [0:0]
def run(self):
    """Run the ingestion process.

    Extracts in-stock products from BigQuery via ``self.bq_service``, then
    walks them in fixed-size batches: each batch's products are turned into
    text with ``self.get_product_text``, embedded with
    ``self.generate_embeddings_batch``, and written to Spanner through
    ``self.spanner_service.store_products_batch``.

    Products without a truthy ``id`` are logged and skipped *before* any
    embedding work is done, so a single malformed row neither aborts the
    run nor consumes an embedding request slot.

    Raises:
        Exception: Any error raised during extraction, embedding or storage
            is logged (with traceback) and re-raised unchanged.
    """
    try:
        # Extract products from BigQuery
        query = f"""
            SELECT *
            FROM `{self.project_id}.products_retail_search_rich.products_enriched_3`
            WHERE availability = 'IN_STOCK'
        """
        products = self.bq_service.extract_product_data(query)
        total = len(products)
        logging.info(f"Extracted {total} products from BigQuery")

        # Products sent per embedding request / Spanner write.
        # NOTE(review): an earlier comment cited 100 as the embedding API
        # maximum while the code used 50 - confirm the limit before raising.
        batch_size = 50
        total_batches = (total - 1) // batch_size + 1

        for start in range(0, total, batch_size):
            batch = products[start : start + batch_size]
            batch_number = start // batch_size + 1

            # Drop products without an ID up front so that the texts, the
            # embeddings and the rows to store all share the same indices.
            valid_products = []
            for product in batch:
                if product.get("id"):
                    valid_products.append(product)
                else:
                    logging.warning(
                        f"Product missing ID in batch, skipping: {product.get('title')}"
                    )

            if valid_products:
                # Generate embeddings in batch
                product_texts = [
                    self.get_product_text(product) for product in valid_products
                ]
                embeddings = self.generate_embeddings_batch(product_texts)

                # Prepare data for Spanner storage (including embeddings)
                products_to_store = [
                    {
                        "id": product["id"],
                        "data": product,  # full original product data
                        "title": product.get("title", ""),
                        # embeddings[idx] belongs to valid_products[idx]
                        "embedding": embeddings[idx].tolist(),
                    }
                    for idx, product in enumerate(valid_products)
                ]

                # Store product data and embeddings in Spanner batch
                self.spanner_service.store_products_batch(products_to_store)
            else:
                logging.warning(
                    f"No valid products with IDs found in batch {batch_number}, skipping Spanner storage for this batch."
                )

            logging.info(
                f"Processed batch of {len(batch)} products (batch {batch_number}/{total_batches})"
            )

            # Add small delay between batches (not after the last one)
            if start + batch_size < total:
                sleep(1)

        logging.info("Ingestion completed successfully using batch mode")
    except Exception as e:
        # Top-level boundary: record the failure with traceback, then let
        # the caller decide how to handle it.
        logging.exception(f"Ingestion failed: {str(e)}")
        raise