### <font color='#4285f4'>Overview</font>

This notebook empowers marketers to create hyper-personalized emails with unique marketing text and images generated for each customer. By analyzing current events in Paris and leveraging Gemini's advanced capabilities, we identify relevant events and tailor messages to customers' interests, even translating content and providing transparent explanations for all AI-generated elements.

Process Flow:
1. Download current events for the next week
2. Extract up to 7 keywords from the events using Gemini
3. Create vector embeddings of the keywords
4. Ask Gemini for the top 3 public events in Paris
5. Match the top 3 events (vector search) with customers who have similar interests
6. Generate a marketing message based upon the customer's profile. Determine their top 3 menu items and craft a message that hints at the event taking place.
7. Pass the marketing message to Gemini and ask it to create a prompt for Imagen3
    * a. Gemini will author our Imagen3 prompt for us.
8. Generate the image based upon the image prompt Gemini created
9. Pass the marketing message, user profile and generated image to Gemini Vision.
    * a. Ask Gemini to verify that the image satisfies the original request.
10. Ask Gemini to translate the marketing text into another language.
    * a. Ask Gemini to verify the translation to make sure it did what we asked
11. Create the HTML email message.
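
The matching in step 5 rests on comparing embedding vectors. As a rough illustration only, the sketch below ranks customers by cosine similarity in plain Python. The customer ids and 3-dimensional vectors are made up, and the function names are hypothetical; it is independent of whichever vector search the notebook runs later.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_matches(event_vector, customer_vectors, k=3):
    """Return the k customer ids whose vectors are most similar to the event."""
    scored = [(cosine_similarity(event_vector, vec), cid)
              for cid, vec in customer_vectors.items()]
    scored.sort(reverse=True)
    return [cid for _, cid in scored[:k]]


# Made-up 3-dimensional embeddings; real embeddings have far more dimensions.
customers = {
    "customer_1": [0.9, 0.1, 0.0],
    "customer_2": [0.0, 1.0, 0.0],
    "customer_3": [0.7, 0.3, 0.1],
}
event = [1.0, 0.0, 0.0]
print(top_matches(event, customers, k=2))
```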

Notes:
* We could also select menu items that take the customer's food allergies into account
* Since we are downloading event data, we could partner with the events

Cost:
* Very Small: A few calls to Gemini, Imagen3 and BigQuery
* Medium: Remember to stop your Colab Enterprise Notebook Runtime

Author: 
* Adam Paternostro

In [None]:
# Architecture Diagram
from IPython.display import Image
Image(url='https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Artifacts/Campaign-Assets-Hyper-Personalized-Email-Architecture.png', width=1200)

### <font color='#4285f4'>Video Walkthrough</font>

[![Video](https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Videos/adam-paternostro-video.png)](https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Videos/Campaign-Assets-Hyper-Personalized-Email.mp4)


In [None]:
from IPython.display import HTML

HTML("""
<video width="800" height="600" controls>
  <source src="https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Videos/Campaign-Assets-Hyper-Personalized-Email.mp4" type="video/mp4">
  Your browser does not support the video tag.
</video>
""")

### <font color='#4285f4'>License</font>

```
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```

### <font color='#4285f4'>Pip installs</font>

In [None]:
# PIP Installs
import sys

# https://serpapi.com/
!{sys.executable} -m pip install google-search-results

### <font color='#4285f4'>Initialize</font>

In [None]:
# You will need to create an account and verify your email.
# https://serpapi.com/
# You get 100 free calls (per month)

serpapi_key = "<<replace with your key>>"

# date:today - Today's Events
# date:tomorrow - Tomorrow's Events
# date:week - This Week's Events
# htichips = "date:today"
htichips = "date:week"

In [None]:
from PIL import Image
from IPython.display import HTML
import IPython.display
import google.auth
import google.auth.transport.requests  # provides the Request() used to refresh credentials
import requests
import json
import uuid
import base64
import os
import cv2
import random
import time
import datetime

import logging
from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception

In [None]:
# Set these (run this cell to verify the output)

bigquery_location = "${bigquery_location}"
region = "${region}"
location = "${location}"
storage_account = "${chocolate_ai_bucket}"

# Get the current date and time
now = datetime.datetime.now()

# Format the date and time as desired
formatted_date = now.strftime("%Y-%m-%d-%H-%M")

gemini_languages = ["Arabic (ar)",  "Bengali (bn)",  "Bulgarian (bg)",  "Chinese simplified (zh)",  "Chinese traditional (zh)",
  "Croatian (hr)",  "Czech (cs)",  "Danish (da)",  "Dutch (nl)",  "Estonian (et)",  "Finnish (fi)",  "French (fr)",
  "German (de)",  "Greek (el)",  "Hebrew (iw)",  "Hindi (hi)",  "Hungarian (hu)",  "Indonesian (id)",  "Italian (it)",  "Japanese (ja)",
  "Korean (ko)",  "Latvian (lv)",  "Lithuanian (lt)",  "Norwegian (no)",  "Polish (pl)",  "Portuguese (pt)",  "Romanian (ro)",  "Russian (ru)",
  "Serbian (sr)",  "Slovak (sk)",  "Slovenian (sl)",  "Spanish (es)",  "Swahili (sw)",  "Swedish (sv)",  "Thai (th)",  "Turkish (tr)",  "Ukrainian (uk)",
  "Vietnamese (vi)"]

# Get some values using gcloud
project_id = !(gcloud config get-value project)
user = !(gcloud auth list --filter=status:ACTIVE --format="value(account)")

if len(project_id) != 1:
  raise RuntimeError(f"project_id is not set: {project_id}")
project_id = project_id[0]

if len(user) != 1:
  raise RuntimeError(f"user is not set: {user}")
user = user[0]

print(f"project_id = {project_id}")
print(f"user = {user}")

### <font color='#4285f4'>Helper Methods</font>

#### restAPIHelper
Calls the Google Cloud REST API using the current user's credentials.
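
As a side note, the verb handling can also be expressed as data rather than an if/elif chain. The sketch below is a hypothetical helper (`build_request_kwargs` is not part of this notebook) that only builds the keyword arguments and makes no network call. Upper-casing the verb and the placeholder token are assumptions made for the example.

```python
ALLOWED_VERBS = {"GET", "POST", "PUT", "PATCH", "DELETE"}
VERBS_WITH_BODY = {"POST", "PUT", "PATCH"}


def build_request_kwargs(url, http_verb, request_body=None, access_token="<access-token>"):
    """Build keyword arguments for a single HTTP call (hypothetical helper)."""
    verb = http_verb.upper()  # assumption: accept lower-case verbs too
    if verb not in ALLOWED_VERBS:
        raise RuntimeError(f"Unknown HTTP verb: {http_verb}")

    kwargs = {
        "method": verb,
        "url": url,
        "headers": {
            "Content-Type": "application/json",
            "Authorization": "Bearer " + access_token,
        },
    }
    if verb in VERBS_WITH_BODY:
        kwargs["json"] = request_body  # only these verbs send a JSON body
    return kwargs


example = build_request_kwargs("https://example.com/v1/things", "post", {"name": "demo"})
print(example["method"], sorted(example.keys()))
```

The resulting dictionary could be passed to `requests.request(**kwargs)`, which accepts `method`, `url`, `headers` and `json` as keyword arguments.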

In [None]:
def restAPIHelper(url: str, http_verb: str, request_body: str) -> str:
  """Calls the Google Cloud REST API passing in the current user's credentials"""

  import requests
  import google.auth
  import google.auth.transport.requests  # needed for the Request() object below
  import json

  # Get an access token based upon the current user
  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request()
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
    "Content-Type" : "application/json",
    "Authorization" : "Bearer " + access_token
  }

  if http_verb == "GET":
    response = requests.get(url, headers=headers)
  elif http_verb == "POST":
    response = requests.post(url, json=request_body, headers=headers)
  elif http_verb == "PUT":
    response = requests.put(url, json=request_body, headers=headers)
  elif http_verb == "PATCH":
    response = requests.patch(url, json=request_body, headers=headers)
  elif http_verb == "DELETE":
    response = requests.delete(url, headers=headers)
  else:
    raise RuntimeError(f"Unknown HTTP verb: {http_verb}")

  if response.status_code == 200:
    return json.loads(response.content)
    #image_data = json.loads(response.content)["predictions"][0]["bytesBase64Encoded"]
  else:
    error = f"Error restAPIHelper -> Status: '{response.status_code}' Text: '{response.text}'"
    raise RuntimeError(error)

#### RetryCondition (for retrying LLM calls)
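
To make the retry behaviour concrete, here is a minimal, dependency-free sketch of "retry with exponential backoff, but only for selected errors". It is an approximation written for illustration, not tenacity's implementation, and its wait schedule may differ from `wait_exponential`. The names `retry_with_backoff`, `is_retryable` and `flaky_call` are hypothetical.

```python
import time


def retry_with_backoff(func, should_retry, max_attempts=10, min_wait=1, max_wait=60,
                       sleep=time.sleep):
    """Call func(); on a retryable error wait 1, 2, 4, ... seconds (capped) and try again."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as error:
            # Give up on the last attempt or when the error is not retryable.
            if attempt == max_attempts or not should_retry(error):
                raise
            wait = min(max_wait, max(min_wait, 2 ** (attempt - 1)))
            sleep(wait)


def is_retryable(error):
    """Same idea as RetryCondition: retry only when a known marker is in the message."""
    markers = ("RESOURCE_EXHAUSTED", "No content in candidate")
    return any(marker in str(error) for marker in markers)


calls = {"count": 0}


def flaky_call():
    """Fails twice with a quota-style error, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("429 RESOURCE_EXHAUSTED")
    return "ok"


waits = []  # record the waits instead of actually sleeping
result = retry_with_backoff(flaky_call, is_retryable, sleep=waits.append)
print(result, waits)
```

Passing `sleep=waits.append` keeps the example instant; in real use the default `time.sleep` applies.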

In [None]:
def RetryCondition(error):
  error_string = str(error)
  print(error_string)

  retry_errors = [
      "RESOURCE_EXHAUSTED",
      "No content in candidate",
      # Add more error messages here as needed
  ]

  for retry_error in retry_errors:
    if retry_error in error_string:
      print("Retrying...")
      return True

  return False

#### Imagen3 Image Generation
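
The function below receives the image as a base64 string under `predictions[0].bytesBase64Encoded`. The following sketch isolates just that decode-and-save step so it can be tried without calling the API. The response here is fake (the bytes are not a real PNG) and `save_first_prediction` is a hypothetical name.

```python
import base64
import os
import tempfile
import uuid


def save_first_prediction(response_json, output_dir):
    """Decode the first base64 image in a predict-style response and write it to disk."""
    predictions = response_json.get("predictions") or []
    if not predictions:
        raise RuntimeError(f"No predictions in response: {response_json}")

    image_bytes = base64.b64decode(predictions[0]["bytesBase64Encoded"])
    filename = os.path.join(output_dir, f"{uuid.uuid4()}.png")
    with open(filename, "wb") as f:
        f.write(image_bytes)
    return filename


# Fake response for illustration only.
fake_response = {
    "predictions": [
        {"bytesBase64Encoded": base64.b64encode(b"not-a-real-png").decode("ascii")}
    ]
}
saved_path = save_first_prediction(fake_response, tempfile.mkdtemp())
print(saved_path)
```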

In [None]:
def ImageGen(prompt):
  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request()
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
      "Content-Type" : "application/json",
      "Authorization" : "Bearer " + access_token
  }

  model_version = "imagen-3.0-generate-001" # imagen-3.0-fast-generate-001
  #model_version = "imagen-3.0-generate-preview-0611" # Preview Access Model

  # https://cloud.google.com/vertex-ai/docs/generative-ai/model-reference/image-generation
  # url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project}/locations/{location}/publishers/google/models/imagegeneration:predict"
  url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/{model_version}:predict"

  payload = {
    "instances": [
      {
        "prompt": prompt
      }
    ],
    "parameters": {
      "sampleCount": 1,
      "personGeneration" : "dont_allow"  # change to allow_adult for people generation
    }
  }

  response = requests.post(url, json=payload, headers=headers)

  if response.status_code == 200:
    response_json = json.loads(response.content)
    print(f"Imagen3 response_json: {response_json}")

    if "blocked" in response_json:
      print(f"Blocked: {response_json['blocked']}")

    if "predictions" in response_json:
      image_data = response_json["predictions"][0]["bytesBase64Encoded"]
      image_data = base64.b64decode(image_data)
      filename= str(uuid.uuid4()) + ".png"
      with open(filename, "wb") as f:
        f.write(image_data)
      print(f"Image generated OK.")
      return filename
    else:
      raise RuntimeError(f"No predictions in response: {response.content}")
  else:
    error = f"Error with prompt:'{prompt}'  Status:'{response.status_code}' Text:'{response.text}'"
    raise RuntimeError(error)

#### Gemini LLM (default model: gemini-2.0-flash)
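
The response handling in the functions below walks `candidates[0].content.parts[0].text` and then strips markdown fences. A flatter way to write the same checks might look like the sketch below. `extract_text` is a hypothetical name, and the sample response is hand-written. The error text "No content in candidate" is kept because `RetryCondition` looks for it.

```python
FENCE = "`" * 3  # three backticks, built this way to keep the example fence-safe


def extract_text(json_response):
    """Return the first text part of a generateContent-style response, cleaned up."""
    candidates = json_response.get("candidates") or []
    if not candidates:
        raise RuntimeError(f"No candidates: {json_response}")

    content = candidates[0].get("content")
    if content is None:
        raise RuntimeError(f"No content in candidate: {json_response}")

    parts = content.get("parts") or []
    if not parts:
        raise RuntimeError(f"No parts in content: {json_response}")

    text = parts[0].get("text")
    if text is None:
        raise RuntimeError(f"No text in part: {json_response}")

    # Strip markdown fences and newlines that sometimes wrap a JSON reply.
    return text.replace(FENCE + "json", "").replace(FENCE, "").replace("\n", "")


sample = {
    "candidates": [
        {"content": {"parts": [{"text": FENCE + 'json\n{"a": 1}\n' + FENCE}]}}
    ]
}
print(extract_text(sample))
```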

In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def GeminiLLM(prompt, model = "gemini-2.0-flash", response_schema = None,
                 temperature = 1, topP = 1, topK = 32):

  # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference#supported_models

  llm_response = None
  if temperature < 0:
    temperature = 0

  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request() # required to get the access token
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
      "Content-Type" : "application/json",
      "Authorization" : "Bearer " + access_token
  }

  # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference
  url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/{model}:generateContent"

  generation_config = {
    "temperature": temperature,
    "topP": topP,
    "maxOutputTokens": 8192,
    "candidateCount": 1,
    "responseMimeType": "application/json",
  }

  # Add in the response schema when it is provided
  if response_schema is not None:
    generation_config["responseSchema"] = response_schema

  if model == "gemini-2.0-flash":
    generation_config["topK"] = topK

  payload = {
    "contents": {
      "role": "user",
      "parts": {
          "text": prompt
      },
    },
    "generation_config": {
      **generation_config
    },
    "safety_settings": {
      "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
      "threshold": "BLOCK_LOW_AND_ABOVE"
    }
  }

  response = requests.post(url, json=payload, headers=headers)

  if response.status_code == 200:
    try:
      json_response = json.loads(response.content)
    except Exception as error:
      raise RuntimeError(f"An error occurred parsing the JSON: {error}")

    if "candidates" in json_response:
      candidates = json_response["candidates"]
      if len(candidates) > 0:
        candidate = candidates[0]
        if "content" in candidate:
          content = candidate["content"]
          if "parts" in content:
            parts = content["parts"]
            if len(parts):
              part = parts[0]
              if "text" in part:
                text = part["text"]
                llm_response = text
              else:
                raise RuntimeError(f"No text in part: {response.content}")
            else:
              raise RuntimeError(f"No parts in content: {response.content}")
          else:
            raise RuntimeError(f"No parts in content: {response.content}")
        else:
          raise RuntimeError(f"No content in candidate: {response.content}")
      else:
        raise RuntimeError(f"No candidates: {response.content}")
    else:
      raise RuntimeError(f"No candidates: {response.content}")

    # Remove some typical response characters (if asking for a JSON reply)
    llm_response = llm_response.replace("```json","")
    llm_response = llm_response.replace("```","")
    llm_response = llm_response.replace("\n","")

    return llm_response

  else:
    raise RuntimeError(f"Error with prompt:'{prompt}'  Status:'{response.status_code}' Text:'{response.text}'")

In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def GeminiLLM_VerifyImage(prompt, imageBase64, model = "gemini-2.0-flash", response_schema = None,
                 temperature = 1, topP = 1, topK = 32):

  # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference#supported_models
  # model = "gemini-2.0-flash"

  llm_response = None
  if temperature < 0:
    temperature = 0

  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request() # required to get the access token
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
      "Content-Type" : "application/json",
      "Authorization" : "Bearer " + access_token
  }

  # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference
  url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/{model}:generateContent"

  generation_config = {
    "temperature": temperature,
    "topP": topP,
    "maxOutputTokens": 8192,
    "candidateCount": 1,
    "responseMimeType": "application/json",
  }

  # Add in the response schema when it is provided
  if response_schema is not None:
    generation_config["responseSchema"] = response_schema

  if model == "gemini-2.0-flash" or model == "gemini-1.0-pro":
    generation_config["topK"] = topK

  payload = {
    "contents": {
      "role": "user",
      "parts": [
          { "text": prompt },
          { "inlineData": {  "mimeType": "image/png", "data": f"{imageBase64}" } }
        ]
    },
    "generation_config": {
      **generation_config
    },
    "safety_settings": {
      "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
      "threshold": "BLOCK_LOW_AND_ABOVE"
    }
  }

  response = requests.post(url, json=payload, headers=headers)

  if response.status_code == 200:
    try:
      json_response = json.loads(response.content)
    except Exception as error:
      raise RuntimeError(f"An error occurred parsing the JSON: {error}")

    if "candidates" in json_response:
      candidates = json_response["candidates"]
      if len(candidates) > 0:
        candidate = candidates[0]
        if "content" in candidate:
          content = candidate["content"]
          if "parts" in content:
            parts = content["parts"]
            if len(parts):
              part = parts[0]
              if "text" in part:
                text = part["text"]
                llm_response = text
              else:
                raise RuntimeError(f"No text in part: {response.content}")
            else:
              raise RuntimeError(f"No parts in content: {response.content}")
          else:
            raise RuntimeError(f"No parts in content: {response.content}")
        else:
          raise RuntimeError(f"No content in candidate: {response.content}")
      else:
        raise RuntimeError(f"No candidates: {response.content}")
    else:
      raise RuntimeError(f"No candidates: {response.content}")

    # Remove some typical response characters (if asking for a JSON reply)
    llm_response = llm_response.replace("```json","")
    llm_response = llm_response.replace("```","")
    llm_response = llm_response.replace("\n","")

    return llm_response

  else:
    raise RuntimeError(f"Error with prompt:'{prompt}'  Status:'{response.status_code}' Text:'{response.text}'")

#### Helper Functions
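
`RunQuery` below decides between "return a dataframe" and "wait for the job" by checking whether the SQL starts with `SELECT` or `WITH`. That check is case-sensitive and is thrown off by leading whitespace. A slightly more forgiving test could look like this sketch (`returns_rows` is a hypothetical helper; it still does not handle SQL that begins with a comment).

```python
import re

# Matches an optional run of whitespace followed by SELECT or WITH as a whole word.
_ROW_RETURNING = re.compile(r"\s*(SELECT|WITH)\b", re.IGNORECASE)


def returns_rows(sql):
    """Guess whether a statement returns rows, ignoring case and leading whitespace."""
    return _ROW_RETURNING.match(sql) is not None


print(returns_rows("  select 1"))
print(returns_rows("UPDATE t SET x = 1 WHERE TRUE"))
```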

In [None]:
def RunQuery(sql):
  import time
  from google.cloud import bigquery
  client = bigquery.Client()

  if (sql.startswith("SELECT") or sql.startswith("WITH")):
      df_result = client.query(sql).to_dataframe()
      return df_result
  else:
    job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)
    query_job = client.query(sql, job_config=job_config)

    # Check on the progress by getting the job's updated state.
    query_job = client.get_job(
        query_job.job_id, location=query_job.location
    )
    print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    while query_job.state != "DONE":
      time.sleep(2)
      query_job = client.get_job(
          query_job.job_id, location=query_job.location
          )
      print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    if query_job.error_result is None:
      return True
    else:
      raise Exception(query_job.error_result)

In [None]:
def convert_png_to_base64(image_path):
  image = cv2.imread(image_path)

  # Convert the image to a base64 string.
  _, buffer = cv2.imencode('.png', image)
  base64_string = base64.b64encode(buffer).decode('utf-8')

  return base64_string

In [None]:
def PrettyPrintJson(json_string):
  json_object = json.loads(json_string)
  json_formatted_str = json.dumps(json_object, indent=2)
  return json_formatted_str

In [None]:
def GetNextPrimaryKey(fully_qualified_table_name, field_name):
  from google.cloud import bigquery
  client = bigquery.Client()
  sql = f"""
  SELECT IFNULL(MAX({field_name}),0) AS result
    FROM `{fully_qualified_table_name}`
  """
  # print(sql)
  df_result = client.query(sql).to_dataframe()
  # display(df_result)
  return df_result['result'].iloc[0] + 1

In [None]:
def haveEventsBeenDownloaded(fully_qualified_table_name, field_name):
  from google.cloud import bigquery
  client = bigquery.Client()
  sql = f"""
  SELECT IFNULL(COUNT({field_name}),0) AS result
    FROM `{fully_qualified_table_name}`
  WHERE download_date = CURRENT_DATE()
  """
  # print(sql)
  df_result = client.query(sql).to_dataframe()
  # display(df_result)
  if df_result['result'].iloc[0] > 0:
    return True
  else:
    return False

In [None]:
# This was generated by GenAI

def copy_file_to_gcs(local_file_path, bucket_name, destination_blob_name):
  """Copies a file from a local drive to a GCS bucket.

  Args:
      local_file_path: The full path to the local file.
      bucket_name: The name of the GCS bucket to upload to.
      destination_blob_name: The desired name of the uploaded file in the bucket.

  Returns:
      None
  """

  import os
  from google.cloud import storage

  # Ensure the file exists locally
  if not os.path.exists(local_file_path):
      raise FileNotFoundError(f"Local file '{local_file_path}' not found.")

  # Create a storage client
  storage_client = storage.Client()

  # Get a reference to the bucket
  bucket = storage_client.bucket(bucket_name)

  # Create a blob object with the desired destination path
  blob = bucket.blob(destination_blob_name)

  # Upload the file from the local filesystem
  content_type = ""
  if local_file_path.endswith(".html"):
    content_type = "text/html; charset=utf-8"

  if local_file_path.endswith(".json"):
    content_type = "application/json; charset=utf-8"

  if content_type == "":
    blob.upload_from_filename(local_file_path)
  else:
    blob.upload_from_filename(local_file_path, content_type = content_type)

  print(f"File '{local_file_path}' uploaded to GCS bucket '{bucket_name}' as '{destination_blob_name}'.  Content-Type: '{content_type}'.")

### <font color='#4285f4'>Create BigQuery Tables</font>

In [None]:
%%bigquery

#DROP TABLE IF EXISTS `chocolate_ai.customer_hyper_personalized_email`;

CREATE TABLE IF NOT EXISTS `chocolate_ai.customer_hyper_personalized_email`
(
    customer_id                                    INTEGER NOT NULL OPTIONS(description="Primary key."),
    email_date                                     DATE    NOT NULL OPTIONS(description="The date of the email."),

    llm_marketing_prompt                           STRING  OPTIONS(description="The prompt to generate the marketing email text."),
    llm_marketing_prompt_response_json             JSON    OPTIONS(description="The response from the llm marketing prompt in json."),
    llm_marketing_prompt_response_text             STRING  OPTIONS(description="The response from the llm marketing prompt in text."),

    llm_orginial_image_prompt                      STRING  OPTIONS(description="The prompt to generate the original image text."),
    llm_orginial_image_prompt_response_json        JSON    OPTIONS(description="The response from the llm original prompt in json."),
    llm_orginial_image_prompt_response_text        STRING  OPTIONS(description="The response from the llm original prompt in text."),

    llm_improved_image_prompt                      STRING  OPTIONS(description="The prompt to generate the improved image text."),
    -- The improved prompt will be passed to Imagen3
    --llm_improved_image_prompt_response_json      JSON    OPTIONS(description="The response from the llm improved prompt in json."),
    --llm_improved_image_prompt_response_text      STRING  OPTIONS(description="The response from the llm improved prompt in text."),

    llm_verify_image_prompt                        STRING  OPTIONS(description="The prompt to verify the generated image."),
    llm_verify_image_response_json                 JSON    OPTIONS(description="The response from verify the generated image in json."),
    llm_verify_image_text                          STRING  OPTIONS(description="The response from verify the generated image in text."),

    llm_translation_language_prompt                STRING  OPTIONS(description="The prompt to generate the secondary language text."),
    llm_translation_language_prompt_response_json  JSON    OPTIONS(description="The response from the llm secondary language prompt in json."),
    llm_translation_language_prompt_response_text  STRING  OPTIONS(description="The response from the llm secondary language prompt in text."),

    llm_validate_translation_prompt                STRING  OPTIONS(description="The prompt to generate the validation of the translation text."),
    llm_validate_translation_prompt_response_json  JSON    OPTIONS(description="The response from the llm validation of the translation prompt in json."),
    llm_validate_translation_prompt_response_text  STRING  OPTIONS(description="The response from the llm validation of the translation prompt in text."),

    image_gcs_filename                             STRING  OPTIONS(description="The GCS path for the marketing campaign image."),
    image_http_url                                 STRING  OPTIONS(description="The HTTP path for the marketing campaign image."),
    image_generated                                BOOLEAN OPTIONS(description="Has the image been generated and saved to GCS."),
    image_verified                                 BOOLEAN OPTIONS(description="Did the image pass verification."),

    html_gcs_filename                              STRING  OPTIONS(description="The GCS path for the marketing campaign HTML file."),
    html_http_url                                  STRING  OPTIONS(description="The HTTP path for the marketing campaign HTML file."),
    html_generated                                 BOOLEAN OPTIONS(description="Has the HTML been generated and saved to GCS."),
    translation_verified                           BOOLEAN OPTIONS(description="Did the translation pass verification."),
)
CLUSTER BY customer_id;

In [None]:
%%bigquery

--DROP TABLE IF EXISTS `chocolate_ai.event`;

CREATE TABLE IF NOT EXISTS `chocolate_ai.event`
(
    event_id          INT     NOT NULL OPTIONS(description="The primary key of the event."),
    download_date     DATE    NOT NULL OPTIONS(description="The date the data was downloaded (we download many event dates at once)"),
    event_title       STRING  NOT NULL OPTIONS(description="The title of the event."),
    event_date        DATE    NOT NULL OPTIONS(description="The date of the event."),
    event_time_string STRING  NOT NULL OPTIONS(description="The time (string value)  of the event."),
    event_venue       STRING  NOT NULL OPTIONS(description="The venue of the event."),
    event_venue_link  STRING  NOT NULL OPTIONS(description="The link to the venue of the event."),
    event_address     STRING  NOT NULL OPTIONS(description="The full address of the event."),
    event_description STRING  NOT NULL OPTIONS(description="The description of the event."),
    event_reviews     INT64   NOT NULL OPTIONS(description="The number of reviews of the event."),
    event_thumbnail   STRING  NOT NULL OPTIONS(description="The thumbnail image for the event."),

    llm_event_prompt             STRING             OPTIONS(description="The LLM prompt to extract the event keywords."),
    llm_event_prompt_json_result JSON               OPTIONS(description="The unparsed json results from the LLM"),

    event_keywords               STRING             OPTIONS(description="The keywords for the event."),
    event_keywords_embeddings    ARRAY<FLOAT64>     OPTIONS(description="Vector embedding of the event keywords column."),
)
CLUSTER BY event_id;

### <font color='#4285f4'>Get the Events (You only need to run this once since they are saved)</font>

You can use a 3rd party service to download the events
- https://serpapi.com/google-events-api
- https://serpapi.com/blog/scrape-google-events-results-with-python/

#### Use serpapi
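
The download cell below splits the first address line on `", "` into a venue and a street, then appends the remaining lines. The sketch isolates that parsing so it can be tested on its own. `split_venue_and_address` is a hypothetical helper and the sample address is made up. Unlike the loop in the cell, it keeps everything after the first comma and returns an empty address instead of raising when there is no comma.

```python
def split_venue_and_address(address_lines):
    """Split a list of address lines into (venue, address)."""
    if not address_lines:
        return "", ""

    # The first line is expected to look like "Venue, street".
    venue, _, street = address_lines[0].partition(", ")
    remaining = [street] + list(address_lines[1:])
    address = " ".join(part for part in remaining if part)
    return venue, address


# Made-up address lines for illustration.
sample_address = ["Example Hall, 1 Rue Exemple", "Paris, France"]
print(split_venue_and_address(sample_address))
```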

In [None]:
# %%bigquery
# OPTIONAL: Remove the current day's events (in case you re-run this notebook). This will force a call to serpapi
# DELETE FROM `chocolate_ai.event` AS event WHERE download_date = CURRENT_DATE();

In [None]:
from serpapi import GoogleSearch
from datetime import date

event_records = []

if haveEventsBeenDownloaded("${project_id}.${bigquery_chocolate_ai_dataset}.event","event_id") == False:
  event_id = GetNextPrimaryKey("${project_id}.${bigquery_chocolate_ai_dataset}.event","event_id")
  print(f"event_id: {event_id}")

  event_date = date.today()

  google_events_params = {
    "engine": "google_events",
    "q": "Events in Paris, France",
    "hl": "en",
    "gl": "fr",  # two-letter country code for France
    "api_key": f"{serpapi_key}",
    'start': 0,
    "htichips" : f"{htichips}"
  }

  while True:
      search = GoogleSearch(google_events_params)
      event_search_results = search.get_dict()
      if 'error' in event_search_results:
          break

      for item in event_search_results["events_results"]:
          try:
              event_venue = ""
              address = ""
              for address_item in item["address"]:
                if event_venue == "":
                  split_text = address_item.split(", ")
                  event_venue = split_text[0]
                  address = split_text[1]
                else:
                  address = address + " " + address_item

              print(f"item: {item}")
              print(f"event_venue: {event_venue}")
              print(f"address: {address}")
              print("------------------------------------------------------")

              event = {
                  "event_id" : event_id,
                  "event_title" : item['title'],
                  "event_date" : event_date, # Placeholder
                  "event_time_string" : item['date']['when'],
                  "event_venue" : event_venue,
                  "event_venue_link" : item['venue']['link'],
                  "event_address" : address,
                  "event_description" : item['description'],
                  "event_reviews" : item['venue']['reviews'],
                  "event_thumbnail" : item['thumbnail']
              }
              event_id = event_id + 1

              event_records.append(event)

          except Exception as error:
              print(f"error: {error}")
              print(f"item: {item}")
              print(f"------------------------------")

      google_events_params['start'] += 10

      if google_events_params['start'] > 30:
          break # stop after the pages at start = 0, 10, 20 and 30 so our prompt is not too large

#### Convert the Event Date to a proper date
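
Because the date comes back from an LLM, it may be worth validating it before storing it. The sketch below parses a JSON reply of the form `{"date": "YYYY-MM-DD"}` and falls back to a caller-supplied date when the reply is not usable. `parse_llm_date` and the fallback policy are assumptions for illustration, not what the cell below does.

```python
import json
from datetime import date, datetime


def parse_llm_date(llm_response, fallback):
    """Parse {"date": "YYYY-MM-DD"} from an LLM reply; return fallback if it is malformed."""
    try:
        value = json.loads(llm_response)["date"]
        return datetime.strptime(value, "%Y-%m-%d").date()
    except (ValueError, KeyError, TypeError):
        # ValueError covers invalid JSON and dates that do not match the format.
        return fallback


fallback_date = date(2024, 1, 1)  # arbitrary fallback for the example
print(parse_llm_date('{"date": "2024-10-05"}', fallback_date))
print(parse_llm_date('{"date": "next Friday"}', fallback_date))
```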

In [None]:
# if we downloaded events then save them (otherwise skip)
from datetime import date

if len(event_records) > 0 :
  from datetime import datetime
  # Write me the json in  OpenAPI 3.0 schema object for the below object.
  # Make all fields required.
  #  {
  #    "date" : "YYYY-MM-DD"
  #  }
  response_schema = {
    "type": "object",
    "required": ["date"],
    "properties": {
      "date": {
        "type": "string"
      }
    }
  }

  for item in event_records:
    if "download_date" not in item:
      item["download_date"] = date.today()

    # fix the event date (we need Gemini to turn the free-text time into a proper date)
    llm_date_prompt = f"""Parse the following date into the following format: YYYY-MM-DD.
    If the date spans several days then use the first day of the date.
    The date to format is: {item['event_time_string']}"""

    print (llm_date_prompt)

    llm_date_response = GeminiLLM(llm_date_prompt, response_schema=response_schema)
    llm_date_dict = json.loads(llm_date_response)

    item["event_date"] = datetime.strptime(llm_date_dict['date'], "%Y-%m-%d").date()
    print(item["event_date"])

#### Save the Events (bulk insert into BigQuery)
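
Every column in the load schema below is `REQUIRED`, so a record with a missing value will fail the whole load job. A quick pre-check such as the following sketch can point at the offending records first. `find_incomplete_records` is a hypothetical helper and the sample records are made up.

```python
REQUIRED_EVENT_FIELDS = [
    "event_id", "download_date", "event_title", "event_date",
    "event_time_string", "event_venue", "event_venue_link",
    "event_address", "event_description", "event_reviews", "event_thumbnail",
]


def find_incomplete_records(records, required_fields=REQUIRED_EVENT_FIELDS):
    """Map record index -> list of required fields that are missing or None."""
    problems = {}
    for index, record in enumerate(records):
        missing = [name for name in required_fields if record.get(name) is None]
        if missing:
            problems[index] = missing
    return problems


# Made-up records: the first is complete, the second is missing most fields.
sample_records = [
    {name: "x" for name in REQUIRED_EVENT_FIELDS},
    {"event_id": 2, "event_title": "Hypothetical concert"},
]
print(find_incomplete_records(sample_records))
```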

In [None]:
# if we downloaded events then save them (otherwise skip)

if len(event_records) > 0 :
  import pandas as pd
  from google.cloud import bigquery

  # Load the events table (in bulk)
  table_id = "${project_id}.${bigquery_chocolate_ai_dataset}.event"

  dataframe = pd.DataFrame(
      pd.DataFrame(event_records), # Your source data
      columns=[
          "event_id",
          "download_date",
          "event_title",
          "event_date",
          "event_time_string",
          "event_venue",
          "event_venue_link",
          "event_address",
          "event_description",
          "event_reviews",
          "event_thumbnail"
      ],
  )

  job_config = bigquery.LoadJobConfig(
      schema=[
          bigquery.SchemaField("event_id", bigquery.enums.SqlTypeNames.INT64, mode="REQUIRED"),
          bigquery.SchemaField("download_date", bigquery.enums.SqlTypeNames.DATE, mode="REQUIRED"),
          bigquery.SchemaField("event_title", bigquery.enums.SqlTypeNames.STRING, mode="REQUIRED"),
          bigquery.SchemaField("event_date", bigquery.enums.SqlTypeNames.DATE, mode="REQUIRED"),
          bigquery.SchemaField("event_time_string", bigquery.enums.SqlTypeNames.STRING, mode="REQUIRED"),
          bigquery.SchemaField("event_venue", bigquery.enums.SqlTypeNames.STRING, mode="REQUIRED"),
          bigquery.SchemaField("event_venue_link", bigquery.enums.SqlTypeNames.STRING, mode="REQUIRED"),
          bigquery.SchemaField("event_address", bigquery.enums.SqlTypeNames.STRING, mode="REQUIRED"),
          bigquery.SchemaField("event_description", bigquery.enums.SqlTypeNames.STRING, mode="REQUIRED"),
          bigquery.SchemaField("event_reviews", bigquery.enums.SqlTypeNames.INT64, mode="REQUIRED"),
          bigquery.SchemaField("event_thumbnail", bigquery.enums.SqlTypeNames.STRING, mode="REQUIRED")
      ],
      write_disposition="WRITE_APPEND",
  )

  load_client = bigquery.Client()
  job = load_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
  job.result()  # Wait for the job to complete.

  table = load_client.get_table(table_id)  # Make an API request.
  print("Loaded {} rows and {} columns to {}".format(table.num_rows, len(table.schema), table_id))

In [None]:
%%bigquery

# Display the results
SELECT event_id, download_date, event_title, event_date, event_time_string, event_venue, event_venue_link, event_address, event_description
  FROM `chocolate_ai.event`
WHERE download_date = CURRENT_DATE()
ORDER BY event_id
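
Every column in the load job's schema is `mode="REQUIRED"`, so a single null value should cause the load to fail. As a rough pre-flight check, the sketch below scans the records before they reach BigQuery. It assumes `event_records` is a list of dictionaries keyed by column name (the notebook does not show its shape here), and `find_incomplete_records` is a hypothetical helper, not part of the notebook.

```python
# Column names copied from the load job's schema; every one is mode="REQUIRED".
REQUIRED_FIELDS = [
    "event_id", "download_date", "event_title", "event_date",
    "event_time_string", "event_venue", "event_venue_link",
    "event_address", "event_description", "event_reviews", "event_thumbnail",
]

def find_incomplete_records(records, required_fields=REQUIRED_FIELDS):
    """Return (index, missing_field_names) for records lacking a required value."""
    problems = []
    for index, record in enumerate(records):
        # Assumption: each record is a dict; a missing key or None counts as missing.
        missing = [name for name in required_fields if record.get(name) is None]
        if missing:
            problems.append((index, missing))
    return problems

# Made-up sample: the second record has no venue link or thumbnail.
complete = {name: "x" for name in REQUIRED_FIELDS}
partial = {
    name: "x"
    for name in REQUIRED_FIELDS
    if name not in ("event_venue_link", "event_thumbnail")
}
print(find_incomplete_records([complete, partial]))
```

Records flagged this way could be dropped or given default values before building the DataFrame.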

### <font color='#4285f4'>Use Gemini to extract the Event Keywords and create Vector Embeddings</font>

In [None]:
%%bigquery

-- Create our prompt for Gemini and store it in a column so we can then batch score
UPDATE `chocolate_ai.event`
   SET llm_event_prompt = CONCAT(
      'Extract up to 7 keywords from this event that would pertain to the hobbies or interests of people.\n',
      'The keywords should be in English.\n',
      'A semantic search will be performed with these words.\n',
      'Do not include "Paris" or "France" as a keyword since the search will be done only on Paris data.\n',
      'Do not include dates in the results.\n',
      'Do not include postal codes in the results.\n',
      'Return the results in JSON with no special formatting, preceding text or markdown text.  It must only be JSON.\n',
      'Return the keywords in the following JSON format:\n',
      '- Example 1: { "keywords": ["text","text"] }\n',
      '- Example 2: { "keywords": ["word","keyword"] }\n',
      '<event_data>\n',
      TO_JSON_STRING(TO_JSON(STRUCT(event_title, event_venue, event_description))),
      '\n',
      '</event_data>')
 WHERE download_date = CURRENT_DATE();

In [None]:
%%bigquery

-- Call Gemini Pro on all of our prompts (batch score)
UPDATE `chocolate_ai.event` AS event_parent
   SET llm_event_prompt_json_result = llm_query.ml_generate_text_result
  FROM (SELECT *
          FROM ML.GENERATE_TEXT(MODEL`${project_id}.${bigquery_chocolate_ai_dataset}.gemini_model`,
               (SELECT event_id,
                       llm_event_prompt AS prompt
                  FROM `${project_id}.${bigquery_chocolate_ai_dataset}.event`
                 WHERE download_date = CURRENT_DATE()),
                STRUCT(.5 AS temperature, .8 AS top_p)
                )
        ) AS llm_query
 WHERE event_parent.event_id = llm_query.event_id;

In [None]:
%%bigquery

-- Gemini returns a JSON object holding an array of keywords. Parse it, convert the JSON array to an array of strings, then join the strings with a space as the separator
SELECT ARRAY_TO_STRING(JSON_EXTRACT_STRING_ARRAY(JSON_EXTRACT(
       PARSE_JSON('{ "keywords": ["construction", "architecture", "building", "materials", "design", "technology", "innovation"] }'),
        '$.keywords')), " ") AS keywords_string;

In [None]:
%%bigquery

-- Extract the keywords from the returned JSON
UPDATE `chocolate_ai.event`
   SET event_keywords = ARRAY_TO_STRING(JSON_EXTRACT_STRING_ARRAY(JSON_EXTRACT(
                            `${project_id}.${bigquery_chocolate_ai_dataset}.gemini_model_result_as_json`(llm_event_prompt_json_result), '$.keywords')), " ")
 WHERE download_date = CURRENT_DATE();

In [None]:
%%bigquery

-- Create vector embeddings on all the keywords
UPDATE `chocolate_ai.event` AS event_parent
   SET event_keywords_embeddings = llm_query.text_embedding
  FROM (SELECT *
          FROM ML.GENERATE_TEXT_EMBEDDING(MODEL`${project_id}.${bigquery_chocolate_ai_dataset}.textembedding_model`,
               (SELECT event_id,
                       event_keywords AS content
                  FROM `${project_id}.${bigquery_chocolate_ai_dataset}.event`
                 WHERE download_date = CURRENT_DATE()
                   --AND ARRAY_LENGTH(event_keywords_embeddings) = 0 -- Always generate
               ),
                STRUCT(TRUE AS flatten_json_output,
                       'SEMANTIC_SIMILARITY' as task_type,
                        768 AS output_dimensionality)
               )
        ) AS llm_query
 WHERE event_parent.event_id = llm_query.event_id;

In [None]:
%%bigquery

SELECT event_id,
       event_date,
       event_title,
       event_description,
       event_keywords
  FROM `chocolate_ai.event`
WHERE download_date = CURRENT_DATE()
ORDER BY event_id;
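
For readers more at home in Python than SQL, the keyword parsing done above with `JSON_EXTRACT_STRING_ARRAY` and `ARRAY_TO_STRING` can be sketched as follows. `keywords_to_string` is a hypothetical helper name, and the markdown-fence handling is only an assumption about how a model might wrap its reply; the notebook itself relies on its own `gemini_model_result_as_json` function, which is not reproduced here.

```python
import json

FENCE = "`" * 3  # a markdown code fence, built this way to keep the example readable

def keywords_to_string(raw_response: str, separator: str = " ") -> str:
    """Turn '{"keywords": [...]}' into one separator-joined string."""
    text = raw_response.strip()
    # Assumption: the model may wrap the JSON in a markdown fence despite the prompt.
    if text.startswith(FENCE):
        text = text.strip("`").strip()
        if text.lower().startswith("json"):
            text = text[4:]
    payload = json.loads(text)
    keywords = payload.get("keywords", [])
    return separator.join(str(keyword) for keyword in keywords)

example = '{ "keywords": ["construction", "architecture", "design"] }'
print(keywords_to_string(example))
```

Joining with spaces gives the embedding model one short text per event, which is what the embedding cell above consumes as `content`.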

### <font color='#4285f4'>Ask Gemini for the best event this week to target</font>


In [None]:
%%bigquery event_df

SELECT TO_JSON_STRING(TO_JSON(STRUCT(event_id,
       event_date,
       event_title,
       event_description,
       event_keywords))) AS json_string
  FROM `chocolate_ai.event`
WHERE download_date = CURRENT_DATE()
ORDER BY event_id;

In [None]:
event_list_as_string = ""

for index, row in event_df.iterrows():
  event_list_as_string += row["json_string"] + "\n"

print(event_list_as_string)

In [None]:
#Write me the json in  OpenAPI 3.0 schema object for the below object.
#Make all fields required.
#  {
#    "event_array" : [
#      {
#        "event_id" : 2,
#        "event_title" : "",
#        "event_keywords" : "",
#        "explanation" : ""
#      }
#    ]
#  }
response_schema = {
  "type": "object",
  "required": ["event_array"],
  "properties": {
    "event_array": {
      "type": "array",
      "items": {
        "type": "object",
        "required": ["event_id", "event_title", "event_keywords", "explanation" ],
        "properties": {
          "event_id": {
            "type": "integer"
          },
          "event_title": {
            "type": "string"
          },
          "event_keywords": {
            "type": "string"
          },
          "explanation": {
            "type": "string"
          }
        }
      }
    }
  }
}


best_event_prompt = f"""You work in the marketing department at Chocolate AI, a chocolate, dessert and coffee shop located in Paris, France.
You are reviewing this week's list of events that are happening in Paris.
Select the top 3 events: the most public events that will draw the largest number of people.
Explain your reasoning.

Here are the events:
{event_list_as_string}
"""


best_event_response = GeminiLLM(best_event_prompt, response_schema=response_schema)
best_event_dict = json.loads(best_event_response)

best_event_1 = best_event_dict["event_array"][0]["event_id"]
best_event_2 = best_event_dict["event_array"][1]["event_id"]
best_event_3 = best_event_dict["event_array"][2]["event_id"]

In [None]:
print(PrettyPrintJson(json.dumps(best_event_dict)))
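
The cell above reads positions `[0]`, `[1]` and `[2]` of `event_array` directly, which raises an `IndexError` if Gemini returns fewer than three events (for example, on a quiet week). A more tolerant variant might look like the sketch below; `pick_event_ids` is a hypothetical helper and the sample reply is made up.

```python
import json

def pick_event_ids(response_text: str, max_events: int = 3) -> list:
    """Return up to max_events unique integer event IDs from the model reply."""
    payload = json.loads(response_text)
    ids = []
    for entry in payload.get("event_array", []):
        event_id = entry.get("event_id")
        # bool is a subclass of int in Python, so exclude it explicitly.
        is_int = isinstance(event_id, int) and not isinstance(event_id, bool)
        if is_int and event_id not in ids:
            ids.append(event_id)
        if len(ids) == max_events:
            break
    return ids

# Made-up reply containing only two events.
reply = '{"event_array": [{"event_id": 4}, {"event_id": 9}]}'
event_ids = pick_event_ids(reply)
in_clause = ", ".join(str(event_id) for event_id in event_ids)
print(in_clause)
```

The resulting `in_clause` string could stand in for the three separate placeholders in the `event_id IN (...)` filter, provided the list is not empty.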

### <font color='#4285f4'>Do a Vector Search to find customers who are likely to attend the event</font>


In [None]:
# You can adjust the LIMIT clause for the number of items to generate

sql = f"""WITH semantic_search AS
(
   -- Find customers that match the top 3 events (the keywords we extracted)
   SELECT query.event_id AS event_id,
         query.event_title AS event_title,
         query.event_description AS event_description,
         query.event_keywords AS event_keywords,
         base.customer_id AS customer_id,
         base.customer_marketing_insights AS customer_marketing_insights,
         distance
   FROM VECTOR_SEARCH( ( -- table to search
                        SELECT customer_id, customer_marketing_insights, customer_marketing_insights_embedding
                           FROM `chocolate_ai.customer_marketing_profile`
                           WHERE ARRAY_LENGTH(customer_marketing_insights_embedding) = 768
                        ),
                        'customer_marketing_insights_embedding', -- the column name that contains our embedding (from query above)
                        ( -- source of embeddings
                        SELECT event_id, event_title, event_description, event_keywords, event_keywords_embeddings
                           FROM `chocolate_ai.event`
                           WHERE ARRAY_LENGTH(event_keywords_embeddings) = 768
                           AND download_date = CURRENT_DATE()
                           AND event_id IN ({best_event_1}, {best_event_2}, {best_event_3})
                        ),
                        'event_keywords_embeddings', -- the column name of our newly embedded data (from query above)
                        top_k => 10
                        )
)
-- Get each customer's top 3 menu items
SELECT semantic_search.*,
      customer.customer_name,
      customer.country_code,
      CONCAT(menu_1.menu_name,':',menu_1.menu_description) AS top_1_favorite_menu_item_name,
      CONCAT(menu_2.menu_name,':',menu_2.menu_description) AS top_2_favorite_menu_item_name,
      CONCAT(menu_3.menu_name,':',menu_3.menu_description) AS top_3_favorite_menu_item_name,
FROM semantic_search
      INNER JOIN `chocolate_ai.customer_marketing_profile` AS customer_marketing_profile
              ON semantic_search.customer_id = customer_marketing_profile.customer_id
      INNER JOIN `chocolate_ai.customer` AS customer
              ON semantic_search.customer_id = customer.customer_id
             AND customer.country_code = 'FR' -- You can target other countries (assuming you know the customer is traveling)
      INNER JOIN `chocolate_ai.menu` AS menu_1
              ON CAST(JSON_VALUE(JSON_EXTRACT_ARRAY(customer_marketing_profile.customer_loyalty_data.top_3_favorite_menu_items,'$')[0]) AS INT64) = menu_1.menu_id
      INNER JOIN `chocolate_ai.menu` AS menu_2
              ON CAST(JSON_VALUE(JSON_EXTRACT_ARRAY(customer_marketing_profile.customer_loyalty_data.top_3_favorite_menu_items,'$')[1]) AS INT64) = menu_2.menu_id
      INNER JOIN `chocolate_ai.menu` AS menu_3
              ON CAST(JSON_VALUE(JSON_EXTRACT_ARRAY(customer_marketing_profile.customer_loyalty_data.top_3_favorite_menu_items,'$')[2]) AS INT64) = menu_3.menu_id
ORDER BY distance -- RAND() (use random to show different results on different runs, "for testing")
LIMIT 10 -- You can change this
"""

top_3_events = RunQuery(sql)

customer_list = []
for row in top_3_events.itertuples():
  customer_dict = {
    "event_id" : row.event_id,
    "event_title" : row.event_title,
    "event_description" : row.event_description,
    "event_keywords" : row.event_keywords,
    "customer_id" : row.customer_id,
    "customer_name" : row.customer_name,
    "country_code" : row.country_code,
    "customer_marketing_insights" : row.customer_marketing_insights,
    "distance" :row.distance,
    "top_1_favorite_menu_item_name" : row.top_1_favorite_menu_item_name,
    "top_2_favorite_menu_item_name" : row.top_2_favorite_menu_item_name,
    "top_3_favorite_menu_item_name" : row.top_3_favorite_menu_item_name
  }
  print(f"Customer: {customer_dict}")
  customer_list.append(customer_dict)

# Just the Ids so we can query
customer_id_list = ([customer['customer_id'] for customer in customer_list])
customer_id_list_str = (', '.join(map(str, customer_id_list)))
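
To make the ranking step less of a black box, here is a brute-force sketch of what a nearest-neighbor search does conceptually. It uses Euclidean distance, which, as far as we understand the BigQuery documentation, is what `VECTOR_SEARCH` applies when no `distance_type` is given. The three-dimensional vectors and customer IDs are invented for illustration (the real embeddings have 768 dimensions), and `top_k_matches` is a hypothetical name.

```python
import math

def euclidean_distance(a, b):
    """Straight-line distance between two equal-length vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def top_k_matches(query_vec, candidates, k=10):
    """candidates maps customer_id -> embedding; returns [(customer_id, distance)], nearest first."""
    scored = [
        (customer_id, euclidean_distance(query_vec, embedding))
        for customer_id, embedding in candidates.items()
    ]
    scored.sort(key=lambda pair: pair[1])
    return scored[:k]

# Invented toy data: one event embedding and three customer embeddings.
customers = {101: [0.9, 0.1, 0.0], 102: [0.0, 1.0, 0.0], 103: [0.8, 0.2, 0.1]}
event_embedding = [1.0, 0.0, 0.0]
for customer_id, distance in top_k_matches(event_embedding, customers, k=2):
    print(customer_id, round(distance, 3))
```

In the query above, `top_k => 10` applies to each of the selected events separately, so the search can return more than 10 rows before the joins and the final `LIMIT` trim the list.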

### <font color='#4285f4'>Generate and run the LLM Marketing Prompt</font>

In [None]:
# Remove today's rows for these customers in case you re-run the notebook

sql = f"""DELETE
            FROM `chocolate_ai.customer_hyper_personalized_email`
           WHERE customer_id IN ({customer_id_list_str})
             AND email_date = CURRENT_DATE();"""

RunQuery(sql)

In [None]:
# For each customer, generate the marketing prompt and run against Gemini

# Write me the json in  OpenAPI 3.0 schema object for the below object.
# Make all fields required.
#  {
#    "customer_id" : 0,
#    "email_subject" : "text",
#    "marketing_text" : "text",
#    "explanation" : "text"
#  }
response_schema = {
  "type": "object",
  "required": [
    "customer_id",
    "email_subject",
    "marketing_text",
    "explanation"
  ],
  "properties": {
    "customer_id": {
      "type": "integer"
    },
    "email_subject": {
      "type": "string"
    },
    "marketing_text": {
      "type": "string"
    },
    "explanation": {
      "type": "string"
    }
  }
}

for item in customer_list:
  print(f"Customer id: {item['customer_id']}")
  retry = 0
  success = False
  while not success:
    try:
      prompt = f"""You are a marketing expert at Chocolate A.I., a Paris-based chocolate, dessert, and coffee shop. Your task is to craft a highly personalized email for customer {item['customer_name']}.

        Leverage the following customer data:

        Top Favorite Menu Items: {item['top_1_favorite_menu_item_name']}, {item['top_2_favorite_menu_item_name']}, {item['top_3_favorite_menu_item_name']}

        Customer Marketing Insights: {item['customer_marketing_insights']}

        The customer lives in: {item['country_code']}

        Context: A relevant event is happening this week in Paris. We believe this customer would be interested based on their profile.

        Goal: Create a persuasive email that encourages the customer to visit Chocolate A.I. and highlights their favorite menu items, subtly connecting them to the ongoing event.

        Format: Please provide a complete, ready-to-send email, tailored to the specific customer and event details. Avoid using placeholders or templates.

        Do not tell the customer this, but here are the event details:
          event_id: {item['event_id']}
          event_title: {item['event_title']}
          event_description: {item['event_description']}
          event_keywords: {item['event_keywords']}

        Explanation: Briefly explain your thought process and how you've customized the email to resonate with the customer's interests and the event's theme.
      """

      print(prompt)
      llm_result = GeminiLLM(prompt,response_schema=response_schema)
      print(llm_result)
      json_result = json.loads(llm_result)
      result_escaped_quotes = llm_result.replace("\\","\\\\").replace("'","\'")
      #json_text = json_result['marketing_text'].replace("'","\'")

      # Save to database
      try:

        sql=f"""INSERT INTO `chocolate_ai.customer_hyper_personalized_email`
                            (customer_id, email_date, llm_marketing_prompt, llm_marketing_prompt_response_json, llm_marketing_prompt_response_text)
                    VALUES ({item['customer_id']}, CURRENT_DATE(), \"\"\"{prompt}\"\"\", JSON\"\"\"{result_escaped_quotes}\"\"\",\"\"\"{json_result['marketing_text']}\"\"\")"""
        # print(sql)
        RunQuery(sql)

        # Jump out of loop
        item['marketing_text'] = json_result['marketing_text']
        item['email_subject'] = json_result['email_subject']
        success = True

        print("---------------------------------------------------------------------------------------")
        print(f"LLM Marketing Prompt [Success] for Customer {item['customer_id']}")
        print("---------------------------------------------------------------------------------------")

      except Exception as e:
        retry += 1
        print("---------------------------------------------------------------------------------------")
        print(f"LLM Marketing Prompt [SQL Error] for Customer {item['customer_id']}: {sql}")
        print("---------------------------------------------------------------------------------------")

    except Exception as e:
      retry += 1
      print("---------------------------------------------------------------------------------------")
      print(f"LLM Marketing Prompt [Error] for Customer {item['customer_id']}: {e}")
      print("---------------------------------------------------------------------------------------")

    if retry > 5:
      print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
      print(f"LLM Marketing Prompt [Retry Limit Reached - Skipping] for Customer {item['customer_id']}")
      print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
      break # Skip this customer
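
The retry bookkeeping in the loop above (a counter, a success flag and a limit check) could be factored into a small helper. This is only a sketch under the assumption that each attempt can be wrapped in a zero-argument callable; `run_with_retries` is a hypothetical name, and the flaky function below simulates a call that fails twice before succeeding.

```python
import time

def run_with_retries(action, max_attempts=5, delay_seconds=0.0):
    """Call action() until it succeeds or max_attempts is reached.

    Returns (True, result) on success and (False, last_exception) otherwise.
    """
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return True, action()
        except Exception as error:  # broad on purpose, mirroring the notebook
            last_error = error
            print(f"Attempt {attempt} failed: {error}")
            if delay_seconds:
                time.sleep(delay_seconds)
    return False, last_error

# Simulated flaky call: fails on the first two attempts.
calls = {"count": 0}

def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ValueError("temporary failure")
    return "ok"

succeeded, value = run_with_retries(flaky)
print(succeeded, value)
```

With a helper like this, the body of the loop would shrink to building the prompt, calling Gemini and saving the row inside one function per customer.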

In [None]:
# View Results
sql=f"""SELECT customer_id, llm_marketing_prompt, llm_marketing_prompt_response_json, llm_marketing_prompt_response_text
          FROM `chocolate_ai.customer_hyper_personalized_email`
         WHERE customer_id IN ({customer_id_list_str})
           AND email_date = CURRENT_DATE()"""

print(sql)
df_process = RunQuery(sql)
df_process
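
The INSERT above places the prompt and the model output between triple double quotes. Backslashes are doubled in the JSON string, but the prompt and `marketing_text` go in unescaped, so a backslash or a run of double quotes in the generated text can break the statement and trigger a retry. One possible approach is sketched below, assuming GoogleSQL's backslash escape sequences; `escape_for_triple_quoted_sql` is a hypothetical helper. Query parameters passed through the BigQuery client would avoid manual escaping altogether, but that depends on how the notebook's `RunQuery` helper is written, which is not shown here.

```python
def escape_for_triple_quoted_sql(text: str) -> str:
    """Escape text for use inside a triple-double-quoted GoogleSQL string literal."""
    escaped = text.replace("\\", "\\\\")   # backslashes first, so later escapes survive
    escaped = escaped.replace('"', '\\"')  # then every double quote
    return escaped

# Made-up marketing text containing quotes and a backslash.
marketing_text = 'Try our "Midnight Truffle" today \\ tomorrow'
sql_literal = '"""' + escape_for_triple_quoted_sql(marketing_text) + '"""'
print(sql_literal)
```

Newlines need no treatment because triple-quoted literals may span lines.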

### <font color='#4285f4'>Create an image prompt and enhance it by running it through Gemini</font>

In [None]:
# For each customer, generate an improved image prompt using Gemini

# Write me the json in  OpenAPI 3.0 schema object for the below object.
# Make all fields required.
#  {
#    "customer_id" : 0,
#    "image_prompt" : "text",
#    "explanation" : "text"
#  }
response_schema = {
  "type": "object",
  "required": [
    "customer_id",
    "image_prompt",
    "explanation"
  ],
  "properties": {
    "customer_id": {
      "type": "integer"
    },
    "image_prompt": {
      "type": "string"
    },
    "explanation": {
      "type": "string"
    }
  }
}

for item in customer_list:
  print(f"Customer id: {item['customer_id']}")
  retry = 0
  success = False
  while not success:
    try:
      prompt = f"""You are a marketing expert at Chocolate A.I., a Paris-based chocolate, dessert, and coffee shop. Your task is to craft a highly personalized image for customer {item['customer_name']}.
      You need to send out a hyper-personalized email to a customer.
      Generate an LLM prompt to generate a marketing image based upon the customer's profile and the marketing message we are sending in the email.
      The image needs to include one of the below top 3 menu items.
      We want the image to be specific to this customer and their interests.
      Think creatively and use the customer's interests to create a unique image.
      It is important to show something about their interests, hobbies, sports, etc.
      Do not include any names of professional sports teams since they are copyrighted.
      Make sure you state the image should be photo realistic.
      Avoid any copyrighted names such as sports team names.
      Avoid mentioning any celebrity names or specific people.
      Do not include references to kids or children in the image prompt.
      Only adults can be rendered by the image process.
      Think this through step by step.
      Double check for kids, children, or copyrighted sports team names.

      Customer's profile:
        customer_marketing_insights: {item['customer_marketing_insights']}
        top_1_favorite_menu_item_name: {item['top_1_favorite_menu_item_name']}
        top_2_favorite_menu_item_name: {item['top_2_favorite_menu_item_name']}
        top_3_favorite_menu_item_name: {item['top_3_favorite_menu_item_name']}

      Marketing Message:
       {item['marketing_text']}

      Do not tell the customer this, but there is an event in Paris that matches their hobbies so you should focus on an image that is specific to that event.
        event_id: {item['event_id']}
        event_title: {item['event_title']}
        event_description: {item['event_description']}
        event_keywords: {item['event_keywords']}
        """

      print(prompt)
      llm_result = GeminiLLM(prompt,response_schema=response_schema)
      print(llm_result)
      json_result = json.loads(llm_result)
      result_escaped_quotes = llm_result.replace("\\","\\\\").replace("'","\'")

      # Save to database
      try:
        sql=f"""UPDATE `chocolate_ai.customer_hyper_personalized_email`
                   SET llm_orginial_image_prompt = \"\"\"{prompt}\"\"\",
                       llm_orginial_image_prompt_response_json = JSON\"\"\"{result_escaped_quotes}\"\"\",
                       llm_orginial_image_prompt_response_text = \"\"\"{json_result['image_prompt']}\"\"\",
                       llm_improved_image_prompt = \"\"\"{json_result['image_prompt']}\"\"\"
                 WHERE customer_id = {item['customer_id']}"""

        #print(sql)
        RunQuery(sql)

        # Jump out of loop
        item['image_prompt'] = json_result['image_prompt']
        item['image_explanation'] = json_result['explanation']
        success = True

        print("---------------------------------------------------------------------------------------")
        print(f"LLM Image Prompt [Success] for Customer {item['customer_id']}")
        print("---------------------------------------------------------------------------------------")

      except Exception as e:
        retry += 1
        print("---------------------------------------------------------------------------------------")
        print(f"LLM Image Prompt [SQL Error] for Customer {item['customer_id']}: {sql}")
        print("---------------------------------------------------------------------------------------")

    except Exception as e:
      retry += 1
      print("---------------------------------------------------------------------------------------")
      print(f"LLM Image Prompt [Error] for Customer {item['customer_id']}: {e}")
      print("---------------------------------------------------------------------------------------")

    if retry > 5:
      print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
      print(f"LLM Image Prompt [Retry Limit Reached - Skipping] for Customer {item['customer_id']}")
      print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
      break # Skip this customer

In [None]:
# View Results
sql=f"""SELECT customer_id, llm_orginial_image_prompt, llm_orginial_image_prompt_response_json, llm_improved_image_prompt
          FROM `chocolate_ai.customer_hyper_personalized_email`
         WHERE customer_id IN ({customer_id_list_str})
           AND email_date = CURRENT_DATE()"""

print(sql)
df_process = RunQuery(sql)
df_process
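
The prompt above asks Gemini to double check its own output for references to kids or children. A cheap local check before calling Imagen3 can act as a second line of defense. The sketch below is an assumption about how such a check might look; the term list and `find_banned_terms` are hypothetical and would need to be extended (for example with team names) to match your own content policy.

```python
import re

# Hypothetical term list; extend it to match your own content policy.
BANNED_TERMS = ["kid", "kids", "child", "children"]

def find_banned_terms(image_prompt: str, banned_terms=BANNED_TERMS) -> list:
    """Return the banned terms that appear as whole words (case-insensitive)."""
    found = []
    for term in banned_terms:
        pattern = rf"\b{re.escape(term)}\b"
        if re.search(pattern, image_prompt, flags=re.IGNORECASE):
            found.append(term)
    return found

# Made-up image prompts.
print(find_banned_terms("A photorealistic cafe scene with children eating eclairs"))
print(find_banned_terms("A photorealistic espresso beside a chess board"))
```

A non-empty result could be treated like any other failure in the loop above, so the prompt is regenerated on the next attempt.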

### <font color='#4285f4'>Call Imagen3 with the updated prompt</font>

In [None]:
# For each customer, generate the image using Imagen3
for item in customer_list:
  print(f"Customer id: {item['customer_id']}")
  try:
    image_prompt = item['image_prompt']
    print(f"Image Prompt: {image_prompt}")
    print(f"Image Prompt Explanation: {item['image_explanation']}")
    filename = ImageGen(item['image_prompt'])

    img = Image.open(filename)
    img.thumbnail([500,500]) # width, height
    IPython.display.display(img)

    # Save image to GCS
    dest_filename = f"email_campaign_{item['customer_id']}.png"
    copy_file_to_gcs(filename, storage_account, f"chocolate-ai/Campaign-Assets-Hyper-Personalized-Email/email-{formatted_date}/{dest_filename}")
    item['gcs_filename'] = f"gs://{storage_account}/chocolate-ai/Campaign-Assets-Hyper-Personalized-Email/email-{formatted_date}/{dest_filename}"
    item['html_filename'] = f"https://storage.cloud.google.com/{storage_account}/chocolate-ai/Campaign-Assets-Hyper-Personalized-Email/email-{formatted_date}/{dest_filename}"

    # Update table in BigQuery
    try:
      sql=f"""UPDATE `chocolate_ai.customer_hyper_personalized_email`
                  SET image_gcs_filename = '{item['gcs_filename']}',
                      image_http_url = '{item['html_filename']}',
                      image_generated = TRUE
                WHERE customer_id = {item['customer_id']}"""

      #print(sql)
      RunQuery(sql)
      item['image_filename'] = filename

      print("---------------------------------------------------------------------------------------")
      print(f"Imagen3 [Success] for Customer {item['customer_id']}")
      print("---------------------------------------------------------------------------------------")

    except Exception as e:
      # This cell has no retry loop, so report the SQL failure and continue
      print("---------------------------------------------------------------------------------------")
      print(f"Imagen3 [SQL Error] for Customer {item['customer_id']}: {e}\n{sql}")
      print("---------------------------------------------------------------------------------------")

  except Exception as e:
    print("---------------------------------------------------------------------------------------")
    print(f"Imagen3 [Error] for Customer {item['customer_id']}: {e}")
    print("---------------------------------------------------------------------------------------")


In [None]:
# To view the bucket
print(f"https://console.cloud.google.com/storage/browser/{storage_account}/chocolate-ai/Campaign-Assets-Hyper-Personalized-Email/email-{formatted_date}")

In [None]:
# View Results
sql=f"""SELECT customer_id, image_gcs_filename, image_http_url, image_generated
          FROM `chocolate_ai.customer_hyper_personalized_email`
         WHERE customer_id IN ({customer_id_list_str})
           AND email_date = CURRENT_DATE()"""

print(sql)
df_process = RunQuery(sql)
df_process
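
The image loop above builds the same object path three times: once for the upload, once for the `gs://` URI and once for the browser URL. A small helper keeps them in sync. This is a sketch only; `campaign_asset_paths` is a hypothetical name, and the bucket and date values in the example are invented because the notebook defines `storage_account` and `formatted_date` elsewhere.

```python
def campaign_asset_paths(bucket: str, formatted_date: str, customer_id: int) -> dict:
    """Build the object name, gs:// URI and browser URL for one customer's image."""
    object_name = (
        "chocolate-ai/Campaign-Assets-Hyper-Personalized-Email/"
        f"email-{formatted_date}/email_campaign_{customer_id}.png"
    )
    return {
        "object_name": object_name,
        "gcs_filename": f"gs://{bucket}/{object_name}",
        "html_filename": f"https://storage.cloud.google.com/{bucket}/{object_name}",
    }

# Invented bucket name and date string.
paths = campaign_asset_paths("my-bucket", "2024-10-01", 42)
for key, value in paths.items():
    print(f"{key}: {value}")
```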

### <font color='#4285f4'>Verify the Generated Image with Gemini Vision</font>

In [None]:
# Verify the generated image is correct

# Write me the json in  OpenAPI 3.0 schema object for the below object.
# Make all fields required.
#  {
#    "image_verified" : true,
#    "explanation" : "text"
#  }
response_schema = {
  "type": "object",
  "required": [
    "image_verified",
    "explanation"
  ],
  "properties": {
    "image_verified": {
      "type": "boolean"
    },
    "explanation": {
      "type": "string"
    }
  }
}

json_schema = '{ "image_verified" : true, "explanation" : "text" }'

for item in customer_list:
  print(f"Customer id: {item['customer_id']}")
  try:
    image_prompt = item['image_prompt']
    print(f"Image Prompt: {image_prompt}")
    print(f"Image Prompt Explanation: {item['image_explanation']}")
    filename = item['image_filename']

    prompt = f"""I need you to verify that the below image meets the following criteria:
    <ImagePrompt>
    {item['image_prompt']}
    </ImagePrompt>
    <ImageExplanation>
    {item['image_explanation']}
    </ImageExplanation>
    """

    print(prompt)
    imageBase64 = convert_png_to_base64(filename)
    llm_result = GeminiLLM_VerifyImage(prompt, imageBase64, response_schema=response_schema)
    print(llm_result)
    json_result = json.loads(llm_result)
    result_escaped_quotes = llm_result.replace("\\","\\\\").replace("'","\'")


    # Update table in BigQuery
    try:
      sql=f"""UPDATE `chocolate_ai.customer_hyper_personalized_email`
                  SET llm_verify_image_prompt = \"\"\"{prompt}\"\"\",
                      llm_verify_image_response_json = JSON\"\"\"{result_escaped_quotes}\"\"\",
                      llm_verify_image_text = \"\"\"{json_result['explanation']}\"\"\",
                      image_verified = {json_result['image_verified']}
                WHERE customer_id = {item['customer_id']}"""

      #print(sql)
      RunQuery(sql)

      print("---------------------------------------------------------------------------------------")
      print(f"Imagen3 Verification [Success] for Customer {item['customer_id']}")
      print("---------------------------------------------------------------------------------------")

    except Exception as e:
      # This cell has no retry loop, so report the SQL failure and continue
      print("---------------------------------------------------------------------------------------")
      print(f"Imagen3 Verification [SQL Error] for Customer {item['customer_id']}: {e}\n{sql}")
      print("---------------------------------------------------------------------------------------")

  except Exception as e:
    print("---------------------------------------------------------------------------------------")
    print(f"Imagen3 Verification [Error] for Customer {item['customer_id']}: {e}")
    print("---------------------------------------------------------------------------------------")


In [None]:
# View Results
sql=f"""SELECT customer_id, llm_verify_image_prompt, llm_verify_image_response_json, llm_verify_image_text, image_verified
          FROM `chocolate_ai.customer_hyper_personalized_email`
         WHERE customer_id IN ({customer_id_list_str})
           AND email_date = CURRENT_DATE()"""

print(sql)
df_process = RunQuery(sql)
df_process
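
The verification loop above calls `convert_png_to_base64`, which is defined elsewhere in the notebook. For reference, a helper of that kind might look like the sketch below. This is an assumption about its behavior, not the notebook's actual implementation, and it is named `png_file_to_base64` to avoid confusion with the real function. The demonstration writes only the 8-byte PNG signature to a temporary file rather than a real image.

```python
import base64
import os
import tempfile

def png_file_to_base64(path: str) -> str:
    """Read a file as bytes and return its base64 encoding as ASCII text."""
    with open(path, "rb") as handle:
        return base64.b64encode(handle.read()).decode("ascii")

# Demonstration with the PNG file signature instead of a full image.
png_signature = bytes([0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A])
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
    tmp.write(png_signature)
    tmp_path = tmp.name

encoded = png_file_to_base64(tmp_path)
os.remove(tmp_path)
print(encoded)
```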

### <font color='#4285f4'>Translate the Marketing Message to another language</font>

In [None]:
# Translate the marketing text into another language (we will randomly pick one)

# Write me the json in  OpenAPI 3.0 schema object for the below object.
# Make all fields required.
#  {
#    "translated_text" : "text"
#  }
response_schema = {
  "type": "object",
  "required": [
    "translated_text"
  ],
  "properties": {
    "translated_text": {
      "type": "string"
    }
  }
}


for item in customer_list:
  # Pick a language from the list
  random_language = 11 # French; for a random language use: random.randint(0, len(gemini_languages) - 1)
  print(f"Random Language: {gemini_languages[random_language]}")
  print(f"Customer id: {item['customer_id']}")
  item['translation_language'] = gemini_languages[random_language]
  try:

    prompt = f"""You are an expert translator for the following language {gemini_languages[random_language]}.
    Translate the following text from English to {gemini_languages[random_language]}:
    <Text>
    {item['marketing_text']}
    </Text>
    """

    print(prompt)
    llm_result = GeminiLLM(prompt, response_schema=response_schema)
    print(llm_result)
    json_result = json.loads(llm_result)
    result_escaped_quotes = llm_result.replace("\\","\\\\").replace("'","\'")

    # Update table in BigQuery
    try:
      sql=f"""UPDATE `chocolate_ai.customer_hyper_personalized_email`
                  SET llm_translation_language_prompt = \"\"\"{prompt}\"\"\",
                      llm_translation_language_prompt_response_json = JSON\"\"\"{result_escaped_quotes}\"\"\",
                      llm_translation_language_prompt_response_text = \"\"\"{json_result['translated_text']}\"\"\"
                WHERE customer_id = {item['customer_id']}"""

      #print(sql)
      RunQuery(sql)
      item['translated_text'] = json_result['translated_text']

      print("---------------------------------------------------------------------------------------")
      print(f"Translation [Success] for Customer {item['customer_id']}")
      print("---------------------------------------------------------------------------------------")

    except Exception as e:
      # This cell has no retry loop, so report the SQL failure and continue
      print("---------------------------------------------------------------------------------------")
      print(f"Translation [SQL Error] for Customer {item['customer_id']}: {e}\n{sql}")
      print("---------------------------------------------------------------------------------------")

  except Exception as e:
    print("---------------------------------------------------------------------------------------")
    print(f"Translation [Error] for Customer {item['customer_id']}: {e}")
    print("---------------------------------------------------------------------------------------")


In [None]:
# View Results
sql=f"""SELECT customer_id, llm_translation_language_prompt, llm_translation_language_prompt_response_json, llm_translation_language_prompt_response_text
          FROM `chocolate_ai.customer_hyper_personalized_email`
         WHERE customer_id IN ({customer_id_list_str})
           AND email_date = CURRENT_DATE()"""

print(sql)
df_process = RunQuery(sql)
df_process
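
The translation loop above selects the language by a hard-coded position in `gemini_languages`, which silently changes meaning if that list is ever reordered. A slightly more defensive selection is sketched below. `choose_language` is a hypothetical helper, and the sample list is invented because the notebook defines `gemini_languages` elsewhere.

```python
import random

def choose_language(languages, fixed_index=None, rng=None):
    """Return languages[fixed_index], or a random entry when fixed_index is None."""
    if not languages:
        raise ValueError("languages must not be empty")
    if fixed_index is not None:
        if not 0 <= fixed_index < len(languages):
            raise IndexError(
                f"fixed_index {fixed_index} is outside 0..{len(languages) - 1}"
            )
        return languages[fixed_index]
    rng = rng or random.Random()
    return rng.choice(languages)

# Invented list; the notebook's gemini_languages is defined elsewhere.
sample_languages = ["English", "French", "German", "Spanish"]
print(choose_language(sample_languages, fixed_index=1))
```

Passing a seeded `random.Random` instance as `rng` makes a random pick repeatable across runs.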

### <font color='#4285f4'>Verify the Translation</font>

In [None]:
# Verify the translation is correct

# Write me the json in  OpenAPI 3.0 schema object for the below object.
# Make all fields required.
#  {
#    "translation_verified" : true,
#    "explanation" : "text"
#  }
response_schema = {
  "type": "object",
  "required": [
    "translation_verified",
    "explanation"
  ],
  "properties": {
    "translation_verified": {
      "type": "boolean"
    },
    "explanation": {
      "type": "string"
    }
  }
}


for item in customer_list:
  print(f"Customer id: {item['customer_id']}")
  try:
    prompt = f"""I need you to verify that the below text is in the language "{item['translation_language']}".
    It was originally in English, so make sure it is not still English.
    <Text>
    {item['translated_text']}
    </Text>
    """

    print(prompt)
    llm_result = GeminiLLM(prompt, response_schema=response_schema)
    print(llm_result)
    json_result = json.loads(llm_result)
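    # Double the backslashes first, then escape single quotes for the SQL string literal
    result_escaped_quotes = llm_result.replace("\\","\\\\").replace("'","\\'")
    # The explanation is free text, so escape backslashes and double quotes before placing it in a triple-quoted literal
    explanation_escaped = json_result['explanation'].replace("\\","\\\\").replace('"','\\"')

    # Update table in BigQuery
    try:
      sql=f"""UPDATE `chocolate_ai.customer_hyper_personalized_email`
                  SET llm_validate_translation_prompt = \"\"\"{prompt}\"\"\",
                      llm_validate_translation_prompt_response_json = JSON\"\"\"{result_escaped_quotes}\"\"\",
                      llm_validate_translation_prompt_response_text = \"\"\"{explanation_escaped}\"\"\",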
                      translation_verified = {json_result['translation_verified']}
                WHERE customer_id = {item['customer_id']}"""

      #print(sql)
      RunQuery(sql)

      print("---------------------------------------------------------------------------------------")
      print(f"Translation Verification [Success] for Customer {item['customer_id']}")
      print("---------------------------------------------------------------------------------------")

    except Exception as e:
      # Report the exception itself along with the statement that failed
      print("---------------------------------------------------------------------------------------")
      print(f"Translation Verification [SQL Error] for Customer {item['customer_id']}: {e}")
      print(f"SQL: {sql}")
      print("---------------------------------------------------------------------------------------")

  except Exception as e:
    print("---------------------------------------------------------------------------------------")
    print(f"Translation Verification [Error] for Customer {item['customer_id']}: {e}")
    print("---------------------------------------------------------------------------------------")
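
The verification cell above trusts that Gemini returns both fields with the expected types. A minimal sketch of a more defensive parse follows. `parse_verification` is a hypothetical helper that is not part of this notebook, and it assumes `GeminiLLM` returns the JSON as a string, as the `json.loads` call above suggests.

```python
import json
from typing import Tuple


def parse_verification(llm_result: str) -> Tuple[bool, str]:
    """Parse the verification JSON and check it has the expected shape."""
    data = json.loads(llm_result)
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")

    verified = data.get("translation_verified")
    explanation = data.get("explanation")

    # Reject values such as "true" (a string) that json.loads accepts as valid JSON
    if not isinstance(verified, bool):
        raise ValueError("translation_verified must be a boolean")
    if not isinstance(explanation, str):
        raise ValueError("explanation must be a string")
    return verified, explanation


# Illustrative response only; real responses come from the model
sample = '{"translation_verified": true, "explanation": "The text is in French."}'
verified, explanation = parse_verification(sample)
print(verified, explanation)
```

Raising `ValueError` here would send a malformed response to the outer `except` in the loop, so the row is left unchanged instead of being updated with partial data.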


In [None]:
# View Results
sql=f"""SELECT customer_id, llm_validate_translation_prompt, llm_validate_translation_prompt_response_json, llm_validate_translation_prompt_response_text, translation_verified
          FROM `chocolate_ai.customer_hyper_personalized_email`
         WHERE customer_id IN ({customer_id_list_str})
           AND email_date = CURRENT_DATE()"""

print(sql)
df_process = RunQuery(sql)
df_process

### <font color='#4285f4'>Generate the HTML and Save</font>

In [None]:
html_template = """<!DOCTYPE html>
<html>
<head>
  <meta charset="utf-8">
  <title>Coffee Campaign</title>
  <style>
    body {
      font-family: 'Helvetica Neue', sans-serif;
    }
    .email-campaign {
      background-color: #EDF2F9;
      padding: 20px;
      margin-bottom: 20px;
      border-bottom: 2px solid #ddd;
    }
    h3 {
      font-size: 16px;
      margin-bottom: 10px;
      color: #333;
    }
    p {
      font-size: 14px;
      line-height: 1.5;
      color: #555;
    }
  </style>
</head>
<body>
  <div class="email-campaign">
    <h3>Email Campaign (English)</h3>
    <p style="font-weight: bold;">Subject: ##email_subject##</p>
    <p>##marketing_text##</p>
  </div>
  <div>
    <img src="##html_filename##" width="500" height="500" alt="Item Image">
  </div>

  <hr/>

  <div class="email-campaign">
    <h3>Email Campaign (##translation_language##)</h3>
    <p>##translated_text##</p>
  </div>
  <div>
    <img src="##html_filename##" width="500" height="500" alt="Item Image">
  </div>

</body>
</html>
"""
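
The template marks each value with a `##name##` placeholder. Because the marketing and translated text are generated by the model, characters such as `&` or `<` could end up in the page as markup. A minimal sketch of a renderer that HTML-escapes each value is shown here. `render_template` is a hypothetical helper, not part of this notebook, and it assumes the values are plain text and do not intentionally contain HTML.

```python
import html
import re

# Matches placeholders of the form ##name##
_PLACEHOLDER = re.compile(r"##(\w+)##")


def render_template(template: str, values: dict) -> str:
    """Fill ##name## placeholders in one pass, HTML-escaping each value."""
    def substitute(match):
        key = match.group(1)
        if key not in values:
            # Leave unknown placeholders visible so missing data is easy to spot
            return match.group(0)
        return html.escape(str(values[key]), quote=True)

    return _PLACEHOLDER.sub(substitute, template)


# Illustrative template and values only
demo_template = '<p>##marketing_text##</p><img src="##html_filename##">'
demo = render_template(demo_template, {
    "marketing_text": "Truffles & <b>macarons</b>",
    "html_filename": "https://example.com/a.png?x=1&y=2",
})
print(demo)
```

A single regular-expression pass also means that placeholder-like text inside one value is never substituted again by a later replacement, which can happen with chained `.replace()` calls.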

In [None]:
# Create HTML using the Template

for item in customer_list:
  print(item)
  if 'html_filename' not in item:
    # Error generating image
    print("Error: 'html_filename' not in item")
    continue

  if 'translated_text' not in item:
    # Error generating translation
    print("Error: 'translated_text' not in item")
    continue

  # Replace the placeholders with the actual values
  html = html_template \
    .replace("##email_subject##", item['email_subject']) \
    .replace("##marketing_text##", item['marketing_text']) \
    .replace("##translation_language##", item['translation_language']) \
    .replace("##translated_text##", item['translated_text']) \
    .replace("##html_filename##", item['html_filename'])

  filename = f"email_campaign_{item['customer_id']}.html"

  # Save the HTML to a file (UTF-8 so translated, non-ASCII text is written correctly)
  with open(filename, "w", encoding="utf-8") as f:
    f.write(html)

  copy_file_to_gcs(filename, storage_account,f"chocolate-ai/Campaign-Assets-Hyper-Personalized-Email/email-{formatted_date}/" + filename)

  # Update table in BigQuery
  try:
    html_gcs_filename = f"gs://{storage_account}/chocolate-ai/Campaign-Assets-Hyper-Personalized-Email/email-{formatted_date}/{filename}"
    html_http_url = f"https://storage.cloud.google.com/{storage_account}/chocolate-ai/Campaign-Assets-Hyper-Personalized-Email/email-{formatted_date}/{filename}"

    sql=f"""UPDATE `chocolate_ai.customer_hyper_personalized_email`
                SET html_gcs_filename = '{html_gcs_filename}',
                    html_http_url = '{html_http_url}',
                    html_generated = TRUE
              WHERE customer_id = {item['customer_id']}"""

    #print(sql)
    RunQuery(sql)

    print("---------------------------------------------------------------------------------------")
    print(f"HTML Generation [Success] for Customer {item['customer_id']}")
    print("---------------------------------------------------------------------------------------")

  except Exception as e:
    # Report the exception itself along with the statement that failed
    print("---------------------------------------------------------------------------------------")
    print(f"HTML Generation [SQL Error] for Customer {item['customer_id']}: {e}")
    print(f"SQL: {sql}")
    print("---------------------------------------------------------------------------------------")
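
The cell above builds the same object path three times: once for the upload, once for the `gs://` URI, and once for the browser URL. A small sketch that derives both locations from a single path is shown here. `gcs_locations` is a hypothetical helper, and the bucket name, date, and customer id below are placeholder values, not values from this notebook.

```python
def gcs_locations(bucket: str, object_path: str) -> dict:
    """Return the gs:// URI and the browser URL for one object."""
    return {
        "gcs_uri": f"gs://{bucket}/{object_path}",
        "http_url": f"https://storage.cloud.google.com/{bucket}/{object_path}",
    }


# Placeholder values standing in for storage_account, formatted_date and a customer id
object_path = "chocolate-ai/Campaign-Assets-Hyper-Personalized-Email/email-2024-01-01/email_campaign_1.html"
locations = gcs_locations("my-bucket", object_path)
print(locations["gcs_uri"])
print(locations["http_url"])
```

Building the path once keeps the uploaded object and the two values stored in BigQuery from drifting apart if the folder layout changes.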


In [None]:
# To view the bucket
print(f"https://console.cloud.google.com/storage/browser/{storage_account}/chocolate-ai/Campaign-Assets-Hyper-Personalized-Email/email-{formatted_date}")

In [None]:
# View Results
sql=f"""SELECT customer_id, html_gcs_filename, html_http_url, html_generated
          FROM `chocolate_ai.customer_hyper_personalized_email`
         WHERE customer_id IN ({customer_id_list_str})
           AND email_date = CURRENT_DATE();"""

print(sql)
df_process = RunQuery(sql)
df_process

In [None]:
filename = f"email_campaign_{customer_list[0]['customer_id']}.html"
IPython.display.HTML(filename=filename)

In [None]:
filename = f"email_campaign_{customer_list[1]['customer_id']}.html"
IPython.display.HTML(filename=filename)

### <font color='#4285f4'>View all results</font>

In [None]:
# View All Results
sql=f"""SELECT *
          FROM `chocolate_ai.customer_hyper_personalized_email`
         WHERE customer_id IN ({customer_id_list_str})
           AND email_date = CURRENT_DATE();"""

print(sql)
df_process = RunQuery(sql)
df_process

### <font color='#4285f4'>Clean Up</font>

In [None]:
# Placeholder

### <font color='#4285f4'>Reference Links</font>


- [Imagen3](https://cloud.google.com/vertex-ai/docs/generative-ai/model-reference/image-generation)