def run()

in src/psearch/ingestion/main.py [0:0]


    def run(self) -> None:
        """Run the ingestion process.

        Extracts in-stock products from BigQuery, generates embeddings for
        them batch by batch, and hands each batch (original product data,
        title and embedding list) to the Spanner service for storage.

        Products without a usable ``id`` are logged and skipped *before* any
        embedding work happens, so one malformed row neither aborts the run
        nor costs an embedding request.

        Raises:
            Exception: Any error raised while extracting, embedding or
                storing is logged and re-raised unchanged.
        """
        try:
            # Extract products from BigQuery
            query = f"""
            SELECT *
            FROM `{self.project_id}.products_retail_search_rich.products_enriched_3`
            WHERE availability = 'IN_STOCK'
            """
            products = self.bq_service.extract_product_data(query)
            logging.info(f"Extracted {len(products)} products from BigQuery")

            # Number of products sent to the embedding call at once.
            # NOTE(review): an earlier comment described 100 as the maximum
            # allowed by the embedding API, while the code has always used
            # 50 — confirm the real limit before raising this.
            batch_size = 50
            for i in range(0, len(products), batch_size):
                batch = products[i : i + batch_size]

                # Keep only products that carry an ID. Using .get() here means
                # a row without an "id" key is skipped with a warning instead
                # of raising KeyError and aborting the whole ingestion.
                valid_products = []
                for product in batch:
                    if product.get("id"):
                        valid_products.append(product)
                    else:
                        logging.warning(
                            f"Product missing ID in batch, skipping: {product.get('title')}"
                        )

                if valid_products:
                    # Embed only the products that will actually be stored;
                    # embeddings[idx] corresponds to valid_products[idx].
                    product_texts = [
                        self.get_product_text(product) for product in valid_products
                    ]
                    embeddings = self.generate_embeddings_batch(product_texts)

                    products_to_store = [
                        {
                            "id": product["id"],
                            "data": product,  # Pass the full original product data
                            "title": product.get("title", ""),
                            "embedding": embeddings[idx].tolist(),
                        }
                        for idx, product in enumerate(valid_products)
                    ]

                    # Store product data and embeddings in Spanner as one batch
                    self.spanner_service.store_products_batch(products_to_store)
                else:
                    logging.warning(
                        f"No valid products with IDs found in batch {i//batch_size + 1}, skipping Spanner storage for this batch."
                    )

                logging.info(
                    f"Processed batch of {len(batch)} products (batch {i//batch_size + 1}/{(len(products)-1)//batch_size + 1})"
                )

                # Add small delay between batches (not after the last one)
                if i + batch_size < len(products):
                    sleep(1)

            logging.info("Ingestion completed successfully using batch mode")

        except Exception as e:
            logging.error(f"Ingestion failed: {str(e)}")
            raise