## <img src="https://lh3.googleusercontent.com/mUTbNK32c_DTSNrhqETT5aQJYFKok2HB1G2nk2MZHvG5bSs0v_lmDm_ArW7rgd6SDGHXo0Ak2uFFU96X6Xd0GQ=w160-h128" width="45" valign="top" alt="BigQuery"> Generating A/B Menu Marketing Campaign using Gemini Pro


### License

In [None]:
##################################################################################
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###################################################################################

### Notebook Overview

- This notebook will create new a marketing campaign for newly generated items in the menu_a_b_testing table.  You can run the "Menu-A-B-Testing-Generate-Insight-GenAI" notebook first if you like.

- Notebook Logic:
    1. Create a prompt for our marketing campaign and store it in the table.
    2. Run Gemini Pro on the prompt and save the results.
    3. Parse the embedded JSON out of the respose and save as JSON
    4. Create an HTML message with different sections
    5. Update the results to GCS and update the table.

## Initialize Python

In [None]:
project_id="${project_id}"
location="us-central1"
model_id = "imagegeneration@005"

# No need to set these
city_names=["New York City", "London", "Tokyo", "San Francisco"]
city_ids=[1,2,3,4]
city_languages=["American English", "British English", "Japanese", "American English"]
number_of_coffee_trucks = "4"

dataset_id = "data_beans_synthetic_data"

gcs_storage_bucket = "${data_beans_curated_bucket}"
gcs_storage_path = "data-beans/menu-images-a-b-testing/"

In [None]:
from PIL import Image
from IPython.display import HTML
import IPython.display
import google.auth
import requests
import json
import uuid
import base64
import os
import cv2
import time

from google.cloud import bigquery
client = bigquery.Client()

## ImageGen2 / Gemini Pro / Gemini Pro Vision (Helper Functions)

#### ImageGen2

In [None]:
def ImageGen(prompt):
  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request() # required to acess access token
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
      "Content-Type" : "application/json",
      "Authorization" : "Bearer " + access_token
  }

  # https://cloud.google.com/vertex-ai/docs/generative-ai/model-reference/image-generation
  url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project}/locations/{location}/publishers/google/models/imagegeneration:predict"

  payload = {
    "instances": [
      {
        "prompt": prompt
      }
    ],
    "parameters": {
      "sampleCount": 1
    }
  }

  response = requests.post(url, json=payload, headers=headers)

  if response.status_code == 200:
    image_data = json.loads(response.content)["predictions"][0]["bytesBase64Encoded"]
    image_data = base64.b64decode(image_data)
    filename= str(uuid.uuid4()) + ".png"
    with open(filename, "wb") as f:
      f.write(image_data)
    print(f"Image generated OK.")
    return filename
  else:
    error = f"Error with prompt:'{prompt}'  Status:'{response.status_code}' Text:'{response.text}'"
    raise RuntimeError(error)

#### Gemini Pro LLM

In [None]:
def GeminiProLLM(prompt, temperature = .8, topP = .8, topK = 40):

  if temperature < 0:
    temperature = 0

  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request() # required to acess access token
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
      "Content-Type" : "application/json",
      "Authorization" : "Bearer " + access_token
  }

  # https://cloud.google.com/vertex-ai/docs/generative-ai/model-reference/gemini
  url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/gemini-2.0-flash:streamGenerateContent"

  payload = {
    "contents": {
      "role": "user",
      "parts": {
          "text": prompt
      },
    },
    "safety_settings": {
      "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
      "threshold": "BLOCK_LOW_AND_ABOVE"
    },
    "generation_config": {
      "temperature": temperature,
      "topP": topP,
      "topK": topK,
      "maxOutputTokens": 8192,
      "candidateCount": 1
    }
  }

  response = requests.post(url, json=payload, headers=headers)

  if response.status_code == 200:
    json_response = json.loads(response.content)
    llm_response = ""
    for item in json_response:
      try:
        llm_response = llm_response + item["candidates"][0]["content"]["parts"][0]["text"]
      except Exception as err:
        print(f"response.content: {response.content}")
        raise RuntimeError(err)

    # Remove some typically response characters (if asking for a JSON reply)
    llm_response = llm_response.replace("```json","")
    llm_response = llm_response.replace("```","")

    # print(f"llm_response:\n{llm_response}")
    return llm_response
  else:
    error = f"Error with prompt:'{prompt}'  Status:'{response.status_code}' Text:'{response.text}'"
    raise RuntimeError(error)


