## <font color='#4285f4'>Overview</font>

This notebook demonstrates a process for generating marketing campaign recommendations for Chocolate AI, a fictional French chocolatier and bakery. It uses BigQuery for data storage and analysis, Gemini (a large language model) for creative content generation, and TimesFM (a time series forecasting model) to forecast how discounts affect product sales. Informed by your existing data, Gen AI can create strong campaign recommendations that jumpstart your marketing team’s efforts: marketing projects that previously took weeks to plan and brainstorm can be rapidly prototyped.

Process Flow:
1. Define Campaign Recommendation Functions:
    - GenerateCampaignRecommendations: Uses Gemini LLM to suggest campaign ideas based on provided goals, product information, and budget.
    - GetRankedProducts: Queries BigQuery to identify under-performing products based on sales changes over various time periods (daily, weekly, monthly).
    - GetFinalProductRanking: Combines rankings from different time periods to produce a final list of products to consider for promotion.
    - Visualize*: Several functions to create informative graphs for visualizing product rankings and campaign performance.
    - GenerateCampaignPerformance: Simulates randomized campaign performance data influenced by budget.
2. Campaign Generation Workflow:
    - Step 1: Product Ranking:
        - Retrieves and visualizes rankings of the lowest-performing products over different time periods.
        - Combines these rankings into a final ranked list.
        - Uploads the visualization to Google Cloud Storage and uses Gemini (with multi-modal input) to interpret the graph and suggest products for promotion.
    - Step 2: Discount Optimization:
        - Based on the selected product, uses TimesFM to simulate the impact of different discount levels on sales, recommending a discount to achieve the desired sales increase.
    - Step 3: Campaign Ideation:
        - Fetches product details from BigQuery.
        - Calls GenerateCampaignRecommendations to create campaign ideas, including name, description, target audience, marketing channels, and expected outcomes.
    - Step 4: Campaign Performance Simulation:
        - Simulates the campaign's performance multiple times using GenerateCampaignPerformance to visualize potential outcomes.
    - Step 5: Mock Campaign Execution and Results:
        - Runs a final simulation to represent actual campaign results.
        - Visualizes the performance of the simulated campaign.
        - Saves the campaign details and simulated performance data into the BigQuery tables.

Author: Paul Ramsey

## <font color='#4285f4'>License</font>

```
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```

## <font color='#4285f4'>Deploy TimesFM</font>

1. Open Vertex Model Garden
   -  https://console.cloud.google.com/vertex-ai/publishers/google/model-garden/timesfm
2. Click the Deploy button
3. Select
   - Resource Id:  google/timesfm-v20240828
   - Model Name: (leave default - name does not matter)   
   - Endpoint name: (leave default - name does not matter)
   - Region: us-central1 (if you choose a different region, update the region/location variables in **Initialize** below to match)
   - Machine spec: (leave default - n1-standard-8)
4. Click Deploy
5. Wait 20-45 minutes
6. Open Vertex Model Registry
   - https://console.cloud.google.com/vertex-ai/models
7. Click on the model name
8. Click on the model name under "Deploy your model"
9. Click on "Sample Request" (at the top)
10. Copy the endpoint ID (e.g. ```ENDPOINT_ID="8770814150373801984"```)
11. Update the variable endpoint_id in the **Initialize** code below.
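The endpoint ID copied in step 10 is the last segment of the endpoint's resource name, and `timesFMInference` (defined below) places it into a `:predict` URL together with the region and the project number. A minimal sketch of both steps follows; the helper names and the project number are hypothetical, and the URL shape mirrors the one used in `timesFMInference`.

```python
def endpoint_id_from_resource_name(resource_name: str) -> str:
    """Returns <ID> from 'projects/<p>/locations/<region>/endpoints/<ID>' (hypothetical helper)."""
    parts = resource_name.strip("/").split("/")
    if len(parts) != 6 or parts[0] != "projects" or parts[2] != "locations" or parts[4] != "endpoints":
        raise ValueError(f"Not an endpoint resource name: {resource_name}")
    return parts[5]


def timesfm_predict_url(region: str, project_number: str, endpoint_id: str) -> str:
    """Builds the Vertex AI :predict URL in the same shape timesFMInference uses."""
    return (
        f"https://{region}-aiplatform.googleapis.com/v1/projects/{project_number}"
        f"/locations/{region}/endpoints/{endpoint_id}:predict"
    )


# Hypothetical project number; the endpoint ID is the example from step 10.
example_endpoint_id = endpoint_id_from_resource_name(
    "projects/123456789012/locations/us-central1/endpoints/8770814150373801984"
)
print(timesfm_predict_url("us-central1", "123456789012", example_endpoint_id))
```

The example variable is deliberately not called `endpoint_id`, so running it does not overwrite the value you set in **Initialize**.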



##### TimesFM Deployment Video

[![TimesFM Deployment Video](https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Videos/adam-paternostro-video.png)](https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Videos/Campaign-Performance-Forecasting-TimesFM-Install.mp4)

In [None]:
from IPython.display import HTML

HTML("""
<h2>Deploying TimesFM to a Vertex AI Endpoint Instructions</h2>
<video width="800" height="600" controls>
  <source src="https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Videos/Campaign-Performance-Forecasting-TimesFM-Install.mp4" type="video/mp4">
  Your browser does not support the video tag.
</video>
""")

## <font color='#4285f4'>Initialize</font>

### Imports

NOTE: The timesfm install and import takes about 5 minutes for the initial run and may require a restart of the kernel.

In [None]:
from IPython.display import HTML, display
import google.auth
import google.auth.transport.requests
import requests
import json
import random
from datetime import datetime
import numpy as np
import logging
import matplotlib.pyplot as plt
import pandas as pd
import statistics
from io import BytesIO
import markdown
import inspect

from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception
from google.cloud import bigquery
client = bigquery.Client()

In [None]:
endpoint_id="2481537270750904320"  # <- YOU MUST SET THIS !!!!

# Update these variables to match your environment
bigquery_location = "${bigquery_location}"
region = "${region}"
location = "${location}"
storage_account = "${chocolate_ai_bucket}"

### Do not change the values in this cell below this line ###
project_id = !(gcloud config get-value project)
user = !(gcloud auth list --filter=status:ACTIVE --format="value(account)")

if len(project_id) != 1:
  raise RuntimeError(f"project_id is not set: {project_id}")
project_id = project_id[0]

if len(user) != 1:
  raise RuntimeError(f"user is not set: {user}")
user = user[0]

bucket_name = "${chocolate_ai_bucket}"

print(f"project_id = {project_id}")
print(f"user = {user}")
print(f"location = {location}")
print(f"bigquery_location = {bigquery_location}")
print(f"bucket_name = {bucket_name}")

### Define Helper Methods

#### RetryCondition(error)

In [None]:
def RetryCondition(error):
  error_string = str(error)
  print(error_string)

  retry_errors = [
      "RESOURCE_EXHAUSTED",
      "No content in candidate",
      # Add more error messages here as needed
  ]

  for retry_error in retry_errors:
    if retry_error in error_string:
      print("Retrying...")
      return True

  return False
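The decorators in this notebook pair `RetryCondition` with tenacity's `wait_exponential(multiplier=1, min=1, max=60)` and `stop_after_attempt(10)`. The stdlib sketch below reproduces that behavior by hand so the wait schedule is visible. It assumes tenacity computes each wait as `multiplier * 2**(attempt - 1)` clamped to `[min, max]`; `call_with_retry`, `backoff_schedule`, and `flaky_call` are hypothetical names, not part of tenacity.

```python
import time

RETRYABLE_MARKERS = ("RESOURCE_EXHAUSTED", "No content in candidate")


def is_retryable(error: Exception) -> bool:
    """Same substring test as RetryCondition, minus the printing."""
    return any(marker in str(error) for marker in RETRYABLE_MARKERS)


def backoff_schedule(attempts: int, multiplier: float = 1, min_wait: float = 1, max_wait: float = 60) -> list:
    """Seconds slept between attempts, assuming wait = multiplier * 2**(n - 1) clamped to [min_wait, max_wait]."""
    return [max(min_wait, min(multiplier * 2 ** (n - 1), max_wait)) for n in range(1, attempts)]


def call_with_retry(fn, attempts: int = 10, sleep=time.sleep):
    """Calls fn(), retrying retryable errors; re-raises other errors or the final failure."""
    waits = backoff_schedule(attempts)
    for attempt in range(attempts):
        try:
            return fn()
        except Exception as error:
            if attempt == attempts - 1 or not is_retryable(error):
                raise
            sleep(waits[attempt])


# Usage: a stand-in call that is throttled twice before succeeding.
calls = {"count": 0}


def flaky_call():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("429 RESOURCE_EXHAUSTED")
    return "ok"


slept = []
print(call_with_retry(flaky_call, sleep=slept.append))  # ok
print(slept)  # [1, 2]
```

Under these assumptions, ten attempts sleep at most 1 + 2 + 4 + 8 + 16 + 32 + 60 + 60 + 60 = 243 seconds in total before giving up.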

#### restAPIHelper
Calls the Google Cloud REST API using the current user's credentials.

In [None]:
def restAPIHelper(url: str, http_verb: str, request_body: dict = None) -> dict:
  """Calls the Google Cloud REST API passing in the current user's credentials"""

  import requests
  import google.auth
  import google.auth.transport.requests
  import json

  # Get an access token based upon the current user
  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request()
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
    "Content-Type" : "application/json",
    "Authorization" : "Bearer " + access_token
  }

  if http_verb == "GET":
    response = requests.get(url, headers=headers)
  elif http_verb == "POST":
    response = requests.post(url, json=request_body, headers=headers)
  elif http_verb == "PUT":
    response = requests.put(url, json=request_body, headers=headers)
  elif http_verb == "PATCH":
    response = requests.patch(url, json=request_body, headers=headers)
  elif http_verb == "DELETE":
    response = requests.delete(url, headers=headers)
  else:
    raise RuntimeError(f"Unknown HTTP verb: {http_verb}")

  if response.status_code == 200:
    return json.loads(response.content)
  else:
    error = f"Error restAPIHelper -> Status: '{response.status_code}' Text: '{response.text}'"
    raise RuntimeError(error)

#### Gemini LLM (default: gemini-2.0-flash)

In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def GeminiLLM(prompt, model = "gemini-2.0-flash", response_schema = None,
                 temperature = 1, topP = 1, topK = 32):

  # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference#supported_models
  # model = "gemini-2.0-flash"

  llm_response = None
  if temperature < 0:
    temperature = 0

  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request() # required to access the access token
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
      "Content-Type" : "application/json",
      "Authorization" : "Bearer " + access_token
  }

  # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference
  url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/{model}:generateContent"

  generation_config = {
    "temperature": temperature,
    "topP": topP,
    "maxOutputTokens": 8192,
    "candidateCount": 1,
    "responseMimeType": "application/json",
  }

  # Add the response schema when one is provided
  if response_schema is not None:
    generation_config["responseSchema"] = response_schema

  if model == "gemini-2.0-flash":
    generation_config["topK"] = topK

  payload = {
    "contents": {
      "role": "user",
      "parts": {
          "text": prompt
      },
    },
    "generation_config": {
      **generation_config
    },
    "safety_settings": {
      "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
      "threshold": "BLOCK_LOW_AND_ABOVE"
    }
  }

  response = requests.post(url, json=payload, headers=headers)

  if response.status_code == 200:
    try:
      json_response = json.loads(response.content)
    except Exception as error:
      raise RuntimeError(f"An error occurred parsing the JSON: {error}")

    if "candidates" in json_response:
      candidates = json_response["candidates"]
      if len(candidates) > 0:
        candidate = candidates[0]
        if "content" in candidate:
          content = candidate["content"]
          if "parts" in content:
            parts = content["parts"]
            if len(parts):
              part = parts[0]
              if "text" in part:
                text = part["text"]
                llm_response = text
              else:
                raise RuntimeError(f"No text in part: {response.content}")
            else:
              raise RuntimeError(f"No parts in content: {response.content}")
          else:
            raise RuntimeError(f"No parts in content: {response.content}")
        else:
          raise RuntimeError(f"No content in candidate: {response.content}")
      else:
        raise RuntimeError(f"No candidates: {response.content}")
    else:
      raise RuntimeError(f"No candidates: {response.content}")

    # Remove the Markdown code-fence markers and newlines that often wrap a JSON reply
    llm_response = llm_response.replace("```json","")
    llm_response = llm_response.replace("```","")
    llm_response = llm_response.replace("\n","")

    return llm_response

  else:
    raise RuntimeError(f"Error with prompt:'{prompt}'  Status:'{response.status_code}' Text:'{response.text}'")
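`GeminiLLM` walks the nested `candidates → content → parts → text` structure with nested if/else and then strips Markdown fences by string replacement. A flatter sketch of the same walk, applied to a hand-written sample response (not real API output), is below. `extract_first_text`, `parse_json_reply`, and `sample_response` are hypothetical names, and unlike `GeminiLLM`, which returns a string, `parse_json_reply` returns the parsed object. It uses `str.removeprefix`/`removesuffix`, so it assumes Python 3.9 or later.

```python
import json

FENCE = "`" * 3  # a Markdown code fence, built without writing three backticks literally


def extract_first_text(response: dict) -> str:
    """Returns candidates[0].content.parts[0].text, raising RuntimeError when a level is missing."""
    candidates = response.get("candidates") or []
    if not candidates:
        raise RuntimeError(f"No candidates: {response}")
    content = candidates[0].get("content")
    if content is None:
        # Same wording as GeminiLLM so RetryCondition still treats it as retryable
        raise RuntimeError(f"No content in candidate: {response}")
    parts = content.get("parts") or []
    if not parts or "text" not in parts[0]:
        raise RuntimeError(f"No text in part: {response}")
    return parts[0]["text"]


def parse_json_reply(text: str):
    """Strips an optional Markdown code fence (with or without a json tag) and parses the JSON inside."""
    cleaned = text.strip()
    if cleaned.startswith(FENCE):
        cleaned = cleaned.removeprefix(FENCE + "json").removeprefix(FENCE).removesuffix(FENCE)
    return json.loads(cleaned)


sample_response = {
    "candidates": [
        {"content": {"role": "model",
                     "parts": [{"text": FENCE + 'json\n{"campaign_name": "Noir Nights"}\n' + FENCE}]}}
    ]
}
parsed_reply = parse_json_reply(extract_first_text(sample_response))
print(parsed_reply["campaign_name"])  # Noir Nights
```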

#### Gemini LLM - Multimodal

In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def GeminiLLM_Multimodal(
    text_prompt, image_uri, model="gemini-2.0-flash", response_schema=None, temperature=1, topP=1, topK=32
):
    """
    Calls the Gemini API with a text prompt and an image prompt.

    Args:
      text_prompt: The text prompt.
      image_uri: The Cloud Storage URI (gs://...) of the PNG image to analyze.
      model: The Gemini model to use.
      response_schema: Optional response schema.
      temperature: Temperature parameter for the model.
      topP: Top-p parameter for the model.
      topK: Top-k parameter for the model.

    Returns:
      The full Gemini response, parsed from JSON into a dict.
    """

    creds, project = google.auth.default()
    auth_req = google.auth.transport.requests.Request()
    creds.refresh(auth_req)
    access_token = creds.token

    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer " + access_token
    }

    url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/{model}:generateContent"


    generation_config = {
        "temperature": temperature,
        "topP": topP,
        "maxOutputTokens": 8192,
        "candidateCount": 1,
        #"responseMimeType": "text/html",  Invalid for multi-modal responses
    }

    if response_schema is not None:
        generation_config["responseSchema"] = response_schema

    if model in ["gemini-2.0-flash"]:
        generation_config["topK"] = topK

    # Construct the payload with the image URI
    payload = {
        "contents": [{
            "role": "user",
            "parts": [
                {
                    "text": text_prompt
                },
                {
                    "fileData": {
                        "fileUri": image_uri,
                        "mimeType": "image/png"
                    }
                }
            ]
        }],
        "generation_config": generation_config,
        "safety_settings": {
            "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
            "threshold": "BLOCK_LOW_AND_ABOVE"
        }
    }

    response = requests.post(url, json=payload, headers=headers)

    if response.status_code == 200:
        return json.loads(response.text)
    else:
        raise RuntimeError(
            f"Error with prompt:'{text_prompt}'  Status:'{response.status_code}' Text:'{response.text}'"
        )
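Unlike `GeminiLLM`, `GeminiLLM_Multimodal` returns the full parsed response, so the caller still has to pull the text out. The sketch below separates the two jobs: building the text-plus-image request body (same field names as the payload above) and joining the text parts of the first candidate, in case a reply is split across several parts. The function names, the bucket, and the sample response are hypothetical.

```python
def build_multimodal_payload(text_prompt: str, image_uri: str, mime_type: str = "image/png",
                             temperature: float = 1.0) -> dict:
    """Builds a generateContent body with one text part and one fileData part."""
    return {
        "contents": [{
            "role": "user",
            "parts": [
                {"text": text_prompt},
                {"fileData": {"fileUri": image_uri, "mimeType": mime_type}},
            ],
        }],
        "generation_config": {"temperature": temperature, "candidateCount": 1},
    }


def join_candidate_text(response: dict) -> str:
    """Concatenates every text part of the first candidate in a generateContent response."""
    parts = response["candidates"][0]["content"]["parts"]
    return "".join(part.get("text", "") for part in parts)


# Hypothetical bucket and object name; the response is hand-written, not real model output.
example_payload = build_multimodal_payload("Which products should we promote?",
                                           "gs://example-bucket/final_ranking.png")
example_response = {"candidates": [{"content": {"parts": [{"text": "Promote the "}, {"text": "Eclair."}]}}]}
print(join_candidate_text(example_response))  # Promote the Eclair.
```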


#### RunQuery()

In [None]:
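def RunQuery(sql, job_config = None):
  """Runs SQL in BigQuery. SELECT/WITH queries return a DataFrame; other statements
  (DDL/DML) are polled until DONE and return True on success, False on error."""
  import time

  # Tolerate leading whitespace/newlines and lowercase keywords
  statement = sql.lstrip().upper()
  if statement.startswith("SELECT") or statement.startswith("WITH"):
    df_result = client.query(sql).to_dataframe()
    return df_result
  else:
    if job_config is None:
      job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)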
    query_job = client.query(sql, job_config=job_config)

    # Check on the progress by getting the job's updated state.
    query_job = client.get_job(
        query_job.job_id, location=query_job.location
    )
    print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    while query_job.state != "DONE":
      time.sleep(2)
      query_job = client.get_job(
          query_job.job_id, location=query_job.location
          )
      print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    if query_job.error_result is None:
      return True
    else:
      return False
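`RunQuery` polls `client.get_job` every two seconds with no upper bound. A minimal sketch of the same loop with a timeout is below, written against a hypothetical `get_state` callable (not a BigQuery API) that returns `(state, error_result)`, so it can be exercised with scripted states. With the google-cloud-bigquery client, `query_job.result()` also blocks until the job finishes and raises on failure, which is often simpler than polling by hand.

```python
import time


def wait_for_done(get_state, poll_seconds: float = 2.0, timeout_seconds: float = 600.0,
                  sleep=time.sleep, clock=time.monotonic) -> bool:
    """Polls get_state() until the state is DONE; returns True if there was no error.

    get_state is a hypothetical callable returning (state, error_result), for example a
    small wrapper around client.get_job(...). Raises TimeoutError after timeout_seconds.
    """
    deadline = clock() + timeout_seconds
    while True:
        state, error_result = get_state()
        if state == "DONE":
            return error_result is None
        if clock() >= deadline:
            raise TimeoutError(f"Job still {state} after {timeout_seconds} seconds")
        sleep(poll_seconds)


# Usage with a scripted sequence of states instead of a real BigQuery job.
states = iter([("PENDING", None), ("RUNNING", None), ("DONE", None)])
naps = []
print(wait_for_done(lambda: next(states), sleep=naps.append))  # True
print(naps)  # [2.0, 2.0]
```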

#### PrettyPrintJson(json_string)

In [None]:
def PrettyPrintJson(json_string):
  json_object = json.loads(json_string)
  json_formatted_str = json.dumps(json_object, indent=2)
  print(json_formatted_str)
  return json.dumps(json_object)

#### UploadImageToGcs()

In [None]:
from google.cloud import storage

def UploadImageToGcs(image_data, bucket_name, image_name):
    """Uploads an image to Google Cloud Storage."""

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(image_name)
    blob.upload_from_file(image_data, content_type='image/png')
    return f"gs://{bucket_name}/{image_name}"

#### DisplayMarkdown()

In [None]:
def DisplayMarkdown(text):
  """
  Displays text in markdown/HTML format in a Colab notebook.

  Args:
    text: The text to display. Can be plain text or Markdown.
  """

  formatted_text = markdown.markdown(text)  # Convert to HTML if necessary
  display(HTML(formatted_text))


#### getProjectNumber()

In [None]:
def getProjectNumber(project_id):
  """Returns the project number for a project ID via the Cloud Resource Manager API."""

  url = f"https://cloudresourcemanager.googleapis.com/v1/projects/{project_id}"
  json_result = restAPIHelper(url, "GET", None)

  project_number = json_result["projectNumber"]
  return project_number

#### timesFMInference()

In [None]:
def timesFMInference(project_number, endpoint_id, payload):
  url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_number}/locations/{location}/endpoints/{endpoint_id}:predict"
  # print(f"url: {url}")
  response = restAPIHelper(url, http_verb="POST", request_body=payload)
  # print(f"response: {response}")
  return response
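`timesFMInference` passes `payload` to `restAPIHelper`, which sends it with `json=`, so it should be a dict rather than a pre-serialized string. The request and response shapes below are assumptions (`instances` entries with `input` and `horizon`; `predictions` entries with `point_forecast`); confirm them against the **Sample Request** page from the deployment steps before relying on them. The helper names, sales history, and forecast values are hypothetical.

```python
def build_timesfm_payload(history, horizon: int) -> dict:
    """Wraps one time series in an assumed {"instances": [{"input": ..., "horizon": ...}]} body."""
    if horizon <= 0:
        raise ValueError("horizon must be a positive number of steps")
    return {"instances": [{"input": [float(value) for value in history], "horizon": int(horizon)}]}


def first_point_forecast(response: dict) -> list:
    """Reads predictions[0]["point_forecast"] from an assumed response shape."""
    return response["predictions"][0]["point_forecast"]


daily_sales = [120, 135, 128, 140, 150, 138, 145]  # hypothetical daily sales history
example_body = build_timesfm_payload(daily_sales, horizon=3)
# With a deployed endpoint: timesFMInference(getProjectNumber(project_id), endpoint_id, example_body)
example_forecast_response = {"predictions": [{"point_forecast": [147.1, 149.8, 151.2]}]}  # made-up values
print(first_point_forecast(example_forecast_response))
```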

## <font color='#4285f4'>Table Definitions</font>

This notebook uses the tables and data created by the notebook `DB-GMA-Synthetic-Data-Generation-Campaigns.ipynb`. The table definitions are provided below for reference.

```sql
CREATE TABLE IF NOT EXISTS `${project_id}.${bigquery_chocolate_ai_dataset}.campaign` (
    campaign_id STRING NOT NULL OPTIONS(description="Unique identifier for each campaign (e.g., 'SummerSale2024', 'HolidayPromoDec')"),
    menu_id INT64 OPTIONS(description="Id of the menu item being promoted in the campaign"),
    campaign_name STRING OPTIONS(description="A descriptive name for the campaign."),
    campaign_goal STRING OPTIONS(description="The objective of the campaign (e.g., 'Increase brand awareness', 'Drive website traffic', 'Generate leads')"),
    campaign_start_date DATE OPTIONS(description="The date the campaign started."),
    campaign_end_date DATE OPTIONS(description="The date the campaign ended."),
    target_audience STRING OPTIONS(description="The intended audience for the campaign (e.g., 'Women aged 25-35', 'Small business owners in Texas')"),
    marketing_channels STRING OPTIONS(description="The marketing channels used in the campaign (e.g., 'Social Media', 'Email', 'SEO', 'Paid Ads')"),
    budget FLOAT64 OPTIONS(description="The overall budget allocated for the campaign."),
    explanation STRING OPTIONS(description="The model reasoning for creating the campaign."),
    campaign_outcomes STRING OPTIONS(description="Expected outcomes of the campaign.")
) CLUSTER BY campaign_id;
```

```sql
CREATE TABLE IF NOT EXISTS `${project_id}.${bigquery_chocolate_ai_dataset}.campaign_performance` (
    campaign_id STRING NOT NULL OPTIONS(description="Foreign key referencing the campaign table. Unique identifier for the campaign."),
    impressions INT64 OPTIONS(description="Number of times the campaign was displayed."),
    reach INT64 OPTIONS(description="Number of unique individuals exposed to the campaign."),
    website_traffic INT64 OPTIONS(description="Visits to your website attributed to the campaign."),
    leads_generated INT64 OPTIONS(description="Number of new leads captured (e.g., email sign-ups, contact form submissions)."),
    conversions INT64 OPTIONS(description="Number of desired actions taken (e.g., purchases, downloads)."),
    cost_per_click FLOAT64 OPTIONS(description="Average cost per click on your ads (if applicable)."),
    cost_per_acquisition FLOAT64 OPTIONS(description="Average cost to acquire a new customer."),
    return_on_investment FLOAT64 OPTIONS(description="Return on Investment calculated as (Revenue - Cost) / Cost.")
) CLUSTER BY campaign_id;
```
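The `return_on_investment` column is defined as (Revenue - Cost) / Cost, which is a ratio rather than a percentage. A small worked example with hypothetical revenue and cost figures:

```python
def return_on_investment(revenue: float, cost: float) -> float:
    """(Revenue - Cost) / Cost, as described for campaign_performance.return_on_investment."""
    if cost <= 0:
        raise ValueError("cost must be positive")
    return (revenue - cost) / cost


print(return_on_investment(6500, 5000))  # 0.3 -> a 30% return
print(return_on_investment(4000, 5000))  # -0.2 -> the campaign lost 20% of its cost
```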

##  <font color='#4285f4'>Define Campaign Functions</font>

### GenerateCampaignRecommendations()

In [None]:
# Use tenacity to retry in case of resource exhausted errors (429)
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def GenerateCampaignRecommendations(campaign_goal, product_name, product_description, budget):

  # Build the campaign recommendation prompt:
  prompt = f"""
  You are an expert Marketing Analyst, and you are creating a new marketing campaign for a
  French chocolatier and bakery called 'Chocolate AI'.

  Your task is to recommend a campaign for the product that will achieve the campaign goals within budget.

  You should include the following information in your response:
    - A creative campaign name.
    - A detailed description of the campaign.
    - The ideal target audience.
    - Ideal marketing channels. This should be optimized for the target audience.
    - Expected campaign outcomes. This should map directly to the campaign goals.
    - An explanation/justification for the reasoning behind choosing this campaign.

  Your total budget for the campaign is: {budget}

  Here is the goal of the campaign:
  {campaign_goal}

  Here is the product you are promoting:
  {product_name}: {product_description}

  Additional instructions:
  - Results can be creative, but they must be plausible.

  Now generate the campaign recommendation. Think step by step and explain your reasoning.
  """

  # Ref: https://cloud.google.com/vertex-ai/docs/reference/rest/v1/Schema
  response_schema = {
    "type": "object",
    "required": ["campaign_name", "campaign_description", "target_audience", "marketing_channels", "campaign_outcomes", "explanation"],
    "properties": {
      "campaign_name": {
        "type": "string",
        "description": "A descriptive and creative name for the campaign."
      },
      "campaign_description": {
        "type": "string",
        "description": "A detailed description of the campaign."
      },
      "target_audience": {
        "type": "string",
        "description": "The intended audience of the campaign."
      },
      "marketing_channels": {
        "type": "string",
        "description": "The recommended marketing channels to be used in the campaign."
      },
      "campaign_outcomes": {
        "type": "string",
        "description": "The expected outcomes of the campaign."
      },
      "explanation": {
        "type": "string",
        "description": "An explanation for the reasoning behind choosing this campaign."
      }
    }
  }

  result = GeminiLLM(prompt, response_schema=response_schema)
  return result
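`GenerateCampaignRecommendations` returns the model's reply as a JSON string. The response schema marks six fields as required, but it is still worth checking them before writing the campaign to BigQuery. A minimal sketch with hypothetical names and a hand-written reply (not real model output):

```python
import json

REQUIRED_CAMPAIGN_FIELDS = (
    "campaign_name", "campaign_description", "target_audience",
    "marketing_channels", "campaign_outcomes", "explanation",
)


def parse_campaign_recommendation(result: str) -> dict:
    """Parses the JSON string from GenerateCampaignRecommendations and checks required fields are non-empty."""
    campaign = json.loads(result)
    missing = [field for field in REQUIRED_CAMPAIGN_FIELDS if not campaign.get(field)]
    if missing:
        raise ValueError(f"Recommendation is missing fields: {missing}")
    return campaign


example_result = json.dumps({
    "campaign_name": "Noir Nights",
    "campaign_description": "Evening tasting events featuring the dark chocolate tart.",
    "target_audience": "Couples aged 25-45 in Paris",
    "marketing_channels": "Instagram, Email",
    "campaign_outcomes": "Increase tart sales by 15%",
    "explanation": "Evening events suit a premium, shareable dessert.",
})
example_campaign = parse_campaign_recommendation(example_result)
print(example_campaign["campaign_name"])  # Noir Nights
```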


### GetRankedProducts()

In [None]:
def GetRankedProducts(count, time_period='monthly', top_bottom='top'):
  """
  Get ranked products based on sales change over a specified time period.
  Valid time periods are 'yearly', 'monthly', 'weekly', and 'daily'.

  Args:
    count: The number of products to return.
    time_period: The time period for comparison ('yearly', 'monthly', 'weekly', 'daily').
    top_bottom: Whether to return the 'top' or 'bottom' performing products.

  Returns:
    A pandas DataFrame with the ranked products.
  """

  sort_order = 'DESC' if top_bottom == 'top' else 'ASC'

  # Define intervals based on the time_period parameter
  intervals = {
      'yearly': '365 DAY',
      'monthly': '30 DAY',
      'weekly': '7 DAY',
      'daily': '1 DAY'
  }

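  interval = intervals.get(time_period)
  if interval is None:
    raise ValueError(f"Invalid time_period '{time_period}'. Valid values: {list(intervals)}")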

  sql = f"""WITH current_period_sales AS (
    SELECT
        m.menu_name,
        m.menu_id,
        m.menu_description,
        SUM(oi.item_total) AS cumulative_sales
      FROM
        `${project_id}.${bigquery_chocolate_ai_dataset}.order_item` AS oi
        INNER JOIN `${project_id}.${bigquery_chocolate_ai_dataset}.order` AS o ON o.order_id = oi.order_id
        INNER JOIN `${project_id}.${bigquery_chocolate_ai_dataset}.menu` AS m ON m.menu_id = oi.menu_id
      WHERE o.order_datetime BETWEEN TIMESTAMP(DATE_SUB(CURRENT_DATE(), INTERVAL {interval}))
        AND TIMESTAMP(CURRENT_DATE())
      GROUP BY menu_name, m.menu_id, m.menu_description
  ), prior_period_sales AS (
    SELECT
        m.menu_name,
        m.menu_id,
        m.menu_description,
        SUM(oi.item_total) AS cumulative_sales
      FROM
        `${project_id}.${bigquery_chocolate_ai_dataset}.order_item` AS oi
        INNER JOIN `${project_id}.${bigquery_chocolate_ai_dataset}.order` AS o ON o.order_id = oi.order_id
        INNER JOIN `${project_id}.${bigquery_chocolate_ai_dataset}.menu` AS m ON m.menu_id = oi.menu_id
      WHERE o.order_datetime BETWEEN TIMESTAMP(DATE_SUB(CURRENT_DATE(), INTERVAL 2 * {interval}))
          AND TIMESTAMP(DATE_SUB(CURRENT_DATE(), INTERVAL {interval}))
      GROUP BY menu_name, m.menu_id, m.menu_description
  ) SELECT current_period_sales.menu_id,
    current_period_sales.menu_name,
    current_period_sales.menu_description,
    FORMAT('%s €', CAST(ROUND(current_period_sales.cumulative_sales, 0) as STRING)) AS current_period_cumulative_sales,
    FORMAT('%s €', CAST(ROUND(prior_period_sales.cumulative_sales, 0) as STRING)) AS prior_period_cumulative_sales,
    FORMAT('%s €', CAST(ROUND(current_period_sales.cumulative_sales - prior_period_sales.cumulative_sales, 0) as STRING))  AS change,
    ROUND((current_period_sales.cumulative_sales - prior_period_sales.cumulative_sales) / prior_period_sales.cumulative_sales * 100,0) AS change_percent
  FROM current_period_sales
  JOIN prior_period_sales ON current_period_sales.menu_name = prior_period_sales.menu_name
  ORDER BY change_percent {sort_order}
  LIMIT {count};
  """

  return RunQuery(sql)
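The query compares two back-to-back windows of equal length that end at midnight today, and `change_percent` is (current - prior) / prior * 100. The sketch below reproduces the window arithmetic and the percentage in plain Python using a hypothetical date and sales figures. As written, both `BETWEEN` clauses include their shared boundary timestamp, so an order placed exactly at that midnight would be counted in both windows, and orders placed after midnight today are not included.

```python
from datetime import date, timedelta

# Same lookback lengths as the intervals dict in GetRankedProducts.
PERIOD_DAYS = {"yearly": 365, "monthly": 30, "weekly": 7, "daily": 1}


def comparison_windows(today: date, time_period: str):
    """Returns ((prior_start, prior_end), (current_start, current_end)) as dates at midnight."""
    days = PERIOD_DAYS[time_period]
    current_start = today - timedelta(days=days)
    prior_start = today - timedelta(days=2 * days)
    return (prior_start, current_start), (current_start, today)


def change_percent(current_sales: float, prior_sales: float) -> float:
    """(current - prior) / prior * 100, before the query's ROUND(..., 0)."""
    if prior_sales == 0:
        raise ZeroDivisionError("prior period has no sales")
    return (current_sales - prior_sales) * 100 / prior_sales


example_prior_window, example_current_window = comparison_windows(date(2024, 12, 31), "monthly")
print(example_prior_window)    # (datetime.date(2024, 11, 1), datetime.date(2024, 12, 1))
print(example_current_window)  # (datetime.date(2024, 12, 1), datetime.date(2024, 12, 31))
print(change_percent(800, 1000))  # -20.0
```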


### GetFinalProductRanking()

In [None]:
def GetFinalProductRanking(daily_df, weekly_df, monthly_df):
    """
    Ranks products based on appearances and positions in three DataFrames.

    Args:
      daily_df: DataFrame with daily rankings.
      weekly_df: DataFrame with weekly rankings.
      monthly_df: DataFrame with monthly rankings.

    Returns:
      A pandas DataFrame with the final product ranking.
    """

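    # 1. Assign weights based on time period (assign() returns copies, so the caller's DataFrames are not modified)
    daily_df = daily_df.assign(weight=1)
    weekly_df = weekly_df.assign(weight=3)
    monthly_df = monthly_df.assign(weight=10)

    # 2. Concatenate DataFrames
    combined_df = pd.concat([daily_df, weekly_df, monthly_df], ignore_index=True)

    # 3. Rank each product's change_percent across the periods it appears in, then apply the period weight
    combined_df['rank'] = combined_df.groupby('menu_name')['change_percent'].rank(ascending=True)
    combined_df['weighted_rank'] = combined_df['rank'] * combined_df['weight']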

    # 4. Aggregate weighted rank and count appearances
    final_ranking = combined_df.groupby('menu_name').agg(
        final_weighted_rank=('weighted_rank', 'sum'),
        appearances=('menu_name', 'count')
    ).reset_index()

    # 5. Sort by appearances, then by weighted rank (both descending)
    final_ranking = final_ranking.sort_values(
        by=['appearances', 'final_weighted_rank'], ascending=[False, False]
    )

    return final_ranking
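`GetFinalProductRanking` computes `rank` within each product's own rows (one row per time period the product appears in), not within each period's list; it then multiplies by the period weight, sums per product, and sorts by appearances and then weighted rank, both descending. The pure-Python sketch below mirrors that calculation, including pandas' default `average` handling of ties, on three hypothetical products so the resulting order can be checked by hand. The function and product names are illustrative only.

```python
from collections import defaultdict

PERIOD_WEIGHTS = {"daily": 1, "weekly": 3, "monthly": 10}


def average_ranks(values):
    """1-based ascending ranks; tied values share the mean of their positions (pandas' default)."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    ranks = [0.0] * len(values)
    start = 0
    while start < len(order):
        end = start
        # Extend the run while the next value ties with the current one
        while end + 1 < len(order) and values[order[end + 1]] == values[order[start]]:
            end += 1
        mean_rank = (start + end) / 2 + 1  # positions are 0-based, ranks are 1-based
        for k in range(start, end + 1):
            ranks[order[k]] = mean_rank
        start = end + 1
    return ranks


def final_ranking(rankings):
    """rankings maps 'daily'/'weekly'/'monthly' to lists of (menu_name, change_percent)."""
    by_product = defaultdict(list)
    for period, entries in rankings.items():
        for menu_name, change in entries:
            by_product[menu_name].append((change, PERIOD_WEIGHTS[period]))

    result = []
    for menu_name, rows in by_product.items():
        ranks = average_ranks([change for change, _ in rows])
        weighted_rank = sum(rank * weight for rank, (_, weight) in zip(ranks, rows))
        result.append((menu_name, weighted_rank, len(rows)))

    # Most appearances first, then highest weighted rank, as in GetFinalProductRanking
    result.sort(key=lambda row: (-row[2], -row[1]))
    return result


example_ranking = final_ranking({
    "daily": [("Eclair", -30), ("Macaron", -10)],
    "weekly": [("Eclair", -20)],
    "monthly": [("Eclair", -5), ("Tart", -40)],
})
print(example_ranking)  # [('Eclair', 37.0, 3), ('Tart', 10.0, 1), ('Macaron', 1.0, 1)]
```

Eclair's three rows rank 1, 2, 3 by change_percent, so its weighted rank is 1×1 + 2×3 + 3×10 = 37.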

### VisualizePreliminaryProductRankings()

In [None]:
def VisualizePreliminaryProductRankings(daily_df, weekly_df, monthly_df):
    """
    Visualizes the product rankings from three DataFrames (daily, weekly, monthly).

    Args:
      daily_df: DataFrame with daily rankings.
      weekly_df: DataFrame with weekly rankings.
      monthly_df: DataFrame with monthly rankings.
    """

    fig, axes = plt.subplots(3, 1, figsize=(10, 12))  # 3 subplots for daily, weekly, monthly

    # Helper function to plot on each subplot
    def plot_ranking(ax, df, time_period):
        ax.barh(df['menu_name'], df['change_percent'], color='skyblue')
        ax.set_xlabel('Percent Change in Sales')
        ax.set_title(f'Top 10 Lowest Performing Products ({time_period})')
        ax.invert_yaxis()  # Invert y-axis for better readability

    plot_ranking(axes[0], daily_df, 'Daily')
    plot_ranking(axes[1], weekly_df, 'Weekly')
    plot_ranking(axes[2], monthly_df, 'Monthly')

    plt.tight_layout()
    plt.show()

### VisualizeFinalProductRankings()

In [None]:
def VisualizeFinalProductRankings(ranking_df, title):
    """
    Visualizes the product rankings from a DataFrame, including appearances.

    Args:
      ranking_df: DataFrame with product rankings.
      title: Title of the plot.

    Returns:
      A BytesIO buffer containing the chart as a PNG, ready to upload to Cloud Storage for Gemini to interpret.
    """

    fig, ax1 = plt.subplots(figsize=(10, 6))

    # Bar chart for weighted rank
    ax1.barh(ranking_df['menu_name'], ranking_df['final_weighted_rank'], color='skyblue', label='Weighted Rank')
    ax1.set_xlabel('Final Weighted Rank')
    ax1.set_title(title)
    ax1.invert_yaxis()

    # Add appearances as text annotations
    for i, (index, row) in enumerate(ranking_df.iterrows()):
        ax1.text(row['final_weighted_rank'], i, f" ({row['appearances']})",
                 va='center', color='black')

    # Legend
    ax1.legend(loc='lower right')

    plt.tight_layout()

    # Save the plot to a BytesIO object
    image_data = BytesIO()
    plt.savefig(image_data, format='png')
    image_data.seek(0)  # Reset the stream position

    plt.show()
    plt.close(fig)  # Close the figure to free up resources

    return image_data

### VisualizeSimulatedCampaignPerformance()

In [None]:
def VisualizeSimulatedCampaignPerformance(campaign_results):
    """
    Visualizes campaign performance metrics for multiple simulations.

    Args:
      campaign_results: A list of lists, where each inner list contains campaign performance data
                         for a single simulation.
    """

    num_simulations = len(campaign_results)
    num_metrics = len(campaign_results[0][0]) - 1  # Exclude campaign_id

    # Extract metric names (assuming they are consistent across simulations)
    metric_names = [
        "Impressions", "Reach", "Website Traffic", "Leads Generated", "Conversions",
        "Cost per Click", "Cost per Acquisition", "Return on Investment"
    ]

    # Set up figure and axes
    fig, axes = plt.subplots(nrows=num_metrics, ncols=1, figsize=(10, 6 * num_metrics))
    fig.suptitle('Campaign Performance Metrics Across Simulations', fontsize=16)

    # Iterate through metrics
    for i in range(num_metrics):
        ax = axes[i]
        max_campaigns = max(len(sim_result) for sim_result in campaign_results)
        for j, simulation_results in enumerate(campaign_results):
            metric_values = [result[i + 1] for result in simulation_results]
            x_pos = np.arange(len(simulation_results)) + (j - num_simulations / 2) * 0.2  # Adjust bar positions
            ax.bar(x_pos, metric_values, width=0.2, label=f"Simulation {j+1}")

        # Calculate and plot the average line
        avg_metric_values = np.mean([result[i + 1] for sim_result in campaign_results for result in sim_result])
        ax.axhline(y=avg_metric_values, color='black', linestyle='--', label="Average")

        ax.set_xticks(np.arange(max_campaigns))
        ax.set_xticklabels([f"Campaign {k+1}" for k in range(max_campaigns)])
        ax.set_ylabel(metric_names[i])
        ax.legend()

    plt.tight_layout()
    plt.show()

### VisualizeSingleCampaignPerformance()

In [None]:
def VisualizeSingleCampaignPerformance(campaign_data):
    """
    Visualizes performance metrics for a single campaign.

    Args:
      campaign_data: A tuple containing campaign performance data.
    """

    # Extract data from the tuple
    campaign_id = campaign_data[0]
    impressions = campaign_data[1]
    reach = campaign_data[2]
    website_traffic = campaign_data[3]
    leads_generated = campaign_data[4]
    conversions = campaign_data[5]
    cost_per_click = campaign_data[6]
    cost_per_acquisition = campaign_data[7]
    return_on_investment = campaign_data[8]

    # Metric names corresponding to the data
    metric_names = [
        "Impressions", "Reach", "Website Traffic", "Leads Generated", "Conversions",
        "Cost per Click", "Cost per Acquisition", "Return on Investment"
    ]

    # Metric values
    metric_values = [
        impressions, reach, website_traffic, leads_generated, conversions,
        cost_per_click, cost_per_acquisition, return_on_investment
    ]

    # Create the bar chart
    plt.figure(figsize=(10, 6))
    bars = plt.bar(metric_names, metric_values, color='skyblue')  # Store the bar objects
    plt.title(f"Campaign Performance ({campaign_id})")
    plt.xlabel("Metrics")
    plt.ylabel("Value")
    plt.xticks(rotation=45)
    plt.yscale('symlog')  # symmetric log scale: keeps small values visible and, unlike 'log', can show a negative ROI

    # Add labels above each bar
    for bar in bars:
        height = bar.get_height()
        plt.annotate(f'{height}',
                     xy=(bar.get_x() + bar.get_width() / 2, height),
                     xytext=(0, 3),  # 3 points vertical offset
                     textcoords="offset points",
                     ha='center', va='bottom')


    plt.tight_layout()
    plt.show()

### GenerateCampaignPerformance()

In [None]:
def GenerateCampaignPerformance(budget):
  """
  Generates randomized campaign performance data, influenced by the budget.

  Args:
    budget: The budget for the campaign.

  Returns:
    A dictionary containing the performance data.

  Example:
    budget = 5000
    performance_data = GenerateCampaignPerformance(budget)
  """

  # Convert NumPy scalars (e.g. a budget read from a DataFrame) to a plain float so
  # the returned values are built-in Python numbers that format cleanly with str()
  budget = float(budget)

  # Generate random scale factors within a range for non-linear relationship
  impressions_factor = random.uniform(0.6, 0.8)
  reach_factor = random.uniform(0.5, 0.7)
  traffic_factor = random.uniform(0.4, 0.6)
  leads_factor = random.uniform(0.3, 0.5)
  conversions_factor = random.uniform(0.2, 0.4)

  # Generate data points with randomness and scaling
  impressions = int(random.uniform(0.8, 1.2) * (budget ** impressions_factor) * 100)
  reach = int(random.uniform(0.7, 1.1) * (budget ** reach_factor) * 100)
  website_traffic = int(random.uniform(0.6, 1.0) * (budget ** traffic_factor) * 10)
  leads_generated = int(random.uniform(0.5, 0.9) * (budget ** leads_factor) * 5)
  conversions = int(random.uniform(0.4, 0.8) * (budget ** conversions_factor) * 2)


  # Cost per click and acquisition (with some randomness)
  cost_per_click = round(random.uniform(0.2, 1.5), 2)
  cost_per_acquisition = round(random.uniform(5, 50) * (15000 / budget), 2)

  # ROI (mostly positive, but some negative)
  roi_sign = random.choices([-1, 1], weights=[0.2, 0.8])[0]  # 20% chance of negative ROI
  return_on_investment = round(roi_sign * random.uniform(0.5, 10.0), 2)

  return {
      "impressions": impressions,
      "reach": reach,
      "website_traffic": website_traffic,
      "leads_generated": leads_generated,
      "conversions": conversions,
      "cost_per_click": cost_per_click,
      "cost_per_acquisition": cost_per_acquisition,
      "return_on_investment": return_on_investment
  }
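
GenerateCampaignPerformance() scales each volume metric as a power of the budget, `budget ** factor` with a factor below 1, which produces diminishing returns: doubling the budget multiplies the expected metric by `2 ** factor` rather than by 2. The sketch below isolates that relationship with the random noise removed. `expected_metric` is a hypothetical helper, and the factor 0.7 is simply the midpoint of the notebook's impressions range (0.6, 0.8), chosen for illustration.

```python
def expected_metric(budget, factor, scale, noise_midpoint=1.0):
    """Noise-free version of the simulator's formula: noise * budget**factor * scale."""
    return noise_midpoint * (budget ** factor) * scale

# Midpoint of the impressions factor range (0.6, 0.8) used in the simulator
impressions_factor = 0.7

low = expected_metric(5000, impressions_factor, 100)
high = expected_metric(10000, impressions_factor, 100)

# Doubling the budget multiplies impressions by 2**0.7 (about 1.62), not by 2
growth = high / low
print(f"Impressions at $5,000:  {low:,.0f}")
print(f"Impressions at $10,000: {high:,.0f}")
print(f"Growth from doubling budget: {growth:.2f}x")
```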

## <font color='#4285f4'>Generate Campaign Recommendations</font>

### Step 1: Get a Ranked List of Under-performing Products

In [None]:
# Get a list of the 10 worst-performing products over the last day, week, and month
product_ranking_daily = GetRankedProducts(10,'daily','bottom')
product_ranking_weekly = GetRankedProducts(10,'weekly','bottom')
product_ranking_monthly = GetRankedProducts(10,'monthly','bottom')


In [None]:
# Visualize the results
VisualizePreliminaryProductRankings(product_ranking_daily, product_ranking_weekly, product_ranking_monthly)

In [None]:
# Combine the 3 data frames into a final ranked data frame
# Products that appear multiple times are ranked higher
# Products that appear in longer time horizons rankings are also ranked higher
final_ranking = GetFinalProductRanking(product_ranking_daily, product_ranking_weekly, product_ranking_monthly)

final_ranking.head(10)
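
The comments above describe the combination rule: products that appear in more lists, or in longer-horizon lists, rank higher. A minimal sketch of that idea follows, assuming weights of 1, 2, and 3 for daily, weekly, and monthly appearances. The weights, the helper `combine_rankings`, and the product names are all hypothetical; the notebook's GetFinalProductRanking() may score differently (for example, by also using each product's position within a list, which this sketch ignores).

```python
from collections import defaultdict

def combine_rankings(daily, weekly, monthly, weights=(1, 2, 3)):
    """Score each product by the horizons it appears in (hypothetical weighting).

    daily, weekly, monthly: lists of product names, worst-performing first.
    """
    scores = defaultdict(int)
    for products, weight in zip((daily, weekly, monthly), weights):
        for name in products:
            scores[name] += weight
    # Highest score first; ties broken alphabetically for a stable order
    return sorted(scores.items(), key=lambda item: (-item[1], item[0]))

# Made-up product names for illustration only
daily = ["Truffle Box", "Mocha Tart", "Praline Bar"]
weekly = ["Mocha Tart", "Praline Bar"]
monthly = ["Praline Bar", "Hazelnut Cake"]
result = combine_rankings(daily, weekly, monthly)
for name, score in result:
    print(name, score)
```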

In [None]:
# Visualize the final rankings
final_ranking_img = VisualizeFinalProductRankings(final_ranking, 'Final Product Ranking (Daily, Weekly, Monthly)')

# Upload image to GCS
now = datetime.now()
timestamp_str = now.strftime("%Y%m%d_%H%M%S")
image_name = f"images/product_ranking_{timestamp_str}.png"
image_uri = UploadImageToGcs(final_ranking_img, bucket_name, image_name)

# Have Gemini review the graph data and recommend a product to promote.
preliminary_ranking_function = inspect.getsource(GetRankedProducts)
final_ranking_function = inspect.getsource(GetFinalProductRanking)

text_prompt = f"""You are a marketing analyst, and you are reviewing product
sales to determine which low-performing products you should promote with a
marketing campaign. The graph supplied with this prompt shows the worst-performing
products over the last day, week, and month. It was created by running the
GetRankedProducts() function three times: once each for the daily, weekly, and
monthly time periods. Then the results of those three GetRankedProducts() calls
were re-ranked by the GetFinalProductRanking() function.

Here is the definition of GetRankedProducts():
```
{preliminary_ranking_function}
```

And here is the definition of GetFinalProductRanking():
```
{final_ranking_function}
```

Your task is to interpret the supplied graph and explain it to a business
stakeholder. Then select 2 products you recommend promoting based on the data,
and explain your reasoning."""

response = GeminiLLM_Multimodal(text_prompt, image_uri)

# Display the results
response_text = response['candidates'][0]['content']['parts'][0]['text']
DisplayMarkdown(response_text)
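
The cell above reads only the first part of the first candidate. Assuming `GeminiLLM_Multimodal()` returns the parsed JSON of a Vertex AI `generateContent` REST response (candidates, then content, then parts, then text), a slightly more defensive extraction joins all text parts and raises a clear error when no usable candidate comes back, for example when a response was blocked. `extract_response_text` is a hypothetical helper.

```python
def extract_response_text(response):
    """Join the text parts of the first candidate in a generateContent response dict."""
    candidates = response.get("candidates") or []
    if not candidates:
        raise ValueError(f"No candidates returned: {response.get('promptFeedback', response)}")
    parts = candidates[0].get("content", {}).get("parts", [])
    texts = [part["text"] for part in parts if "text" in part]
    if not texts:
        reason = candidates[0].get("finishReason", "unknown")
        raise ValueError(f"First candidate has no text parts (finishReason={reason})")
    return "".join(texts)

sample = {"candidates": [{"content": {"parts": [{"text": "Promote "}, {"text": "product A."}]}}]}
print(extract_response_text(sample))
```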

### Step 2: Determine Discount Amount

Let's say we decide to promote "Earl Grey & Bergamot Chocolate Symphony" and want to increase its sales by 30% over the next week. We can use [TimesFM](https://github.com/google-research/timesfm) to model the impact of offering different discount levels and find the offer most likely to achieve that goal.

In [None]:
# Define product to promote and desired sales increase during promotion period
product_to_promote = "Earl Grey & Bergamot Chocolate Symphony"  # Product name
target_sales_increase = 30  # 30% increase in average daily sales
max_promotion_discount_percent = 50  # 50% - The maximum discount you're willing to offer during the promotion

# Get sales of the product for the last 42 days (6 weeks)
sql = f"""SELECT
    m.menu_name,
    SUM(oi.item_total) AS sales,
    TIMESTAMP_TRUNC(o.order_datetime, DAY) AS day
  FROM
    `${project_id}.${bigquery_chocolate_ai_dataset}.order_item` AS oi
    INNER JOIN `${project_id}.${bigquery_chocolate_ai_dataset}.order` AS o ON o.order_id = oi.order_id
    INNER JOIN `${project_id}.${bigquery_chocolate_ai_dataset}.menu` AS m ON m.menu_id = oi.menu_id
  WHERE m.menu_name = "{product_to_promote}"
   AND o.order_datetime BETWEEN TIMESTAMP(DATE_SUB(CURRENT_DATE(), INTERVAL 42 DAY)) AND CURRENT_TIMESTAMP()
  GROUP BY 1, 3
  ORDER BY 3
  LIMIT 42;"""

daily_sales = RunQuery(sql)
daily_sales.head()
#daily_sales.describe()
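
The query returns one row per day *with sales*, so days on which the product did not sell are missing, and `LIMIT 42` keeps the oldest 42 rows of a window that can span 43 calendar days. Because the forecasting step below assumes a contiguous daily series, one option is to fill missing days with zero sales before forecasting. The sketch is pure Python with a hypothetical helper `fill_missing_days` and toy data; the query's `day` column is a TIMESTAMP, so its values would need converting to `datetime.date` first. With a pandas DataFrame, `reindex` over a `pd.date_range` achieves the same thing.

```python
from datetime import date, timedelta

def fill_missing_days(sales_by_day, start, end):
    """Return daily sales from start to end inclusive, using 0 for missing days.

    sales_by_day: dict mapping datetime.date -> sales amount.
    """
    num_days = (end - start).days + 1
    return [sales_by_day.get(start + timedelta(days=offset), 0) for offset in range(num_days)]

# Toy data: no sales recorded on Jan 2
observed = {date(2024, 1, 1): 120.0, date(2024, 1, 3): 95.5}
series = fill_missing_days(observed, date(2024, 1, 1), date(2024, 1, 3))
print(series)  # [120.0, 0, 95.5]
```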

In [None]:
# Configure TimesFM

context_len = 512
horizon_len = 7 # Forecast the next 7 days. Horizons up to output_patch_len (128) take one decoding step; 129 or more needs another step.
input_patch_len = 32
output_patch_len = 128
num_layers = 20
model_dims = 1280
timesfm_backend = "cpu" # cpu, gpu or cuda
xreg_mode = "xreg + timesfm"
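
`horizon_len` interacts with `output_patch_len`. Assuming, as the configuration comment implies, that each decoding step emits `output_patch_len` (128) points, the number of steps needed for a horizon is a simple ceiling division. `decode_steps` is a hypothetical helper for illustration.

```python
import math

def decode_steps(horizon_len, output_patch_len=128):
    """Number of decoding steps needed to cover horizon_len points (hypothetical helper)."""
    return math.ceil(horizon_len / output_patch_len)

for h in (7, 128, 129, 256):
    print(h, decode_steps(h))
```

A 7-day horizon therefore costs the same single step as a 128-day one.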

In [None]:
# Define function to forecast promotion sales
def ForecastPromotionSales(inputs, current_promotion_discount_percent, past_promotion_discount_percent_assumption=40):
  """Forecasts daily sales over horizon_len days if the given discount runs for the whole horizon."""

  # IMPORTANT:
  # The covariates MUST cover both the context data PLUS the horizon length.
  # For example, if inputs has a length of 42, covariate arrays must be of length 49.

  # Calculate stats for the input sales
  median = statistics.median(inputs)
  standard_deviation = statistics.stdev(inputs)

  # Generate discount array - for demo purposes, we're inferring that we had a
  # promotion running on days that we had higher than median + stdev sales
  discount_array = [0 if x < median + standard_deviation else past_promotion_discount_percent_assumption for x in inputs]

  # Add horizon_len days of current_promotion_discount_percent to cover the forecast horizon
  discount_array.extend([current_promotion_discount_percent] * horizon_len)

  # Day-of-week labels ("1".."7", cycling from the first input day) covering inputs + horizon_len
  day_of_week_array = [str(x % 7 + 1) for x in range(len(inputs) + horizon_len)]

  # These are our dynamic categorical covariates (additional factors that we think might influence the thing we're trying to predict).
  # Here we consider the day of the week
  dynamic_categorical_covariates = {
      "day_of_week": day_of_week_array
  }

  # These are our dynamic numerical covariates (additional numeric factors, just like the categories, but numbers)
  # Here we consider the discount offered on each day
  dynamic_numerical_covariates = {
      "discount_percent": discount_array
  }

  # These are our static numerical covariates (fixed per series, like the price of the product)
  # NOTE: hard-coded illustrative value; it is not looked up from the menu table
  static_numerical_covariates = {
      "price": 18.99
  }

  # These are our static categorical covariates (fixed per series, like the menu item)
  # NOTE: hard-coded illustrative label; it does not match product_to_promote
  static_categorical_covariates = {
      "menu_item" : "chocolate_tasting_flight"
  }


  # frequency of each context time series. 0 for high frequency (default), 1 for medium, and 2 for low.
  frequency = 0

  # Build JSON payload
  payload = {
    "instances": [
      {
          "input": inputs,
          "freq": frequency,
          "horizon": horizon_len,
          "dynamic_numerical_covariates": dynamic_numerical_covariates,
          "dynamic_categorical_covariates": dynamic_categorical_covariates,
          "static_numerical_covariates": static_numerical_covariates,
          "static_categorical_covariates": static_categorical_covariates,
          "xreg_kwargs": {
              "xreg_mode" : xreg_mode
          }
      }
    ]
  }

  # Get the project number in order to call the endpoint
  project_number = getProjectNumber(project_id)

  # Call the TimesFM endpoint to make a prediction
  times_fm_inference = timesFMInference(project_number, endpoint_id, payload)

  # Return the point forecast for our single time series
  return times_fm_inference["predictions"][0]["point_forecast"]
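
ForecastPromotionSales() requires every dynamic covariate to cover the context plus the forecast horizon. A minimal standalone sketch of that construction, using the same median + standard deviation heuristic for inferring past promotions, is below. `build_covariates` is hypothetical, the day-of-week labels simply cycle from the first input day (as in the notebook) rather than coming from real dates, and the sales values are made up.

```python
import statistics

def build_covariates(inputs, current_discount, past_discount=40, horizon_len=7):
    """Build discount and day-of-week covariates of length len(inputs) + horizon_len."""
    threshold = statistics.median(inputs) + statistics.stdev(inputs)
    # Days with unusually high sales are assumed to have had a past promotion
    discounts = [past_discount if x >= threshold else 0 for x in inputs]
    # The promotion being evaluated runs for the whole forecast horizon
    discounts.extend([current_discount] * horizon_len)
    total = len(inputs) + horizon_len
    day_of_week = [str(i % 7 + 1) for i in range(total)]
    return discounts, day_of_week

sales = [100, 105, 98, 250, 102, 99, 101, 97, 240, 103]
discounts, day_of_week = build_covariates(sales, current_discount=15)
print(len(sales), len(discounts), len(day_of_week))  # 10 17 17
```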

In [None]:
# Run forecasts based on our sales data and desired sales increase
inputs = [int(round(x, 0)) for x in daily_sales['sales']]
average_sales = statistics.mean(inputs)
target_sales = average_sales * (1 + (target_sales_increase/100))
recommended_discount_percent = 0

print(f"Current Average daily sales:     {round(average_sales, 2)}")
print(f"Target Average daily sales:      {round(target_sales,2)} ({target_sales_increase}% increase)")
print(f"Maximum promotion discount:      {max_promotion_discount_percent}%\n")

print("Forecasting ideal discount to achieve your sales target...")
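for discount in range(max_promotion_discount_percent + 1):  # include the maximum discount itself
  result = ForecastPromotionSales(inputs, discount)
  if statistics.mean(result) >= target_sales:
    print(f"A promotion discount of {discount}% is projected to achieve your sales target.")
    print(f"\nSetting recommended discount to {discount}%.")
    recommended_discount_percent = discount
    break
  else:
    print(f"A promotion discount of {discount}% is NOT likely to achieve your sales target.")
else:
  # The loop finished without break: no discount up to the maximum reached the target
  print(f"\nNo discount up to {max_promotion_discount_percent}% is projected to reach the target; recommended discount remains {recommended_discount_percent}%.")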
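
The loop above calls TimesFM once per discount level. If one is willing to assume that the mean forecast never decreases as the discount increases, a binary search finds the smallest sufficient discount in about log2(51), or roughly 6, calls. That monotonicity is an assumption the model does not guarantee, so the linear scan remains the safer default. `smallest_sufficient_discount` is hypothetical, and the toy forecaster stands in for ForecastPromotionSales().

```python
import statistics

def smallest_sufficient_discount(forecast_fn, target, max_discount):
    """Binary search for the smallest discount in [0, max_discount] whose mean forecast
    reaches target. Assumes the mean forecast is non-decreasing in the discount.
    Returns None if even max_discount falls short."""
    low, high = 0, max_discount
    best = None
    while low <= high:
        mid = (low + high) // 2
        if statistics.mean(forecast_fn(mid)) >= target:
            best = mid        # mid works; look for a smaller discount
            high = mid - 1
        else:
            low = mid + 1     # mid falls short; need a larger discount
    return best

# Toy forecaster: 7 days of sales rising 2 units per discount point
def toy_forecast(discount):
    return [100 + 2 * discount] * 7

print(smallest_sufficient_discount(toy_forecast, target=130, max_discount=50))  # 15
```

In the notebook this would be called as `smallest_sufficient_discount(lambda d: ForecastPromotionSales(inputs, d), target_sales, max_promotion_discount_percent)`.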

### Step 3: Generate Campaign Ideas

Now that we've decided which product to promote and the target discount amount, let's generate some campaign ideas with Gemini.

In [None]:
# Get product details for the product(s) we will be promoting
sql = f"""SELECT * FROM `${project_id}.${bigquery_chocolate_ai_dataset}.menu`
WHERE menu_name = '{product_to_promote}'
"""

product_details = RunQuery(sql)
product_details = product_details.iloc[0]

product_name = product_details['menu_name']
product_description = product_details['menu_description']
menu_id = product_details['menu_id']

Run the cell below as many times as you'd like until you're happy with the recommended campaign.

In [None]:
# Define goal and budget
campaign_goal = f"Increase sales of {product_to_promote} by {target_sales_increase}%."
budget = 10000

# Generate campaign recommendations for low-performing products
result = GenerateCampaignRecommendations(campaign_goal, product_name, product_description, budget)
#PrettyPrintJson(result)

json_result = json.loads(result)
result_string = f"""
## Suggested campaign for {product_to_promote}

**Campaign Name:** {json_result["campaign_name"]}

**Campaign Description:** {json_result["campaign_description"]}

**Target Audience:** {json_result["target_audience"]}

**Predicted Campaign Outcomes:**
{json_result["campaign_outcomes"]}

**Reasoning for Campaign Choice:**
{json_result["explanation"]}

**Suggested Marketing Channels:**
{json_result["marketing_channels"]}
"""

DisplayMarkdown(result_string)
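
`json.loads()` succeeds on any valid JSON, but the display cell then fails with a `KeyError` if Gemini omitted a field. A small check, sketched here as a hypothetical helper using the six keys this cell reads, makes that failure explicit. With controlled generation (see the Reference Links), a response schema can enforce the same fields at generation time.

```python
import json

REQUIRED_CAMPAIGN_KEYS = (
    "campaign_name", "campaign_description", "target_audience",
    "campaign_outcomes", "explanation", "marketing_channels",
)

def parse_campaign(raw_json):
    """Parse the model's JSON output and verify the keys the display cell needs."""
    campaign = json.loads(raw_json)
    missing = [key for key in REQUIRED_CAMPAIGN_KEYS if key not in campaign]
    if missing:
        raise ValueError(f"Campaign JSON is missing keys: {missing}")
    return campaign

raw = json.dumps({key: f"example {key}" for key in REQUIRED_CAMPAIGN_KEYS})
print(parse_campaign(raw)["campaign_name"])  # example campaign_name
```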

Run the cell below to save the campaign to the database.

In [None]:
# Set start and end dates for the campaign to activate it
campaign_start_date = '2024-09-30'
campaign_end_date = '2024-10-06'

# Save campaign to database
# Using json.dumps() to avoid unexpected quoting errors on insert
campaign_name = json.dumps(json_result["campaign_name"])
campaign_description = json.dumps(json_result["campaign_description"])
campaign_outcomes = json.dumps(json_result["campaign_outcomes"])
target_audience = json.dumps(json_result["target_audience"])
marketing_channels = json.dumps(json_result["marketing_channels"])
explanation = json.dumps(json_result["explanation"])
campaign_goal = json.dumps(campaign_goal)

sql=f"""
INSERT INTO `${project_id}.${bigquery_chocolate_ai_dataset}.campaign`
  (campaign_id, campaign_name, campaign_description, campaign_goal, target_audience, marketing_channels, budget, explanation, campaign_outcomes, menu_id, campaign_start_date, campaign_end_date, campaign_created_date)
VALUES
  (GENERATE_UUID(), {campaign_name}, {campaign_description}, {campaign_goal}, {target_audience}, {marketing_channels}, {budget}, {explanation}, {campaign_outcomes}, {menu_id}, '{campaign_start_date}', '{campaign_end_date}', CAST(CURRENT_DATETIME() AS STRING));
"""

RunQuery(sql)


### Step 4: Simulate Campaign Performance

In [None]:
# This cell uses GenerateCampaignPerformance() to simulate the possible outcomes of this campaign.
sql = f"""SELECT * FROM `${project_id}.${bigquery_chocolate_ai_dataset}.campaign`
WHERE campaign_name = {campaign_name}
ORDER BY campaign_created_date DESC;
"""

campaign_table = RunQuery(sql)
campaign_details = campaign_table.iloc[0]
campaign_id = campaign_details['campaign_id']
campaign_budget = campaign_details['budget']
campaign_details

simulation_count = 10

sim_campaign_performance = []
for sim in range(simulation_count):
  sim_campaign_performance.append([(row.campaign_id, *GenerateCampaignPerformance(campaign_budget).values()) for row in campaign_table.itertuples()])

VisualizeSimulatedCampaignPerformance(sim_campaign_performance)
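
The chart shows the spread of simulated outcomes visually. A numeric summary can be computed from the same structure: `sim_campaign_performance` is a list (one entry per simulation) of lists of tuples `(campaign_id, impressions, reach, ..., return_on_investment)`. The sketch below, a hypothetical `summarize_simulations` helper run on made-up toy data, reports the mean, minimum, and maximum of each metric for one campaign row.

```python
import statistics

METRIC_NAMES = [
    "impressions", "reach", "website_traffic", "leads_generated", "conversions",
    "cost_per_click", "cost_per_acquisition", "return_on_investment",
]

def summarize_simulations(simulations, campaign_index=0):
    """Return {metric: (mean, min, max)} across simulations for one campaign row."""
    rows = [sim[campaign_index] for sim in simulations]
    summary = {}
    for position, name in enumerate(METRIC_NAMES, start=1):  # position 0 is campaign_id
        values = [row[position] for row in rows]
        summary[name] = (statistics.mean(values), min(values), max(values))
    return summary

toy = [
    [("c1", 800, 500, 90, 20, 5, 0.5, 30.0, 2.5)],
    [("c1", 1000, 700, 110, 30, 7, 1.0, 40.0, -1.5)],
]
summary = summarize_simulations(toy)
for name, (mean, low, high) in summary.items():
    print(f"{name:>22}: mean={mean:,.2f}  min={low:,.2f}  max={high:,.2f}")
```

In the notebook, `summarize_simulations(sim_campaign_performance)` would summarize the first campaign row.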

### Step 5: Run campaign and view results

In [None]:
# Run one final simulation to mock an actual outcome, and save it to the database
# The tuple is (campaign_id, impressions, reach, ..., return_on_investment)
campaign_performance = (campaign_id, *GenerateCampaignPerformance(campaign_budget).values())

# Show campaign performance
VisualizeSingleCampaignPerformance(campaign_performance)

# Save to database
sql=f"""INSERT INTO `${project_id}.${bigquery_chocolate_ai_dataset}.campaign_performance` (
    campaign_id, impressions, reach, website_traffic, leads_generated, conversions, cost_per_click, cost_per_acquisition, return_on_investment)
    VALUES {str(campaign_performance)}
"""

result = RunQuery(sql)


## <font color='#4285f4'>Reference Links</font>


- [TimesFM: a pre-trained time-series foundation model developed by Google Research for time-series forecasting](https://github.com/google-research/timesfm)
- [Generate content with the Gemini Enterprise API](https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference)
- [Controlled Generation with Gemini](https://cloud.google.com/vertex-ai/generative-ai/docs/multimodal/control-generated-output)