in tooling/enrichment/main.py [0:0]
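# Module-level dependencies assumed by this excerpt (imported/defined earlier in main.py):
#   import os, time
#   import pandas as pd
#   from google.cloud import storage
#   plus the firestore_client wrapper and the fetch_bigquery_data() / process_single_product() helpers.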
def process_products():
    """Retry previously failed products, then process new BigQuery rows until ~30,000
    products have been handled or no rows remain; export results to CSV and upload to GCS."""
    processed_rows = []
    total_processed = 0
    last_id = firestore_client.get_last_processed_id()

    # First, try to process any previously failed items
    print("Checking for failed products to retry...")
    failed_products = firestore_client.get_failed_products()
    for failed_product in failed_products:
        product_data = failed_product['product_data']
        row = pd.Series(product_data)
        result, error = process_single_product(row)
        if result:
            processed_rows.append(result)
            total_processed += 1
        time.sleep(1)  # Rate limiting

    # Now process new items
    while total_processed < 30000:
        df = fetch_bigquery_data(last_id)
        if df is None or df.empty:
            print("No more rows to process")
            break

        for index, row in df.iterrows():
            product_id = row['id']

            # Skip if product already processed
            if firestore_client.is_product_processed(product_id):
                print(f"Product {product_id} already processed, skipping...")
                last_id = product_id
                continue

            result, error = process_single_product(row)
            if result:
                processed_rows.append(result)
                total_processed += 1

            # Update last processed ID
            last_id = product_id
            firestore_client.update_last_processed_id(last_id)

            # Add some delay to avoid rate limiting
            time.sleep(1)

        print(f"Processed {total_processed} products so far")

    # Create final DataFrame and export to CSV
    if processed_rows:
        final_df = pd.DataFrame(processed_rows)
        output_filename = f"processed_products_{int(time.time())}.csv"
        final_df.to_csv(output_filename, index=False)
        print(f"Exported results to {output_filename}")

        # Upload CSV to GCS
        storage_client = storage.Client()
        bucket = storage_client.bucket(os.getenv('psearch_img_bucket'))
        blob = bucket.blob(f"exports/{output_filename}")
        blob.upload_from_filename(output_filename)
        print(f"Uploaded CSV to gs://{os.getenv('psearch_img_bucket')}/exports/{output_filename}")