#### Gemini Pro Vision LLM

In [None]:
# Use the Gemini with Vision
def GeminiProVisionLLM(prompt, imageBase64, temperature = .4, topP = 1, topK = 32):

  if temperature < 0:
    temperature = 0

  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request() # required to acess access token
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
      "Content-Type" : "application/json",
      "Authorization" : "Bearer " + access_token
  }

  # https://cloud.google.com/vertex-ai/docs/generative-ai/model-reference/gemini
  url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/gemini-2.0-flash:streamGenerateContent"

  payload = {
  "contents": [
      {
        "role": "user",
        "parts": [
          {
            "text": prompt
          },
          {
            "inlineData": {
              "mimeType": "image/png",
              "data": f"{imageBase64}"
            }
          }
        ]
      }
    ],
    "safety_settings": {
      "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
      "threshold": "BLOCK_LOW_AND_ABOVE"
    },
    "generation_config": {
      "temperature": temperature,
      "topP": topP,
      "topK": topK,
      "maxOutputTokens": 2048,
      "candidateCount": 1
    }
  }

  response = requests.post(url, json=payload, headers=headers)

  if response.status_code == 200:
    json_response = json.loads(response.content)
    llm_response = ""
    for item in json_response:
      llm_response = llm_response + item["candidates"][0]["content"]["parts"][0]["text"]

    # Remove some typically response characters (if asking for a JSON reply)
    llm_response = llm_response.replace("```json","")
    llm_response = llm_response.replace("```","")

    # print(f"llm_response:\n{llm_response}")
    return llm_response
  else:
    error = f"Error with prompt:'{prompt}'  Status:'{response.status_code}' Text:'{response.text}'"
    raise RuntimeError(error)


In [None]:
# Use the Gemini with Vision
def GeminiProVisionMultipleFileLLM(prompt, image_prompt, temperature = .4, topP = 1, topK = 32):
  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request() # required to acess access token
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
      "Content-Type" : "application/json",
      "Authorization" : "Bearer " + access_token
  }

  # https://cloud.google.com/vertex-ai/docs/generative-ai/model-reference/gemini
  url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/gemini-2.0-flash:streamGenerateContent"


  parts = []
  new_item = {
      "text": prompt
      }
  parts.append(new_item)

  for item in image_prompt:
    new_item = {
        "text": f"Image Name: {item['llm_image_filename']}:\n"
        }
    parts.append(new_item)
    new_item = {
        "inlineData": {
            "mimeType": "image/png",
            "data": item["llm_image_base64"]
            }
        }
    parts.append(new_item)

  payload = {
  "contents": [
      {
        "role": "user",
        "parts": parts
      }
    ],
    "safety_settings": {
      "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
      "threshold": "BLOCK_LOW_AND_ABOVE"
    },
    "generation_config": {
      "temperature": temperature,
      "topP": topP,
      "topK": topK,
      "maxOutputTokens": 2048,
      "candidateCount": 1
    }
  }

  response = requests.post(url, json=payload, headers=headers)

  if response.status_code == 200:
    json_response = json.loads(response.content)
    llm_response = ""
    for item in json_response:
      llm_response = llm_response + item["candidates"][0]["content"]["parts"][0]["text"]

    # Remove some typically response characters (if asking for a JSON reply)
    llm_response = llm_response.replace("```json","")
    llm_response = llm_response.replace("```","")

    # print(f"llm_response:\n{llm_response}")
    return llm_response
  else:
    error = f"Error with prompt:'{prompt}'  Status:'{response.status_code}' Text:'{response.text}'"
    raise RuntimeError(error)


#### SQL Functions

In [None]:
def RunQuery(sql):
  import time

  if (sql.startswith("SELECT") or sql.startswith("WITH")):
      df_result = client.query(sql).to_dataframe()
      return df_result
  else:
    job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)
    query_job = client.query(sql, job_config=job_config)

    # Check on the progress by getting the job's updated state.
    query_job = client.get_job(
        query_job.job_id, location=query_job.location
    )
    print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    while query_job.state != "DONE":
      time.sleep(2)
      query_job = client.get_job(
          query_job.job_id, location=query_job.location
          )
      print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    if query_job.error_result == None:
      return True
    else:
      return False

