in src/psearch/ingestion/services/spanner_service.py [0:0]
def store_products_batch(self, products: List[Dict[str, Any]]) -> bool:
"""
Store multiple products in Spanner in a batch
Args:
products: List of product dictionaries with "id" field
Returns:
bool: True if successful, False otherwise
"""
if not products:
return True
try:
# Prepare all valid products for batch insertion
batch_values = []
for product_info in products:
product_id = product_info.get("id")
if not product_id:
logging.warning("Product info missing ID, skipping")
continue
product_data_dict = product_info.get(
"data", {}
) # Get the original product data dict
product_json = self._prepare_product_for_spanner(product_data_dict)
title = product_info.get("title", "")
embedding = product_info.get("embedding")
if embedding is None:
logging.warning(f"Product {product_id} missing embedding, skipping")
continue
batch_values.append(
(
product_id,
product_json,
title,
embedding,
)
)
# If no valid products, return early
if not batch_values:
logging.warning("No valid products to insert")
return True
# Perform a single batch operation
with self.database.batch() as batch:
batch.insert_or_update(
table="products",
columns=[
"product_id",
"product_data",
"title",
"embedding", # Updated columns list
],
values=batch_values,
)
logging.info(
f"Successfully stored {len(batch_values)} products in Spanner batch"
)
return True
except Exception as e:
logging.error(f"Error batch storing products in Spanner: {str(e)}")
raise e