in src/psearch/ingestion/services/bigquery_service.py [0:0]
def extract_product_data(self, query: str) -> List[Dict[str, Any]]:
"""
Extract product data from BigQuery
Args:
query: SQL query to extract product data
Returns:
List of product dictionaries
"""
try:
query_job = self.client.query(query)
results = query_job.result()
products = []
for row in results:
product = dict(row.items())
products.append(product)
logging.info(f"Extracted {len(products)} products from BigQuery")
return products
except Exception as e:
logging.error(f"Error extracting products from BigQuery: {str(e)}")
raise