In [None]:
def GetNextPrimaryKey(fully_qualified_table_name, field_name):
  sql = f"""
  SELECT IFNULL(MAX({field_name}),0) AS result
    FROM `{fully_qualified_table_name}`
  """
  # print(sql)
  df_result = client.query(sql).to_dataframe()
  # display(df_result)
  return df_result['result'].iloc[0] + 1

In [None]:
def GetTableSchema(dataset_name, table_name):
  import io

  dataset_ref = client.dataset(dataset_name, project=project_id)
  table_ref = dataset_ref.table(table_name)
  table = client.get_table(table_ref)

  f = io.StringIO("")
  client.schema_to_json(table.schema, f)
  return f.getvalue()

In [None]:
def GetStartingValue(dataset_name, table_name, field_name):
  sql = f"""
  SELECT IFNULL(MAX({field_name}),0) + 1 AS result
    FROM `{project_id}.{dataset_name}.{table_name}`
  """
  #print(sql)
  df_result = client.query(sql).to_dataframe()
  #display(df_result)
  return df_result['result'].iloc[0]

In [None]:
def GetForeignKeys(dataset_name, table_name, field_name):
  sql = f"""
  SELECT STRING_AGG(CAST({field_name} AS STRING), "," ORDER BY {field_name}) AS result
    FROM `{project_id}.{dataset_name}.{table_name}`
  """
  #print(sql)
  df_result = client.query(sql).to_dataframe()
  #display(df_result)
  return df_result['result'].iloc[0]

In [None]:
def GetDistinctValues(dataset_name, table_name, field_name):
  sql = f"""
  SELECT STRING_AGG(DISTINCT {field_name}, "," ) AS result
    FROM `{project_id}.{dataset_name}.{table_name}`
  """
  #print(sql)
  df_result = client.query(sql).to_dataframe()
  #display(df_result)
  return df_result['result'].iloc[0]

#### Helper Functions

In [None]:
def convert_png_to_base64(image_path):
  image = cv2.imread(image_path)

  # Convert the image to a base64 string.
  _, buffer = cv2.imencode('.png', image)
  base64_string = base64.b64encode(buffer).decode('utf-8')

  return base64_string

In [None]:
# This was generated by GenAI

def copy_file_to_gcs(local_file_path, bucket_name, destination_blob_name):
  """Copies a file from a local drive to a GCS bucket.

  Args:
      local_file_path: The full path to the local file.
      bucket_name: The name of the GCS bucket to upload to.
      destination_blob_name: The desired name of the uploaded file in the bucket.

  Returns:
      None
  """

  import os
  from google.cloud import storage

  # Ensure the file exists locally
  if not os.path.exists(local_file_path):
      raise FileNotFoundError(f"Local file '{local_file_path}' not found.")

  # Create a storage client
  storage_client = storage.Client()

  # Get a reference to the bucket
  bucket = storage_client.bucket(bucket_name)

  # Create a blob object with the desired destination path
  blob = bucket.blob(destination_blob_name)

  # Upload the file from the local filesystem
  content_type = ""
  if local_file_path.endswith(".html"):
    content_type = "text/html; charset=utf-8"

  if local_file_path.endswith(".json"):
    content_type = "application/json; charset=utf-8"

  if content_type == "":
    blob.upload_from_filename(local_file_path)
  else:
    blob.upload_from_filename(local_file_path, content_type = content_type)

  print(f"File '{local_file_path}' uploaded to GCS bucket '{bucket_name}' as '{destination_blob_name}.  Content-Type: {content_type}'.")

In [None]:
def download_from_gcs(filename, gcs_storage_bucket, gcs_storage_path):
  # prompt: Write python code to download a blob from a gcs bucket.  do not use the requests method

  from google.cloud import storage

  # The ID of your GCS object
  object_name = gcs_storage_path + filename

  # The path to which the file should be downloaded
  destination_file_name = filename

  storage_client = storage.Client()

  bucket = storage_client.bucket(gcs_storage_bucket)

  # Construct a client side representation of a blob.
  # Note `Bucket.blob` differs from `Bucket.get_blob` as it doesn't retrieve
  # any content from Google Cloud Storage. As we don't need additional data,
  # using `Bucket.blob` is preferred here.
  blob = bucket.blob(object_name)
  blob.download_to_filename(destination_file_name)

  print(
      "Downloaded storage object {} from bucket {} to local file {}.".format(
          object_name, gcs_storage_bucket, destination_file_name
      )
  )

## Create the Marketing Campaign

In [None]:
%%bigquery

-- Copy the table from the curated dataset if the Menu-A-B-Testing-Generate-Insight-GenAI notebook has not been run.

CREATE SCHEMA IF NOT EXISTS `${project_id}.data_beans_synthetic_data`;
CREATE TABLE IF NOT EXISTS `${project_id}.data_beans_synthetic_data.menu_a_b_testing` COPY `${project_id}.data_beans_curated.menu_a_b_testing`;

In [None]:
%%bigquery

-- Create our marketing prompt for Email, Twitter (X), and Instragram

UPDATE `${project_id}.data_beans_synthetic_data.menu_a_b_testing` AS parent
   SET llm_marketing_prompt =
       CONCAT("You run a fleet of coffee trucks in ", city.city_name, ". ",
       "We need to craft a compelling marketing campaign for a new menu item. ",
       "Craft 3 sections for the campaign: A Twitter section, An Instragram section and an Email section. ",
       "The new menu item's name is: ", child.item_name, ". ",
       "The new menu item's description is: ", IFNULL(child.item_description,child.item_name), ", ",
       "The new menu item's image looks like: ", child.llm_item_image_prompt, ". ",
       "Embrace unconventional ideas and thinking that surprises and inspires unique variations. ",
       "Write the response in using the language ",
       CASE WHEN city.city_id = 1 THEN "American English"
            WHEN city.city_id = 2 THEN "British English"
            WHEN city.city_id = 3 THEN "Japanese"
            WHEN city.city_id = 4 THEN "American English"
            ELSE "English"
       END, ". ",
       "The campaign should be 3 to 4 sentences. ",
       "Do not mention the weather in the message. ",
       "Do not mention 'Coming Soon' since the menu item is available now. ",
       "Do not create any token replacements like square brackets or curly braces. ",
       "This is not a template it is a complete message. ",
       "Return the results in JSON with no special characters or formatting. ",
       "Double check for special characters especially for Japanese. ",
       'Create a subject up to 150 characters and place in the JSON "subject" field. ',
       'Place the twitter text in the JSON "twitter" field. ',
       'Place the instagram text in the JSON "instagram" field. ',
       'Place the email text in the JSON "email" field. ',
       "Example Return Data:",
       '{ "subject" : "Wow! Mocha Munchkins!, "twitter": "Mocha Munchkins get some before they are gone.", "instagram" : "Check out this photo", "email" : "Check this out"} ',
       '{ "subject" : "Robo Coffee a Must!", "twitter": "Try our all new Robo coffee.  Available Now.", "instagram" : "Wow new item", "email" : "You must taste this"} '
       ),
       llm_marketing_response = null,
       llm_marketing_parsed_response = null,
       html_filename = null,
       html_url = null,
       html_generated = FALSE
  FROM `${project_id}.data_beans_synthetic_data.menu_a_b_testing` AS child
       INNER JOIN `${project_id}.data_beans_curated.location` AS location
               ON child.location_id = location.location_id
       INNER JOIN `${project_id}.data_beans_curated.city` AS city
               ON location.city_id = city.city_id
 WHERE child.menu_a_b_testing_id = parent.menu_a_b_testing_id
   AND parent.llm_marketing_prompt IS NULL;


In [None]:
%%bigquery

-- Score our LLM prompt
UPDATE `${project_id}.data_beans_synthetic_data.menu_a_b_testing` AS menu_a_b_testing
   SET llm_marketing_response = llm_query.ml_generate_text_result,
       llm_marketing_parsed_response = NULL
  FROM (SELECT *
          FROM ML.GENERATE_TEXT(MODEL `${project_id}.data_beans_curated.gemini_model`,
              (SELECT menu_a_b_testing_id,
                      llm_marketing_prompt AS prompt
                 FROM `${project_id}.data_beans_synthetic_data.menu_a_b_testing`
                WHERE (llm_marketing_response IS NULL
                       OR
                       JSON_VALUE(llm_marketing_response, '$.candidates[0].content.parts[0].text') IS NULL
                       )
                ),
               STRUCT(
               .9 AS temperature,
               5000 AS max_output_tokens,
               .8 AS top_p,
               30 AS top_k
           ))
      ) AS llm_query
WHERE menu_a_b_testing.menu_a_b_testing_id = llm_query.menu_a_b_testing_id;


In [None]:
%%bigquery

-- Parse our custom JSON out of the LLM response JSON
UPDATE `${project_id}.data_beans_synthetic_data.menu_a_b_testing`
   SET llm_marketing_parsed_response = `${project_id}.data_beans_curated.gemini_model_result_as_json`(llm_marketing_response)
 WHERE llm_marketing_parsed_response IS NULL;

In [None]:
%%bigquery

-- Show the results
SELECT menu_a_b_testing_id, llm_marketing_parsed_response
 FROM `${project_id}.data_beans_synthetic_data.menu_a_b_testing`
ORDER BY menu_a_b_testing_id DESC
LIMIT 10;


## Create the HTML message

In [None]:
sql = """SELECT menu_a_b_testing_id, llm_marketing_parsed_response
           FROM `${project_id}.data_beans_synthetic_data.menu_a_b_testing`
         WHERE html_generated IS NULL OR html_generated = FALSE
        ORDER BY menu_a_b_testing_id
"""

df_process = client.query(sql).to_dataframe()

for row in df_process.itertuples():
  menu_a_b_testing_id = row.menu_a_b_testing_id
  llm_marketing_parsed_response = row.llm_marketing_parsed_response

  print(f"menu_a_b_testing_id: {menu_a_b_testing_id}")
  print(f"llm_marketing_parsed_response: {llm_marketing_parsed_response}")

  prompt=f"""Convert the below text into a well-structured HTML document.
The document should have an email section, a Twitter section called "X" and an Instagram section.
Create a header for each section: Email Campaign, X Campaign, Instagram Campaign.
The Email Campaign section should use the color "#EDF2F9" for the background.
The X Campaign section should use the color White for the background.
The Instagram Campaign section should use the color "#EDF2F9" for the background.
Seperate each Campaign section with space and a line.
Refrain from using <h1>, <h2>, <h3>, and <h4> tags.
Do not create a template that uses square or curly braces, use the actual text.
Create inline styles.
Make the styles fun, eye catching and exciting.
Use "Helvetica Neue" as the font.
All text should be left aligned.
Avoid fonts larger than 16 pixels.
Do not change the language.  Keep the text in the native language.

Include the following image in the html at the very botton in its own section:
- The image is located at: https://REPLACE_ME
- The image url should have a "width" of 500 and "height" of 500.

Double check that you did not use any <h1>, <h2>, <h3>, or <h4> tags.

Text:
{llm_marketing_parsed_response}
"""

  print(f"prompt: {prompt}")


  # The LLM can generate some HTML tags that can cause issues when parsing the result
  llm_success = False
  temperature = .5

  while llm_success == False:
    try:
      llm_response = GeminiProLLM(prompt, temperature=temperature, topP=.5, topK = 20)

      if llm_response.startswith("html"):
        llm_response = llm_response[4:] # incase the response is formatted like markdown

      # download file
      filename = str(menu_a_b_testing_id) + ".png"
      download_from_gcs(filename, gcs_storage_bucket, gcs_storage_path)

      # open locally
      imageBase64 = convert_png_to_base64(filename)

      # Replace the image with an inline image (this avoids a SignedURL or public access to GCS bucket)
      html_text = llm_response.replace("https://REPLACE_ME", f"data:image/png;base64, {imageBase64}")

      filename= str(menu_a_b_testing_id) + ".html"
      with open(filename, "w", encoding='utf8') as f:
        f.write(html_text)
      print ("")

      copy_file_to_gcs(filename, gcs_storage_bucket, gcs_storage_path + filename)

      sql = "UPDATE `${project_id}.data_beans_synthetic_data.menu_a_b_testing` " + \
              "SET html_generated = TRUE, " + \
                  "html_filename = '" + "gs://${project_id}/data-beans/menu-images-a-b-testing/" + filename + "', " \
                  "html_url = '" + "https://storage.cloud.google.com/${data_beans_curated_bucket}/data-beans/menu-images-a-b-testing/" + filename + "' " \
            "WHERE menu_a_b_testing_id = " + str(menu_a_b_testing_id)

      RunQuery(sql)

      llm_success = True

    except:
      # Reduce the temperature for more accurate generation
      temperature = temperature - .05
      print("Regenerating...")
      if temperature <= 0:
        llm_success = True

  # break


In [None]:
# You can hardcode one if you like: e.g. "1.html"
HTML(filename)