## <font color='#4285f4'>Overview</font>

Spanner’s new Graph capabilities enable us to create a graph representation of our customer reviews and order history, as well as a social graph of our social media followers, empowering us to derive new analytical insights from complex relationships via efficient graph node traversal. This notebook also demonstrates simple data movement from BigQuery to Spanner via Reverse ETL, joining real-time ordering data with historical data via External Datasets, and performing vector similarity search with Gemini explanations.

Process Flow: 
1. Data Preparation:
    - Spanner Setup: Creates a Spanner instance and database, defines node tables (e.g., customers, orders) from BigQuery table schemas, and defines edge tables (e.g., cai_edge_customer_rates_menu_item) that connect them. A property graph spanning all of these tables is then created.
    - Data Loading: Transfers data from BigQuery to Spanner using Reverse ETL and generates synthetic social graph (follower) data.
2. Recommendation and Analysis:
    - Collaborative Filtering: Traverses the graph to find personalized product recommendations based on the preferences of similar customers, including customers who dislike the same items.
    - Brand Partner Discovery: Identifies influential customers followed by "at-risk" customers for potential marketing partnerships.
3. Social Network Visualization: Visually represents the social graph to understand customer connections.
4. Spanner & BigQuery Integration:
    - External Dataset: Allows querying Spanner data directly from BigQuery using an external schema. Demonstrates analyzing order trends across both real-time data in Spanner and historical data in BigQuery.
5. AI Enhancements:
    - Vertex AI integration: Creates remote AI models in Spanner for generating text embeddings and using a large language model (LLM).
    - Vector Search: Demonstrates finding customers similar to a search phrase using both exact nearest neighbor (ENN) and approximate nearest neighbor (ANN) searches.
6. Gemini Explanations: Utilizes the LLM to provide insights into the relevance of vector search results.
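The collaborative-filtering idea from step 2 can be illustrated without Spanner. Below is a minimal in-memory sketch, assuming ratings on a hypothetical 1 to 5 scale and treating two customers as similar when they rated a shared menu item within one point of each other (an assumed threshold). Because similarity is based on rating agreement, a shared dislike counts just as much as a shared like. In the notebook itself this traversal runs against the Spanner graph.

```python
from collections import defaultdict


def recommend(ratings, customer, max_gap=1, min_rating=4):
    """Recommend items liked by customers who rated a shared item similarly.

    ratings: dict mapping customer -> {item: rating} (hypothetical 1-5 scale).
    """
    mine = ratings[customer]
    scores = defaultdict(int)
    for other, theirs in ratings.items():
        if other == customer:
            continue
        # Similar if any shared item was rated within max_gap points,
        # which covers shared likes and shared dislikes alike.
        shared = set(mine) & set(theirs)
        if not any(abs(mine[item] - theirs[item]) <= max_gap for item in shared):
            continue
        # Collect items the similar customer liked that this customer has not rated.
        for item, rating in theirs.items():
            if item not in mine and rating >= min_rating:
                scores[item] += 1
    # Most frequently recommended first; ties broken alphabetically.
    return sorted(scores, key=lambda item: (-scores[item], item))
```

For example, a customer who disliked a truffle is matched with another customer who also disliked it, and inherits that customer's favorites.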

Cost:
* The Spanner instance configured with 100 processing units costs ~$3.00 a day. 
* ANN vector search requires a minimum of 1000 processing units (1 node). Use the `ScaleSpannerInstance()` helper in this notebook to scale up before experimenting and back down afterwards to control cost.
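Because Spanner compute is billed in proportion to provisioned capacity, the daily figure above can be scaled roughly linearly. A back-of-the-envelope sketch, assuming the ~$3.00/day per 100 processing units quoted above holds (actual rates vary by region and edition, and storage is billed separately):

```python
def estimate_daily_compute_cost(processing_units, cost_per_100_pu_per_day=3.00):
    """Scale the notebook's ~$3.00/day per 100 PU estimate linearly (assumption)."""
    return processing_units / 100 * cost_per_100_pu_per_day


def estimate_day_with_burst(base_pu, burst_pu, burst_hours, cost_per_100_pu_per_day=3.00):
    """Cost of one day at base_pu, except burst_hours spent scaled up to burst_pu."""
    def hourly(pu):
        return estimate_daily_compute_cost(pu, cost_per_100_pu_per_day) / 24

    return hourly(base_pu) * (24 - burst_hours) + hourly(burst_pu) * burst_hours
```

Under this estimate, scaling to 1000 PU for two hours of ANN experiments on an otherwise 100 PU day costs about $5.25, versus about $30 for leaving the instance at 1000 PU all day.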

Author:
* Paul Ramsey

## <font color='#4285f4'>Video Overview</font>

[![Video](https://img.youtube.com/vi/SepF_1T133k/hq720.jpg)](https://www.youtube.com/watch?v=SepF_1T133k)

## <font color='#4285f4'>License</font>

```
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```

## <font color='#4285f4'>Initialize</font>

### Pip installs

In [None]:
! pip -q install pyvis

### Imports

In [None]:
from IPython.display import HTML
import google.auth
import random
import time
from datetime import datetime, timedelta, timezone
import pandas as pd
import re
from pyvis.network import Network
import networkx as nx

from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception
from google.cloud import bigquery
client = bigquery.Client()

### Variables

In [None]:
# Update these variables to match your environment
location="us-central1" # Your region
bigquery_location = "${bigquery_location}" # Must be "us" or "eu"

### Do not change the values in this cell below this line ###
project_id = !(gcloud config get-value project)
user = !(gcloud auth list --filter=status:ACTIVE --format="value(account)")

if len(project_id) != 1:
  raise RuntimeError(f"project_id is not set: {project_id}")
project_id = project_id[0]

if len(user) == 0:
  raise RuntimeError(f"user is not set: {user}")
user = user[0]

project_number = !gcloud projects describe {project_id} --format="value(projectNumber)"
project_number = str(project_number[0])
bucket_name = f"bucket-{project_id}"

# Define Spanner and BQ variables
instance_id = f"chocolate-ai-{project_id}"
database_id = "chocolate-ai"
bq_dataset = "${project_id}.${bigquery_chocolate_ai_dataset}"
session = None
spanner_external_schema_name = 'chocolate_ai_spanner_external_schema'

print(f"project_id = {project_id}")
print(f"user = {user}")
print(f"location = {location}")
print(f"bigquery_location = {bigquery_location}")
print(f"bucket_name = {bucket_name}")
print(f"project_number = {project_number}")
print(f"instance_id = {instance_id}")
print(f"database_id = {database_id}")
print(f"bq_dataset = {bq_dataset}")

### Define Helper Methods

#### RetryCondition(error)

In [None]:
def RetryCondition(error):
  error_string = str(error)
  print(error_string)

  retry_errors = [
      "RESOURCE_EXHAUSTED",
      "No content in candidate",
      # Add more error messages here as needed
  ]

  for retry_error in retry_errors:
    if retry_error in error_string:
      print("Retrying...")
      return True

  return False
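`RetryCondition` is shaped as a predicate for tenacity's `retry_if_exception`, which the notebook imports; a typical decorator would be `@retry(retry=retry_if_exception(RetryCondition), wait=wait_exponential(multiplier=1, max=60), stop=stop_after_attempt(5))`. To make the backoff behavior concrete without a network call, here is a minimal stdlib-only stand-in (an illustration, not tenacity itself). The names `call_with_retry` and `is_transient` are hypothetical.

```python
import time


def call_with_retry(func, should_retry, max_attempts=5, base_delay=1.0, max_delay=60.0):
    """Call func(); if it raises an error accepted by should_retry, back off and try again."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as error:
            # Give up on the last attempt or on errors the predicate rejects
            if attempt == max_attempts or not should_retry(error):
                raise
            # Exponential backoff: base_delay, 2x, 4x, ... capped at max_delay
            time.sleep(min(base_delay * 2 ** (attempt - 1), max_delay))


def is_transient(error):
    """Same substring check as RetryCondition, without the logging."""
    retry_errors = ["RESOURCE_EXHAUSTED", "No content in candidate"]
    return any(retry_error in str(error) for retry_error in retry_errors)
```

In the notebook, `RetryCondition` itself could be passed as `should_retry`.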

#### restAPIHelper
Calls the Google Cloud REST API using the current user's credentials.

In [None]:
def restAPIHelper(url: str, http_verb: str, request_body: dict = None, params: dict = None) -> dict:
  """Calls the Google Cloud REST API passing in the current user's credentials"""

  import requests
  import google.auth
  import google.auth.transport.requests
  import json

  # Use a fresh dict per call instead of a shared mutable default
  if request_body is None:
    request_body = {}

  # Get an access token based upon the current user
  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request()
  creds.refresh(auth_req)
  access_token = creds.token


  headers = {
    "Authorization" : "Bearer " + access_token,
    "Content-Type": "application/json",
    "x-goog-user-project": project_id # Required to workaround quota project bug
  }

  if http_verb == "GET":
    response = requests.get(url, headers=headers, params=params)
  elif http_verb == "POST":
    response = requests.post(url, headers=headers, data=json.dumps(request_body), params=params)
  elif http_verb == "PUT":
    response = requests.put(url, headers=headers, data=json.dumps(request_body), params=params)
  elif http_verb == "PATCH":
    response = requests.patch(url, headers=headers, data=json.dumps(request_body), params=params)
  elif http_verb == "DELETE":
    response = requests.delete(url, headers=headers, params=params)
  else:
    raise RuntimeError(f"Unknown HTTP verb: {http_verb}")

  if response.status_code == 200:
    return json.loads(response.content)
  else:
    print("Request URL:", response.request.url)
    print("Request Headers:", response.request.headers)
    print("Request Body:", response.request.body)
    error = f"Error restAPIHelper -> Status: '{response.status_code}' Text: '{response.text}'"
    raise RuntimeError(error)

#### RunBQQuery()

In [None]:
def RunBQQuery(sql, job_config = None):
  import time

  # Queries (SELECT/WITH) return a DataFrame; other statements are polled until the job finishes
  if sql.lstrip().upper().startswith(("SELECT", "WITH")):
      df_result = client.query(sql, job_config=job_config).to_dataframe()
      return df_result
  else:
    if job_config is None:
      job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)
    query_job = client.query(sql, job_config=job_config)

    # Check on the progress by getting the job's updated state.
    query_job = client.get_job(
        query_job.job_id, location=query_job.location
    )
    print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    while query_job.state != "DONE":
      time.sleep(2)
      query_job = client.get_job(
          query_job.job_id, location=query_job.location
          )
      print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    if query_job.error_result is None:
      return True
    else:
      return False
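`RunBQQuery` decides between returning a DataFrame and polling a job by checking the statement's first keyword. A slightly more robust classifier is sketched below. It is hypothetical (not part of the BigQuery client) and assumes only whitespace, `--` line comments, `/* */` block comments, and opening parentheses can precede the first keyword.

```python
import re

# Leading whitespace, "--" line comments, and "/* */" block comments
_LEADING_NOISE = re.compile(r"(?:\s+|--[^\n]*(?:\n|$)|/\*.*?\*/)*", re.DOTALL)


def first_keyword(sql):
    """Return the statement's first keyword in upper case ('' if none)."""
    rest = sql[_LEADING_NOISE.match(sql).end():]
    # Skip any opening parentheses, e.g. "(SELECT 1)"
    match = re.match(r"[\s(]*([A-Za-z]+)", rest)
    return match.group(1).upper() if match else ""


def is_query(sql):
    """True for statements that return rows (SELECT or WITH)."""
    return first_keyword(sql) in {"SELECT", "WITH"}
```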

#### GetSpannerSessions()

In [None]:
def GetSpannerSessions(project_id = project_id, instance_id = instance_id, database_id = "chocolate-ai"):

  # List the existing sessions for the database
  uri = f"https://spanner.googleapis.com/v1/projects/{project_id}/instances/{instance_id}/databases/{database_id}/sessions"
  response = restAPIHelper(uri, "GET")
  return response

#### CreateSpannerSession()

In [None]:
# https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances.databases.sessions/create
def CreateSpannerSession(project_id = project_id, instance_id = instance_id, database_id = "chocolate-ai"):
  print("No Spanner session found. Creating a new session.")

  # Create a new session
  uri = f"https://spanner.googleapis.com/v1/projects/{project_id}/instances/{instance_id}/databases/{database_id}/sessions"
  params = {
      "database": f"projects/{project_id}/instances/{instance_id}/databases/{database_id}"
  }
  response = restAPIHelper(uri, "POST", {}, params)
  return response['name']

#### CloseSpannerSession()

In [None]:
# https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances.databases.sessions/delete
def CloseSpannerSession(session, project_id = project_id, instance_id = instance_id, database_id = "chocolate-ai"):
  """
  Example:
    response = GetSpannerSessions()
    for session in response['sessions']:
      CloseSpannerSession(session['name'])
  """

  uri = f"https://spanner.googleapis.com/v1/{session}"
  response = restAPIHelper(uri, "DELETE", {}, {"name": f"{session}"})
  return response

#### RunSpannerQuery()

In [None]:
def RunSpannerQuery(sql, database_id = "chocolate-ai", query_options=None, create_new_session=False):
  """
  Runs a Spanner query and returns the result.

  Args:
      sql: The SQL query to execute.
      database_id: (Optional) The database ID. Currently unused; new sessions are
                   created against the default database.
      query_options: (Optional) A dictionary of advanced query options.
                    See https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances.databases.sessions/executeSql#queryoptions
                    for available options.
      create_new_session: (Optional) If True, creates a new session even if one exists.

  Returns:
      A DataFrame for queries that return rows, otherwise the raw response dictionary.

  Ref:
      https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances.databases.sessions/executeSql
  """
  # Ensure a session exists, creating one if needed
  global session
  if not session or create_new_session:
    session = CreateSpannerSession()

  # Initialize response vars
  commit_response = ""
  response = ""

  # Construct the request URL
  uri = f"https://spanner.googleapis.com/v1/{session}:executeSql"

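  # Set transaction type (readOnly/readWrite) and transaction object with commit type (begin/singleUse).
  # Note: this is a substring match, so any SQL containing "insert", "update", or "delete" is treated as DML.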
  transaction_type = "readWrite" if any(x in sql.lower() for x in ["insert", "update", "delete"]) else "readOnly"
  transaction = {"begin": {"readWrite": {}}} if transaction_type == "readWrite" else {"singleUse": {"readOnly": {}}}

  request_body = {
      "sql": sql,
      "transaction": transaction
  }
  params = {
      "session": session
  }

  if query_options:
      request_body["queryOptions"] = query_options

  try:
    # Make the request
    response = restAPIHelper(uri, "POST", request_body=request_body, params = params)

  except RuntimeError as e:
    if "Session not found" in str(e):
      print(f"Session not found. Creating a new session and retrying the query...")
      return RunSpannerQuery(sql, database_id, query_options, create_new_session=True)  # Retry with a new session
    else:
      raise  # Re-raise the exception if it's not a "Session not found" error

  # Commit transaction if read/write
  if transaction_type == "readWrite":
      uri = f"https://spanner.googleapis.com/v1/{session}:commit"
      params = {
          "session": session
      }
      commit_response = restAPIHelper(uri, "POST", {"transactionId": response['metadata']['transaction']['id']}, params)
      print(f"commit_response: {commit_response}")

  # Return a DataFrame if the statement is a query (SELECT, WITH, or GRAPH)
  if (sql.lstrip().lower().startswith(("select", "with", "graph"))):
    columns = [field.get('name', 'unnamed_column') for field in response['metadata']['rowType']['fields']]

    # Create DataFrame from rows
    if 'rows' in response:
      df = pd.DataFrame(response['rows'], columns=columns)
      return df
    else:
      return response

  else:
    # Return the query results
    return response
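Two details of `RunSpannerQuery` are worth a closer look. First, it starts a read-write transaction whenever `insert`, `update`, or `delete` appears anywhere in the SQL, so a query that merely selects a column such as `last_updated` is treated as DML. Second, the Spanner REST API encodes `INT64` values as JSON strings, so the resulting DataFrame columns hold strings rather than integers. A hypothetical sketch addressing both, assuming DML statements are not preceded by comments:

```python
DML_KEYWORDS = {"INSERT", "UPDATE", "DELETE"}


def transaction_type(sql):
    """Return 'readWrite' for DML statements, else 'readOnly' (leading keyword only)."""
    words = sql.split(None, 1)
    first = words[0].upper() if words else ""
    return "readWrite" if first in DML_KEYWORDS else "readOnly"


def decode_rows(result_set):
    """Convert executeSql rows into dicts, parsing INT64 values sent as strings."""
    fields = result_set["metadata"]["rowType"]["fields"]
    rows = []
    for raw in result_set.get("rows", []):
        row = {}
        for field, value in zip(fields, raw):
            if value is not None and field["type"]["code"] == "INT64":
                value = int(value)
            row[field.get("name", "unnamed_column")] = value
        rows.append(row)
    return rows
```

The decoded rows can be passed directly to `pd.DataFrame(rows)`.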

#### RunSpannerDDL()

In [None]:
def RunSpannerDDL(ddl_array, project_id = project_id, instance_id = instance_id, database_id = database_id):
  # Apply DDL statements (CREATE TABLE, CREATE MODEL, etc.) to the database
  # https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances.databases/updateDdl

  uri = f"https://spanner.googleapis.com/v1/projects/{project_id}/instances/{instance_id}/databases/{database_id}/ddl"
  http_verb = "PATCH"
  request_body = {
      "statements": ddl_array
  }

  response = restAPIHelper(uri, http_verb, request_body)

  operation_name = response['name']
  uri = f"https://spanner.googleapis.com/v1/{operation_name}"

  while True:
    response = restAPIHelper(uri, "GET", {})
    if response.get("done", False):
      if response.get("error"):
        print(response.get("error"))
      else:
        print("Operation completed successfully.")
      break
    else:
      print("Operation not completed yet.")
      time.sleep(2)
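`RunSpannerDDL` (and `ScaleSpannerInstance` below) poll the long-running operation in an unbounded loop and only print errors. A minimal sketch of a reusable poller that adds a timeout and raises on failure, assuming a caller-supplied `fetch_operation()` that returns the operation JSON (for example `lambda: restAPIHelper(uri, "GET")`). The name `wait_for_operation` is hypothetical.

```python
import time


def wait_for_operation(fetch_operation, interval=2.0, timeout=600.0):
    """Poll a long-running operation until done; return it, or raise on error or timeout."""
    deadline = time.monotonic() + timeout
    while True:
        operation = fetch_operation()
        if operation.get("done", False):
            if "error" in operation:
                raise RuntimeError(f"Operation failed: {operation['error']}")
            return operation
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Operation not done after {timeout} seconds")
        time.sleep(interval)
```

Raising instead of printing lets a failed DDL batch stop the notebook before later cells run against missing tables.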

#### ScaleSpannerInstance()

In [None]:
def ScaleSpannerInstance(processing_units, instance_id = instance_id, project_id = project_id):
  uri = f"https://spanner.googleapis.com/v1/projects/{project_id}/instances/{instance_id}"
  http_verb = "PATCH"
  request_body = {
      "instance": {
          "config": f"projects/{project_id}/instanceConfigs/regional-us-central1",
          "processingUnits": processing_units
      },
      "fieldMask": "processingUnits"
  }

  response = restAPIHelper(uri, http_verb, request_body)
  print(response)

  operation_name = response['name']
  uri = f"https://spanner.googleapis.com/v1/{operation_name}"

  while True:
    response = restAPIHelper(uri, "GET", {})
    if response.get("done", False):
      if response.get("error"):
        print(response.get("error"))
      else:
        print("Operation completed successfully.")
      break
    else:
      print("Operation not completed yet.")
      time.sleep(2)
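Spanner accepts processing units in multiples of 100 up to 1000, and in multiples of 1000 above that (1000 processing units equal one node), so an invalid value passed to `ScaleSpannerInstance` is rejected by the API. A small, hypothetical pre-check (not part of any Google library) can catch that before the request is sent:

```python
def validate_processing_units(processing_units):
    """Return processing_units if it is a valid Spanner compute size, else raise ValueError."""
    if processing_units < 100:
        raise ValueError("Minimum is 100 processing units")
    # Increments of 100 up to 1000 PU, increments of 1000 beyond that
    step = 100 if processing_units <= 1000 else 1000
    if processing_units % step != 0:
        raise ValueError(f"{processing_units} PU must be a multiple of {step}")
    return processing_units
```

For example, `ScaleSpannerInstance(validate_processing_units(1000))` before ANN experiments, then `ScaleSpannerInstance(validate_processing_units(100))` afterwards.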

#### RunReverseETL()

In [None]:
def RunReverseETL(source_table, project_id = project_id, instance_id = instance_id, database_id = database_id):
  print(f"Running reverse ETL for table {source_table}...")

  spanner_options =  f"{{'table': 'cai_{source_table}'}}"
  spanner_options = spanner_options.replace("'", '"')

  export_statement = f"""EXPORT DATA OPTIONS (
    uri='https://spanner.googleapis.com/projects/{project_id}/instances/{instance_id}/databases/{database_id}',
    format='CLOUD_SPANNER',
    spanner_options='''{spanner_options}'''
  )
  AS SELECT * FROM `{bq_dataset}.{source_table}`;"""

  result = RunBQQuery(export_statement)
  return result
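`RunReverseETL` builds `spanner_options` by formatting a Python dict-like string and swapping quote characters. A sketch of the same `EXPORT DATA` statement built with `json.dumps`, which always yields valid JSON, follows. The function name is hypothetical, and the dataset, project, instance, and database values used in the test are placeholders.

```python
import json


def build_reverse_etl_statement(source_table, bq_dataset, project_id, instance_id, database_id):
    """Return a BigQuery EXPORT DATA statement writing source_table to Spanner table cai_<source_table>."""
    spanner_options = json.dumps({"table": f"cai_{source_table}"})
    spanner_uri = (f"https://spanner.googleapis.com/projects/{project_id}"
                   f"/instances/{instance_id}/databases/{database_id}")
    return f"""EXPORT DATA OPTIONS (
  uri='{spanner_uri}',
  format='CLOUD_SPANNER',
  spanner_options='''{spanner_options}'''
)
AS SELECT * FROM `{bq_dataset}.{source_table}`;"""
```

For a plain table name both approaches produce the same text; `json.dumps` simply avoids breakage if an option value ever contains a quote.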

#### GenerateFollowerData()

In [None]:
def GenerateFollowerData(user_ids, follower_ids, influencer_ids,
                         min_followers, max_followers,
                         min_influencer_followers, max_influencer_followers,
                         mutation_limit):
  """
  Generates synthetic social media follow relationships and inserts them into
  cai_edge_customer_follows_customer. Each inserted row
  (customer_id, followed_customer_id) means the user follows the selected customer.

  Args:
    user_ids: List of user IDs to generate follow relationships for.
    follower_ids: List of candidate non-influencer customer IDs.
    influencer_ids: List of candidate influencer customer IDs to follow.
    min_followers: Minimum number of non-influencer customers each user follows.
    max_followers: Maximum number of non-influencer customers each user follows.
    min_influencer_followers: Minimum number of influencers each user follows.
    max_influencer_followers: Maximum number of influencers each user follows.
    mutation_limit: Maximum number of mutations allowed per transaction.
  """
  def get_random_date(start_date, end_date):
    time_between_dates = end_date - start_date
    days_between_dates = time_between_dates.days

    random_number_of_days = random.randrange(days_between_dates)
    random_date = start_date + timedelta(days=random_number_of_days)

    formatted_date = random_date.strftime('%Y-%m-%d')  # Convert to string
    return formatted_date

  for i in range(len(user_ids)):
    print(f"Processing user: ({i + 1} of {len(user_ids)})")

    # Get number of followers and influencer followers
    num_followers = random.randint(min_followers, max_followers)
    num_influencer_followers = random.randint(min_influencer_followers, max_influencer_followers)

    # Get array of random non-influencers who follow users
    random_follower_ids = random.sample(follower_ids, min(num_followers, len(follower_ids)))

    # Get array of random influencers who follow users
    random_follower_ids.extend(random.sample(influencer_ids, min(num_influencer_followers, len(influencer_ids))))

    # Define random date range
    date_range_start = datetime(2020, 1, 1)
    date_range_end = datetime(2024, 10, 4)

    # Track mutations
    mutation_count = 0
    mutations_per_insert = 3

    # Build sql
    sql = f"""INSERT OR UPDATE INTO cai_edge_customer_follows_customer (customer_id, followed_customer_id, follow_date) VALUES """
    for j in range(len(random_follower_ids)):
      random_date = get_random_date(date_range_start, date_range_end)
      sql += f"""({user_ids[i]}, {random_follower_ids[j]}, '{random_date}'),"""
      mutation_count += mutations_per_insert

      # Run the insert if mutation count approaches max mutations per transaction
      if j == len(random_follower_ids) - 1 or mutation_count >= mutation_limit - mutations_per_insert:
        print(f"Mutation count: {mutation_count}")
        result = RunSpannerQuery(sql[:-1])  # Drop the trailing comma
        print(result)
        sql = f"""INSERT OR UPDATE INTO cai_edge_customer_follows_customer (customer_id, followed_customer_id, follow_date) VALUES """
        mutation_count = 0

  print("Finished generating follower data.")
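`GenerateFollowerData` tracks mutations by hand while appending `VALUES` tuples. Spanner counts mutations roughly per column written per row (plus affected secondary indexes), which is why the notebook charges 3 mutations for each inserted row. A minimal sketch of the same batching as pure functions, assuming no secondary indexes on the edge table; `chunk_rows` and `build_insert` are hypothetical helpers.

```python
def chunk_rows(rows, columns_per_row, mutation_limit):
    """Split rows into batches whose mutation count stays within mutation_limit."""
    rows_per_batch = max(1, mutation_limit // columns_per_row)
    return [rows[i:i + rows_per_batch] for i in range(0, len(rows), rows_per_batch)]


def build_insert(batch):
    """Render one INSERT OR UPDATE statement for (customer_id, followed_customer_id, follow_date) rows."""
    values = ", ".join(f"({customer}, {followed}, '{date}')" for customer, followed, date in batch)
    return ("INSERT OR UPDATE INTO cai_edge_customer_follows_customer "
            f"(customer_id, followed_customer_id, follow_date) VALUES {values}")
```

Each statement from `[build_insert(b) for b in chunk_rows(rows, 3, mutation_limit)]` could then be passed to `RunSpannerQuery`.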

## <font color='#4285f4'>Setup Spanner Graph</font>

### Create Spanner Instance and Database

In [None]:
# Enable the Spanner API
uri = f"https://serviceusage.googleapis.com/v1/projects/{project_number}/services/spanner.googleapis.com:enable"

response = restAPIHelper(uri, "POST", {})

try:
  print(response['response']['service']['state'])
except (KeyError, TypeError):
  print(response)

In [None]:
# Create the Spanner instance
# This notebook creates a paid instance. A 90-day free trial instance is available once per project lifecycle.
# https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances/create
uri = f"https://spanner.googleapis.com/v1/projects/{project_id}/instances"
http_verb = "POST"
request_body = {
    "instance": {
        "config": f"projects/{project_id}/instanceConfigs/regional-us-central1",
        "displayName": "Chocolate AI Spanner",
        "edition": "ENTERPRISE",
        "processingUnits": 100,

        # OPTIONAL: Define nodeCount instead of processingUnits or autoscalingConfig.
        #"nodeCount": 1,

        # OPTIONAL: Define autoscalingConfig instead of nodeCount or processingUnits.
        #"autoscalingConfig": {
        #  "autoscalingLimits": {
        #    "minProcessingUnits": 1000,
        #    "maxProcessingUnits": 2000
        #  },
        #  "autoscalingTargets": {
        #    "highPriorityCpuUtilizationPercent": 80,
        #    "storageUtilizationPercent": 80
        #  }
        #}
    },
    "instanceId": f"{instance_id}"
}

response = restAPIHelper(uri, http_verb, request_body)
response

In [None]:
# Create a database
# https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances.databases/create#try-it

uri = f"https://spanner.googleapis.com/v1/projects/{project_id}/instances/{instance_id}/databases"
http_verb = "POST"
request_body = {
    "createStatement": f"CREATE DATABASE `{database_id}`"
}

response = restAPIHelper(uri, http_verb, request_body)
response

### Create AI Models in Spanner

In [None]:
# Create an embeddings model and an LLM model
# Ref: https://codelabs.developers.google.com/codelabs/spanner-getting-started-vector-search#3
#      https://cloud.google.com/spanner/docs/ml-tutorial-embeddings
ddl_array = []
ddl_array.append(f"""CREATE MODEL IF NOT EXISTS EmbeddingsModel INPUT(
  content STRING(MAX),
  ) OUTPUT(
  embeddings STRUCT<statistics STRUCT<truncated BOOL, token_count FLOAT64>, values ARRAY<FLOAT64>>,
  ) REMOTE OPTIONS (
  endpoint = '//aiplatform.googleapis.com/projects/{project_id}/locations/us-central1/publishers/google/models/text-embedding-005'
  )
""")

ddl_array.append(f"""CREATE MODEL IF NOT EXISTS LLMModel INPUT(
prompt STRING(MAX),
) OUTPUT(
content STRING(MAX),
) REMOTE OPTIONS (
endpoint = '//aiplatform.googleapis.com/projects/{project_id}/locations/us-central1/publishers/google/models/gemini-2.0-flash',
default_batch_size = 1
)""")

result = RunSpannerDDL(ddl_array)
result
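`EmbeddingsModel` returns one `ARRAY<FLOAT64>` embedding per input, and exact nearest neighbor (ENN) search ranks every stored vector by its distance to the query vector. A pure-Python sketch of ENN with cosine distance, using tiny made-up 3-dimensional vectors in place of the 768-dimensional `text-embedding-005` output and assuming non-zero vectors; in Spanner the equivalent ranking would use its `COSINE_DISTANCE` function.

```python
import math


def cosine_distance(a, b):
    """1 - cosine similarity; 0 means the vectors point in the same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm


def exact_nearest_neighbors(query, vectors, k=2):
    """Brute force: score every stored vector against the query and keep the k closest."""
    scored = sorted(vectors.items(), key=lambda kv: cosine_distance(query, kv[1]))
    return [key for key, _ in scored[:k]]
```

ANN search, by contrast, uses a vector index to avoid scanning every row, trading some exactness for speed.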

### Create Node Tables

In [None]:
# Get source table definitions from BQ and translate to Spanner syntax

## Get DDL from BQ
sql = f"""SELECT ddl
  FROM `{bq_dataset}.INFORMATION_SCHEMA.TABLES`
  WHERE table_type = 'BASE TABLE'
  AND table_name IN (
    'customer',
    'customer_marketing_profile',
    'customer_review',
    'menu',
    'order',
    'order_item',
    'store'
  );"""
result = RunBQQuery(sql)

# Transform BQ DDL into Spanner dialect.
ddl_array = []
for row in result.itertuples():
  ddl = row.ddl
  if "CLUSTER BY" in ddl:
    ddl = ddl.replace("CLUSTER BY ", "PRIMARY KEY(")
    ddl = ddl.replace(";", ");")

  ddl = ddl.replace(f"{bq_dataset}.",f"cai_")
  ddl = re.sub(r"[\r\n]+", "", ddl)
  ddl = ddl.replace("  ", " ")
  ddl = ddl.replace("( ", "(")
  ddl = ddl.replace(") ", ")")
  ddl = ddl.replace("STRING", "STRING(MAX)")
  ddl = ddl.replace(";", "")
  ddl = ddl.replace("DEFAULT GENERATE_UUID()", "DEFAULT (GENERATE_UUID())")
  ddl = ddl.replace("DEFAULT CURRENT_TIMESTAMP()", "DEFAULT (CURRENT_TIMESTAMP())")
  ddl = ddl.replace("DATETIME", "TIMESTAMP")

  #ddl = ddl.replace("ARRAY<FLOAT64>","ARRAY<FLOAT64>(vector_length=>768)")

  # Remove OPTIONS(...) clauses, including those containing escaped quotes and one level of nested parentheses
  ddl = re.sub(r"OPTIONS\s*\((?:[^()]|\([^()]*\))*\)", "", ddl)

  if "PRIMARY KEY" not in ddl:
      # Extract the first column name using a regular expression
      first_column_match = re.search(r"CREATE TABLE `.*`\((\S+)", ddl)
      if first_column_match:
          first_column = first_column_match.group(1)
          print(f"Adding primary key: {first_column}")
          # Add PRIMARY KEY constraint to the first column
          ddl = ddl + f" PRIMARY KEY({first_column})"  # Append the PRIMARY KEY clause after the closing ')'

  ddl = ddl.replace("CREATE TABLE","CREATE TABLE IF NOT EXISTS")
  ddl_array.append(ddl)

  # Print the transformed DDL
  print(ddl)
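To see what the transformation loop produces, here is a condensed, hypothetical copy of it applied to a made-up BigQuery DDL string (the `my_proj.my_ds.store` table is a placeholder). It applies the same replacements in the same order but omits the `DEFAULT` rewrites and the PRIMARY KEY fallback.

```python
import re


def bq_ddl_to_spanner(ddl, bq_dataset):
    """Condensed copy of the notebook's BigQuery-to-Spanner DDL rewrite."""
    if "CLUSTER BY" in ddl:
        # CLUSTER BY becomes the Spanner primary key
        ddl = ddl.replace("CLUSTER BY ", "PRIMARY KEY(").replace(";", ");")
    ddl = ddl.replace(f"{bq_dataset}.", "cai_")
    ddl = re.sub(r"[\r\n]+", "", ddl)
    for old, new in [("  ", " "), ("( ", "("), (") ", ")"),
                     ("STRING", "STRING(MAX)"), (";", ""),
                     ("DATETIME", "TIMESTAMP")]:
        ddl = ddl.replace(old, new)
    # Drop OPTIONS(...) clauses, allowing one level of nested parentheses
    ddl = re.sub(r"OPTIONS\s*\((?:[^()]|\([^()]*\))*\)", "", ddl)
    return ddl.replace("CREATE TABLE", "CREATE TABLE IF NOT EXISTS")


sample = ("CREATE TABLE `my_proj.my_ds.store`\n(\n  store_id INT64,\n"
          "  store_name STRING OPTIONS(description=\"Name\"),\n"
          "  opened DATETIME\n)\nCLUSTER BY store_id;")
```

`bq_ddl_to_spanner(sample, "my_proj.my_ds")` yields a single-line Spanner statement; the stray space left where the OPTIONS clause was removed is harmless to the DDL parser.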

# Define the customer_360 table
# This is a special case, because customer_360 is a view in BQ with
# incompatible data types, so it's easier to create this as a one off.
ddl_array.append(f"""CREATE TABLE IF NOT EXISTS cai_customer_360 (
  customer_id INT64 NOT NULL,
  customer_marketing_insights STRING(MAX),
  benefits_sought STRING(MAX),
  browsing_behavior STRING(MAX),
  loyalty_status STRING(MAX),
  occasion_timing STRING(MAX),
  purchase_history STRING(MAX),
  spending_habits STRING(MAX),
  usage_frequency STRING(MAX),
  user_status STRING(MAX),
  at_risk_customers STRING(MAX),
  first_time_customers STRING(MAX),
  former_customers STRING(MAX),
  inactive_customers STRING(MAX),
  loyal_advocates STRING(MAX),
  new_leads STRING(MAX),
  potential_customers STRING(MAX),
  repeat_customers STRING(MAX),
  age STRING(MAX),
  education STRING(MAX),
  ethnicity STRING(MAX),
  family_size STRING(MAX),
  gender STRING(MAX),
  generation STRING(MAX),
  income STRING(MAX),
  language STRING(MAX),
  marital_status STRING(MAX),
  occupation STRING(MAX),
  city STRING(MAX),
  climate STRING(MAX),
  country STRING(MAX),
  population_density STRING(MAX),
  region STRING(MAX),
  time_zone STRING(MAX),
  urban_rural STRING(MAX),
  challenges STRING(MAX),
  goals STRING(MAX),
  pain_points STRING(MAX),
  priorities STRING(MAX),
  specific_needs STRING(MAX),
  attitudes STRING(MAX),
  hobbies STRING(MAX),
  interests STRING(MAX),
  lifestyle STRING(MAX),
  motivations STRING(MAX),
  personality STRING(MAX),
  social_class STRING(MAX),
  customer_values STRING(MAX),
  adoption_rate STRING(MAX),
  browsers STRING(MAX),
  devices STRING(MAX),
  internet_connectivity STRING(MAX),
  operating_systems STRING(MAX),
  social_media_platforms STRING(MAX),
  software STRING(MAX),
  tech_savviness STRING(MAX),
  cost_benefit_analysis STRING(MAX),
  perceived_value STRING(MAX),
  price_sensitivity STRING(MAX),
  willingness_to_pay STRING(MAX),
  children STRING(MAX),
  chocolate_preferences STRING(MAX),
  content_interaction STRING(MAX),
  customer_age INT64,
  facebook_bio STRING(MAX),
  facebook_engagement STRING(MAX),
  facebook_handle STRING(MAX),
  instagram_bio STRING(MAX),
  instagram_engagement STRING(MAX),
  instagram_handle STRING(MAX),
  linkedin_bio STRING(MAX),
  linkedin_engagement STRING(MAX),
  linkedin_handle STRING(MAX),
  martial_status STRING(MAX),
  solicated_buying_habits STRING(MAX),
  sports STRING(MAX),
  tiktok_bio STRING(MAX),
  tiktok_handle STRING(MAX),
  twitter_bio STRING(MAX),
  twitter_engagement STRING(MAX),
  twitter_handle STRING(MAX),
  youtube_bio STRING(MAX),
  youtube_handle STRING(MAX),
  customer_service_interactions STRING(MAX),
  average_amount_spent_per_order NUMERIC,
  last_order_date TIMESTAMP,
  latest_review_sentiment STRING(MAX),
  most_frequent_purchase_location INT64,
  negative_review_percentage NUMERIC,
  neutral_review_percentage NUMERIC,
  positive_review_percentage NUMERIC,
  purchase_locations STRING(MAX),
  top_3_favorite_menu_items STRING(MAX),
  total_amount_spent NUMERIC,
  total_orders INT64,
  total_reviews INT64
) PRIMARY KEY(customer_id)
""")

# Create the tables in Spanner
RunSpannerDDL(ddl_array)


### Create Edge Tables

In [None]:
# Add edge table DDL into an array
ddl_array = []

ddl_array.append(f"""CREATE TABLE IF NOT EXISTS cai_edge_customer_places_order
( customer_id INT64,
  order_id INT64,
  store_id INT64,
  order_datetime TIMESTAMP 
) PRIMARY KEY(customer_id, order_id, store_id),
  INTERLEAVE IN PARENT cai_customer ON DELETE CASCADE
""")

ddl_array.append(f"""CREATE TABLE IF NOT EXISTS cai_edge_customer_writes_review
( customer_id INT64,
  customer_review_id INT64,
  review_datetime TIMESTAMP 
) PRIMARY KEY(customer_id, customer_review_id),
  INTERLEAVE IN PARENT cai_customer ON DELETE CASCADE
""")

ddl_array.append(f"""CREATE TABLE IF NOT EXISTS cai_edge_customer_follows_customer
( customer_id INT64,
  followed_customer_id INT64,
  follow_date DATE 
) PRIMARY KEY(customer_id, followed_customer_id),
  INTERLEAVE IN PARENT cai_customer ON DELETE CASCADE
""")

ddl_array.append(f"""CREATE TABLE IF NOT EXISTS cai_edge_customer_rates_menu_item
( customer_id INT64,
  menu_id INT64,
  rating INT64,
  rating_datetime TIMESTAMP 
) PRIMARY KEY(customer_id, menu_id),
  INTERLEAVE IN PARENT cai_customer ON DELETE CASCADE
""")

ddl_array.append(f"""CREATE TABLE IF NOT EXISTS cai_edge_order_contains_order_item
( order_id INT64,
  store_id INT64,
  order_item_id INT64 
) PRIMARY KEY(order_id, store_id, order_item_id),
  INTERLEAVE IN PARENT cai_order ON DELETE CASCADE
""")

ddl_array.append(f"""CREATE TABLE IF NOT EXISTS cai_edge_order_placed_at_store
( order_id INT64,
  store_id INT64,
  order_datetime TIMESTAMP 
) PRIMARY KEY(order_id, store_id),
  INTERLEAVE IN PARENT cai_order ON DELETE CASCADE
""")

ddl_array.append(f"""CREATE TABLE IF NOT EXISTS cai_edge_customer_has_marketing_profile (
  customer_id INT64,
  marketing_profile_id INT64,
) PRIMARY KEY(customer_id, marketing_profile_id),
  INTERLEAVE IN PARENT cai_customer ON DELETE CASCADE""")

RunSpannerDDL(ddl_array)
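Every edge table above is interleaved in `cai_customer` or `cai_order`. Spanner only allows `INTERLEAVE IN PARENT` when the child's primary key begins with all of the parent's primary key columns, in the same order (with matching types). A hypothetical pre-flight check of the column-order rule, with key lists written by hand; the key for `cai_order` is assumed to be `(order_id, store_id)`, as implied by the edge definitions, since it actually comes from the BigQuery `CLUSTER BY` clause.

```python
def check_interleave(parent_pk, child_pk):
    """True if child_pk starts with every parent_pk column, in order."""
    return list(child_pk[:len(parent_pk)]) == list(parent_pk)
```

A mismatch such as a child key of `(order_id, customer_id)` under `cai_customer` would be rejected by the DDL request, so checking first gives a clearer error message.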

### Add Full-text Search Index

In [None]:
ddl_array = []

# Add FTS token column
ddl_array.append("""
ALTER TABLE cai_menu
ADD COLUMN menu_name_token TOKENLIST
AS (TOKENIZE_NGRAMS(menu_name))
STORED HIDDEN
""")

# Create search index on tokenized column
ddl_array.append("""
CREATE SEARCH INDEX menu_name_fts_idx
ON cai_menu(menu_name_token)
OPTIONS (sort_order_sharding = true)
""")

result = RunSpannerDDL(ddl_array)
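`TOKENIZE_NGRAMS` breaks `menu_name` into overlapping character n-grams, which is what lets a search (for example with `SEARCH_NGRAMS`) tolerate misspellings. The Python below is a conceptual sketch only, since Spanner's tokenizer has its own n-gram size options and normalization. It scores names by the fraction of the query's trigrams they contain.

```python
def ngrams(text, n=3):
    """Lowercased character n-grams of text."""
    text = text.lower()
    return {text[i:i + n] for i in range(len(text) - n + 1)}


def ngram_score(query, candidate, n=3):
    """Fraction of the query's n-grams that also occur in the candidate."""
    query_grams = ngrams(query, n)
    if not query_grams:
        return 0.0
    return len(query_grams & ngrams(candidate, n)) / len(query_grams)
```

The misspelled query "choclate" still shares "cho", "hoc", "lat", and "ate" with "Dark Chocolate Truffle", so it scores well above an unrelated name.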

### Create Spanner Graph

In [None]:
# Create graph
ddl_array = []

ddl_array.append(f"""CREATE OR REPLACE PROPERTY GRAPH chocolate_ai_graph
  NODE TABLES (
    cai_customer,
    cai_customer_marketing_profile,
    cai_customer_review,
    cai_menu,
    cai_order,
    cai_order_item,
    cai_store,
    cai_customer_360
  )
  EDGE TABLES (
    cai_edge_customer_places_order
      SOURCE KEY(customer_id) REFERENCES cai_customer
      DESTINATION KEY(order_id, store_id) REFERENCES cai_order
      LABEL Places,
    cai_edge_customer_writes_review
      SOURCE KEY(customer_id) REFERENCES cai_customer
      DESTINATION KEY(customer_review_id) REFERENCES cai_customer_review
      LABEL Writes,
    cai_edge_customer_follows_customer
      SOURCE KEY(customer_id) REFERENCES cai_customer
      DESTINATION KEY(followed_customer_id) REFERENCES cai_customer
      LABEL Follows,
    cai_edge_customer_rates_menu_item
      SOURCE KEY(customer_id) REFERENCES cai_customer
      DESTINATION KEY(menu_id) REFERENCES cai_menu
      LABEL Rates,
    cai_edge_order_contains_order_item
      SOURCE KEY(order_id, store_id) REFERENCES cai_order
      DESTINATION KEY(order_item_id, order_id) REFERENCES cai_order_item
      LABEL IsIn,
    cai_edge_order_placed_at_store
      SOURCE KEY(order_id, store_id) REFERENCES cai_order
      DESTINATION KEY(store_id) REFERENCES cai_store
      LABEL PlacedAt,
    cai_edge_customer_has_marketing_profile
      KEY(customer_id, marketing_profile_id)
      SOURCE KEY(customer_id) REFERENCES cai_customer(customer_id)
      DESTINATION KEY(marketing_profile_id) REFERENCES cai_customer_360(customer_id)
      LABEL HasMarketingProfile PROPERTIES(customer_id, marketing_profile_id)
  )
""")

RunSpannerDDL(ddl_array)
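With `Follows` edges in the graph, brand partner discovery reduces to counting how many distinct at-risk customers follow each account. The in-memory sketch below mirrors that idea with hypothetical IDs and an assumed `min_followers` threshold; in the notebook the equivalent traversal runs as a graph query against `chocolate_ai_graph`.

```python
from collections import Counter


def rank_partners(follows, at_risk, min_followers=2):
    """Rank followed accounts by the number of distinct at-risk customers following them.

    follows: iterable of (customer_id, followed_customer_id) pairs, matching the
    columns of cai_edge_customer_follows_customer.
    """
    # set() removes duplicate edges so each follower is counted once per account
    counts = Counter(followed for customer, followed in set(follows) if customer in at_risk)
    return [(account, n) for account, n in counts.most_common() if n >= min_followers]
```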

### Create JSON Views

In [None]:
# Define views to simplify querying JSON data
ddl_array = []
ddl_array.append(f"""CREATE OR REPLACE VIEW `cai_customer_marketing_profile_segments` SQL SECURITY INVOKER
AS SELECT
  cai_customer_marketing_profile.customer_id,
  REPLACE(JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.behavioral_segmentation.`Benefits Sought`), " ", "")AS benefits_sought,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.behavioral_segmentation.`Browsing Behavior`)AS browsing_behavior,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.behavioral_segmentation.`Loyalty Status`)AS loyalty_status,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.behavioral_segmentation.`Occasion/Timing`)AS occasion_timing,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.behavioral_segmentation.`Purchase History`)AS purchase_history,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.behavioral_segmentation.`Spending Habits`)AS spending_habits,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.behavioral_segmentation.`Usage Frequency`)AS usage_frequency,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.behavioral_segmentation.`User Status`)AS user_status,
  ARRAY_TO_STRING(JSON_VALUE_ARRAY(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.customer_lifecycle_segmentation.`At-Risk Customers`), ",")AS at_risk_customers,
  ARRAY_TO_STRING(JSON_VALUE_ARRAY(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.customer_lifecycle_segmentation.`First-Time Customers`), ",")AS first_time_customers,
  ARRAY_TO_STRING(JSON_VALUE_ARRAY(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.customer_lifecycle_segmentation.`Former Customers`), ",")AS former_customers,
  ARRAY_TO_STRING(JSON_VALUE_ARRAY(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.customer_lifecycle_segmentation.`Inactive Customers`), ",")AS inactive_customers,
  ARRAY_TO_STRING(JSON_VALUE_ARRAY(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.customer_lifecycle_segmentation.`Loyal Advocates`), ",")AS loyal_advocates,
  ARRAY_TO_STRING(JSON_VALUE_ARRAY(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.customer_lifecycle_segmentation.`New Leads`), ",")AS new_leads,
  ARRAY_TO_STRING(JSON_VALUE_ARRAY(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.customer_lifecycle_segmentation.`Potential Customers`), ",")AS potential_customers,
  ARRAY_TO_STRING(JSON_VALUE_ARRAY(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.customer_lifecycle_segmentation.`Repeat Customers`), ",")AS repeat_customers,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.demographic_segmentation.`Age`)AS age,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.demographic_segmentation.`Education`)AS education,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.demographic_segmentation.`Ethnicity`)AS ethnicity,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.demographic_segmentation.`Family Size`)AS family_size,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.demographic_segmentation.`Gender`)AS gender,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.demographic_segmentation.`Generation`)AS generation,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.demographic_segmentation.`Income`)AS income,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.demographic_segmentation.`Language`)AS language,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.demographic_segmentation.`Marital Status`)AS marital_status,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.demographic_segmentation.`Occupation`)AS occupation,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.geographic_segmentation.`City`)AS city,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.geographic_segmentation.`Climate`)AS climate,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.geographic_segmentation.`Country`)AS country,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.geographic_segmentation.`Population Density`)AS population_density,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.geographic_segmentation.`Region`)AS region,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.geographic_segmentation.`Time Zone`)AS time_zone,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.geographic_segmentation.`Urban/Rural`)AS urban_rural,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.needs_based_segmentation.`Challenges`)AS challenges,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.needs_based_segmentation.`Goals`)AS goals,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.needs_based_segmentation.`Pain Points`)AS pain_points,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.needs_based_segmentation.`Priorities`)AS priorities,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.needs_based_segmentation.`Specific Needs`)AS specific_needs,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.psychographic_segmentation.`Attitudes`)AS attitudes,
  ARRAY_TO_STRING(JSON_VALUE_ARRAY(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.psychographic_segmentation.`Hobbies`), ",")AS hobbies,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.psychographic_segmentation.`Interests`)AS interests,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.psychographic_segmentation.`Lifestyle`)AS lifestyle,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.psychographic_segmentation.`Motivations`)AS motivations,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.psychographic_segmentation.`Personality`)AS personality,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.psychographic_segmentation.`Social Class`)AS social_class,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.psychographic_segmentation.`Values`)AS customer_values,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.technographic_segmentation.`Adoption Rate`)AS adoption_rate,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.technographic_segmentation.`Browsers`)AS browsers,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.technographic_segmentation.`Devices`)AS devices,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.technographic_segmentation.`Internet Connectivity`)AS internet_connectivity,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.technographic_segmentation.`Operating Systems`)AS operating_systems,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.technographic_segmentation.`Social Media Platforms`)AS social_media_platforms,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.technographic_segmentation.`Software`)AS software,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.technographic_segmentation.`Tech Savviness`)AS tech_savviness,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.value_based_segmentation.`Cost-Benefit Analysis`)AS cost_benefit_analysis,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.value_based_segmentation.`Perceived Value`)AS perceived_value,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.value_based_segmentation.`Price Sensitivity`)AS price_sensitivity,
  JSON_VALUE(cai_customer_marketing_profile.customer_segmentation_data.customer_segments.value_based_segmentation.`Willingness to Pay`)AS willingness_to_pay
FROM `cai_customer_marketing_profile`
""")

ddl_array.append(f"""CREATE OR REPLACE VIEW `cai_customer_marketing_profile_data` SQL SECURITY INVOKER
AS SELECT
  cai_customer_marketing_profile.customer_id,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.children)AS children,
  ARRAY_TO_STRING(JSON_VALUE_ARRAY(cai_customer_marketing_profile.customer_profile_data.chocolate_preferences), ",")AS chocolate_preferences,
  ARRAY_TO_STRING(JSON_VALUE_ARRAY(cai_customer_marketing_profile.customer_profile_data.content_interaction), ",")AS content_interaction,
  CAST(JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.customer_age)AS INT64)AS customer_age,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.education)AS education,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.facebook_bio)AS facebook_bio,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.facebook_engagement)AS facebook_engagement,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.facebook_handle)AS facebook_handle,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.instagram_bio)AS instagram_bio,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.instagram_engagement)AS instagram_engagement,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.instagram_handle)AS instagram_handle,
  ARRAY_TO_STRING(JSON_VALUE_ARRAY(cai_customer_marketing_profile.customer_profile_data.interests), ",")AS interests,
  ARRAY_TO_STRING(JSON_VALUE_ARRAY(cai_customer_marketing_profile.customer_profile_data.lifestyle), ",")AS lifestyle,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.linkedin_bio)AS linkedin_bio,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.linkedin_engagement)AS linkedin_engagement,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.linkedin_handle)AS linkedin_handle,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.martial_status)AS martial_status,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.occupation)AS occupation,
  ARRAY_TO_STRING(JSON_VALUE_ARRAY(cai_customer_marketing_profile.customer_profile_data.solicated_buying_habits), ",")AS solicated_buying_habits,
  ARRAY_TO_STRING(JSON_VALUE_ARRAY(cai_customer_marketing_profile.customer_profile_data.sports), ",")AS sports,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.tiktok_bio)AS tiktok_bio,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.tiktok_handle)AS tiktok_handle,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.twitter_bio)AS twitter_bio,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.twitter_engagement)AS twitter_engagement,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.twitter_handle)AS twitter_handle,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.youtube_bio)AS youtube_bio,
  JSON_VALUE(cai_customer_marketing_profile.customer_profile_data.youtube_handle)AS youtube_handle,
  (
    SELECT STRING_AGG(CONCAT(
      'contact_reason:', JSON_VALUE(interaction, '$.contact_reason'), ' ',
      'resolution_time:', JSON_VALUE(interaction, '$.resolution_time'), ' ',
      'sentiment_analysis:', JSON_VALUE(interaction, '$.sentiment_analysis')
    ), ' | ')
    FROM UNNEST(JSON_QUERY_ARRAY(cai_customer_marketing_profile.customer_profile_data, '$.customer_service_interactions'))AS interaction
  )AS customer_service_interactions
 FROM
  `cai_customer_marketing_profile`
""")

ddl_array.append(f"""CREATE OR REPLACE VIEW `cai_customer_marketing_profile_loyalty` SQL SECURITY INVOKER
AS SELECT
  cai_customer_marketing_profile.customer_id,
  CAST(JSON_VALUE(cai_customer_marketing_profile.customer_loyalty_data.average_amount_spent_per_order)AS NUMERIC)AS average_amount_spent_per_order,
  CAST(JSON_VALUE(cai_customer_marketing_profile.customer_loyalty_data.last_order_date)AS TIMESTAMP)AS last_order_date,
  JSON_VALUE(cai_customer_marketing_profile.customer_loyalty_data.latest_review_sentiment)AS latest_review_sentiment,
  CAST(JSON_VALUE(cai_customer_marketing_profile.customer_loyalty_data.most_frequent_purchase_location)AS INT64)AS most_frequent_purchase_location,
  CAST(JSON_VALUE(cai_customer_marketing_profile.customer_loyalty_data.negative_review_percentage)AS NUMERIC)AS negative_review_percentage,
  CAST(JSON_VALUE(cai_customer_marketing_profile.customer_loyalty_data.neutral_review_percentage)AS NUMERIC)AS neutral_review_percentage,
  CAST(JSON_VALUE(cai_customer_marketing_profile.customer_loyalty_data.positive_review_percentage)AS NUMERIC)AS positive_review_percentage,
  ARRAY_TO_STRING(JSON_VALUE_ARRAY(cai_customer_marketing_profile.customer_loyalty_data.purchase_locations), ",")AS purchase_locations,
  ARRAY_TO_STRING(JSON_VALUE_ARRAY(cai_customer_marketing_profile.customer_loyalty_data.top_3_favorite_menu_items), ",")AS top_3_favorite_menu_items,
  CAST(JSON_VALUE(cai_customer_marketing_profile.customer_loyalty_data.total_amount_spent)AS NUMERIC)AS total_amount_spent,
  CAST(JSON_VALUE(cai_customer_marketing_profile.customer_loyalty_data.total_orders)AS INT64)AS total_orders,
  CAST(JSON_VALUE(cai_customer_marketing_profile.customer_loyalty_data.total_reviews)AS INT64)AS total_reviews
 FROM
  `cai_customer_marketing_profile`
""")

RunSpannerDDL(ddl_array)
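
The `customer_service_interactions` column is the least obvious part of these views. The Python sketch below approximates its subquery for one profile. It assumes each interaction is a JSON object with the three keys the view reads. It builds one `key:value` string per interaction and joins the strings with `' | '`. It also mirrors two GoogleSQL behaviors. `CONCAT` returns NULL when any argument is NULL, so an interaction with a missing key is dropped. `STRING_AGG` skips NULL inputs and returns NULL when none remain. The function name and the sample record are illustrative only. Python's `str()` stands in for how `JSON_VALUE` formats numbers. The SQL has no `ORDER BY` inside `STRING_AGG`, so the database does not guarantee entry order; the sketch simply keeps array order.

```python
import json
from typing import Optional

FIELDS = ("contact_reason", "resolution_time", "sentiment_analysis")


def summarize_interactions(profile_json: str) -> Optional[str]:
    """Approximate the view's customer_service_interactions column (sketch)."""
    profile = json.loads(profile_json)
    interactions = profile.get("customer_service_interactions")
    if not isinstance(interactions, list):
        # No array at that path: UNNEST produces no rows.
        interactions = []

    parts = []
    for interaction in interactions:
        if not isinstance(interaction, dict):
            continue  # JSON_VALUE on a non-object yields NULL, so CONCAT is NULL
        values = [interaction.get(key) for key in FIELDS]
        # JSON_VALUE returns NULL for missing keys, JSON null, objects and arrays;
        # CONCAT then returns NULL and STRING_AGG skips that element.
        if any(v is None or isinstance(v, (dict, list)) for v in values):
            continue
        parts.append(" ".join(f"{key}:{value}" for key, value in zip(FIELDS, values)))

    # STRING_AGG over zero non-NULL inputs returns NULL.
    return " | ".join(parts) if parts else None
```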

### <font color='#4285f4'>Copy Source Data from BigQuery</font>

#### Create Temporary BQ Views

In [None]:
# Create temporary edge views in BigQuery so the edge rows can be computed there
# and copied into Spanner with Reverse ETL. Generating the same rows with DML
# inside Spanner would hit its limit of 80,000 mutations per transaction, so this
# approach is faster and more straightforward.

bq_temp_view_name_array = [
    "edge_customer_places_order",
    "edge_customer_writes_review",
    "edge_order_contains_order_item",
    "edge_order_placed_at_store",
    "edge_customer_rates_menu_item",
    "edge_customer_has_marketing_profile"
]

bq_temp_view_ddl_array = []

bq_temp_view_ddl_array.append(f"""CREATE OR REPLACE VIEW `{bq_dataset}.edge_customer_places_order` (customer_id, order_id, store_id, order_datetime) AS
SELECT c.customer_id, o.order_id, o.store_id, o.order_datetime
FROM `{bq_dataset}.customer` c
JOIN `{bq_dataset}.order` o ON c.customer_id = o.customer_id;""")

bq_temp_view_ddl_array.append(f"""CREATE OR REPLACE VIEW `{bq_dataset}.edge_customer_writes_review` (customer_id, customer_review_id, review_datetime) AS
SELECT c.customer_id, cr.customer_review_id, cr.review_datetime
FROM `{bq_dataset}.customer` c
JOIN `{bq_dataset}.customer_review` cr ON c.customer_id = cr.customer_id;""")

bq_temp_view_ddl_array.append(f"""CREATE OR REPLACE VIEW `{bq_dataset}.edge_order_contains_order_item` (order_id, store_id, order_item_id) AS
SELECT o.order_id, o.store_id, oi.order_item_id
FROM `{bq_dataset}.order` o
JOIN `{bq_dataset}.order_item` oi ON o.order_id = oi.order_id;""")

bq_temp_view_ddl_array.append(f"""CREATE OR REPLACE VIEW `{bq_dataset}.edge_order_placed_at_store` (order_id, store_id, order_datetime) AS
SELECT o.order_id, o.store_id, o.order_datetime
FROM `{bq_dataset}.order` o;""")

bq_temp_view_ddl_array.append(f"""CREATE OR REPLACE VIEW `{bq_dataset}.edge_customer_has_marketing_profile` (customer_id, marketing_profile_id) AS
SELECT customer_id, customer_id AS marketing_profile_id
FROM `{bq_dataset}.customer_marketing_profile`;""")

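# The CASE expression below was generated with the following query (kept for
# reference; it is not executed). CASE returns the first WHEN that matches, so a
# shorter name listed earlier shadows longer names that contain it, e.g.
# 'Midnight in Paris' (67) also captures 'Midnight in Paris Mocha' (199), and the
# second 'Golden Orb Surprise' (84) can never match.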
'''
  sql = f"""SELECT CONCAT('    WHEN review_text LIKE \\'%', menu_name, '%\\' THEN ', menu_id, ',') AS case_statement,
    menu_name,
    menu_id
  FROM {bq_dataset}.menu
  ORDER BY menu_id;
  """
'''

bq_temp_view_ddl_array.append(f"""CREATE OR REPLACE VIEW `{bq_dataset}.edge_customer_rates_menu_item` AS
  SELECT customer_id,
  CASE
    WHEN review_text LIKE '%Chocolate Decadence Coffee%' THEN 1
    WHEN review_text LIKE '%The Golden Trio%' THEN 2
    WHEN review_text LIKE '%Spice Trade Chocolate Flight%' THEN 3
    WHEN review_text LIKE '%Enchanted Forest Dream%' THEN 4
    WHEN review_text LIKE '%Spice Odyssey%' THEN 5
    WHEN review_text LIKE '%Cocoa Cream Delight%' THEN 6
    WHEN review_text LIKE '%Triple Chocolate Hazelnut Delice%' THEN 7
    WHEN review_text LIKE '%Chocolate Swan Cake%' THEN 8
    WHEN review_text LIKE '%Lavender Bloom Truffle%' THEN 9
    WHEN review_text LIKE '%Chocolate & Cheese Flight%' THEN 10
    WHEN review_text LIKE '%Spiced Chocolate Trio%' THEN 11
    WHEN review_text LIKE '%Saffron-Infused Chocolate Sphere%' THEN 12
    WHEN review_text LIKE '%Lavender White Chocolate Cream Puffs%' THEN 13
    WHEN review_text LIKE '%Earl Grey & Bergamot Chocolate Symphony%' THEN 14
    WHEN review_text LIKE '%Lavender Sea Salt Chocolate%' THEN 15
    WHEN review_text LIKE '%Red Velvet Surprise%' THEN 16
    WHEN review_text LIKE '%Chocolate Discovery%' THEN 17
    WHEN review_text LIKE '%Triple Chocolate Cake%' THEN 18
    WHEN review_text LIKE '%Earl Grey & Gold Truffles%' THEN 19
    WHEN review_text LIKE '%Lavender Honey Truffle%' THEN 20
    WHEN review_text LIKE '%Molten Caramel Surprise%' THEN 21
    WHEN review_text LIKE '%Lavender Mocha%' THEN 22
    WHEN review_text LIKE '%Coffee Bean Dream%' THEN 23
    WHEN review_text LIKE '%Aztec Spice%' THEN 24
    WHEN review_text LIKE '%Midnight in Paris Truffle%' THEN 25
    WHEN review_text LIKE '%Spicy Chocolate Six Pack%' THEN 26
    WHEN review_text LIKE '%Triple Layer Bar%' THEN 27
    WHEN review_text LIKE '%Midnight In Paris Cake%' THEN 28
    WHEN review_text LIKE '%Triple Chocolate Symphony Cake%' THEN 29
    WHEN review_text LIKE '%Salted Caramel Coffee Dreams%' THEN 30
    WHEN review_text LIKE '%Edible Gems%' THEN 31
    WHEN review_text LIKE '%Churro Tower%' THEN 32
    WHEN review_text LIKE '%Blooming Caramel Truffle%' THEN 33
    WHEN review_text LIKE '%Triple Chocolate Mousse Delight%' THEN 34
    WHEN review_text LIKE '%Dark & Zesty%' THEN 35
    WHEN review_text LIKE '%Spice Route Chocolate Flight%' THEN 36
    WHEN review_text LIKE '%Cosmic Cacao%' THEN 37
    WHEN review_text LIKE '%Eiffel Tower Profiterole Tower%' THEN 38
    WHEN review_text LIKE '%Parisian Ganache Trio%' THEN 39
    WHEN review_text LIKE '%Edible Masterpieces%' THEN 40
    WHEN review_text LIKE '%Lavender White Chocolate Lava Cake%' THEN 41
    WHEN review_text LIKE '%Midnight in Paris Mousse%' THEN 42
    WHEN review_text LIKE '%Cosmic Chocolate Sphere%' THEN 43
    WHEN review_text LIKE '%Golden Orb Surprise%' THEN 44
    WHEN review_text LIKE '%Parisian Spice%' THEN 45
    WHEN review_text LIKE '%Spiced Chocolate Faberge Egg%' THEN 46
    WHEN review_text LIKE '%Trio of Chocolate Bars%' THEN 47
    WHEN review_text LIKE '%Parisian Truffle Collection%' THEN 48
    WHEN review_text LIKE '%Espresso Chocolate Dream Cake%' THEN 49
    WHEN review_text LIKE '%Spice Bazaar Chocolate Sticks%' THEN 50
    WHEN review_text LIKE '%Eiffel Tower Delight%' THEN 51
    WHEN review_text LIKE '%Lavender Honey Dusk%' THEN 52
    WHEN review_text LIKE '%Toasted Marshmallow Chocolate Mousse Peak%' THEN 53
    WHEN review_text LIKE '%Chocolate Balloon Ride for Two%' THEN 54
    WHEN review_text LIKE '%Parisian Cafe Mocha%' THEN 55
    WHEN review_text LIKE '%Chocolate Dream Trio%' THEN 56
    WHEN review_text LIKE '%Parisian Nights%' THEN 57
    WHEN review_text LIKE '%Parisian Hot Chocolate%' THEN 58
    WHEN review_text LIKE '%Triple Chocolate Ganache%' THEN 59
    WHEN review_text LIKE '%Parisian Dreamscape Collection%' THEN 60
    WHEN review_text LIKE '%Chocolate Affogato%' THEN 61
    WHEN review_text LIKE '%Melting Chocolate Sphere%' THEN 62
    WHEN review_text LIKE '%Spice Route Selection%' THEN 63
    WHEN review_text LIKE '%Pour-Over Coffee%' THEN 64
    WHEN review_text LIKE '%Avant-Garde Bonbons%' THEN 65
    WHEN review_text LIKE '%Persian Dreams%' THEN 66
    WHEN review_text LIKE '%Midnight in Paris%' THEN 67
    WHEN review_text LIKE '%Triple Chocolate Hazelnut Dream%' THEN 68
    WHEN review_text LIKE '%Eiffel Tower%' THEN 69
    WHEN review_text LIKE '%Parisian Chocolate Tower Espresso%' THEN 70
    WHEN review_text LIKE '%Melting Chocolate Surprise%' THEN 71
    WHEN review_text LIKE '%Dark Chocolate Almond Bark%' THEN 72
    WHEN review_text LIKE '%Melting Chocolate Sphere Surprise%' THEN 73
    WHEN review_text LIKE '%Spiced Chocolate Symphony%' THEN 74
    WHEN review_text LIKE '%Choco-Espresso%' THEN 75
    WHEN review_text LIKE '%Parisian Cafe Au Lait%' THEN 76
    WHEN review_text LIKE '%The Artist\\'s Palette%' THEN 77
    WHEN review_text LIKE '%Chocolate Rose%' THEN 78
    WHEN review_text LIKE '%Chocolate Covered Ice Cream Bon Bon%' THEN 79
    WHEN review_text LIKE '%Chocolate Espresso Cup%' THEN 80
    WHEN review_text LIKE '%Black Pepper Chocolate Dream%' THEN 81
    WHEN review_text LIKE '%Parisian Bonbon Collection%' THEN 82
    WHEN review_text LIKE '%Signature Chocolate Cloud Cappuccino%' THEN 83
    WHEN review_text LIKE '%Golden Orb Surprise%' THEN 84
    WHEN review_text LIKE '%Golden Egg Surprise%' THEN 85
    WHEN review_text LIKE '%Lavender Latte%' THEN 86
    WHEN review_text LIKE '%Parisian Spice Rhapsody%' THEN 87
    WHEN review_text LIKE '%Ethereal Hearts Desire%' THEN 88
    WHEN review_text LIKE '%Triple Chocolate Decadence%' THEN 89
    WHEN review_text LIKE '%Decadent Chocolate Dream%' THEN 90
    WHEN review_text LIKE '%Spice Route Tasting Set%' THEN 91
    WHEN review_text LIKE '%Parisian Chocolate Caramel Dream%' THEN 92
    WHEN review_text LIKE '%Chocolate Sphere Surprise%' THEN 93
    WHEN review_text LIKE '%Molten Espresso Heart%' THEN 94
    WHEN review_text LIKE '%Golden Macaron Tower%' THEN 95
    WHEN review_text LIKE '%Chocolate Chip Cookie Mousse Duo%' THEN 96
    WHEN review_text LIKE '%Churro Trio with Dark Chocolate Dip%' THEN 97
    WHEN review_text LIKE '%Ethereal Chocolate Sphere%' THEN 98
    WHEN review_text LIKE '%Salted Caramel Pretzel Bark%' THEN 99
    WHEN review_text LIKE '%Parisian Noir Orange Dream%' THEN 100
    WHEN review_text LIKE '%Coco Chanel%' THEN 101
    WHEN review_text LIKE '%Chocolate World Tour%' THEN 102
    WHEN review_text LIKE '%Spiced Ganache Treasures%' THEN 103
    WHEN review_text LIKE '%Parisian Vanilla Bean Dream%' THEN 104
    WHEN review_text LIKE '%Chocolate Tasting Flight%' THEN 105
    WHEN review_text LIKE '%Orange Zest Coffee%' THEN 106
    WHEN review_text LIKE '%Deconstructed Chocolate Mousse%' THEN 107
    WHEN review_text LIKE '%Chocolate Hot Air Balloon Ride%' THEN 108
    WHEN review_text LIKE '%Parisian Coffee Pairing%' THEN 109
    WHEN review_text LIKE '%Molten Chocolate Lava Cake%' THEN 110
    WHEN review_text LIKE '%Spiced Chocolate Sphere Trio%' THEN 111
    WHEN review_text LIKE '%Chocolate Marshmallow Croissant%' THEN 112
    WHEN review_text LIKE '%Ebony & Ivory%' THEN 113
    WHEN review_text LIKE '%Black Pepper & Gold Sphere%' THEN 114
    WHEN review_text LIKE '%Colorful Confections%' THEN 115
    WHEN review_text LIKE '%Triple Chocolate Mousse Tower%' THEN 116
    WHEN review_text LIKE '%Melting Chocolate Globe Surprise%' THEN 117
    WHEN review_text LIKE '%Eiffel Cloud%' THEN 118
    WHEN review_text LIKE '%Sweet Ascent%' THEN 119
    WHEN review_text LIKE '%Eiffel Tower Mousse%' THEN 120
    WHEN review_text LIKE '%Coffee Ganache Dome%' THEN 121
    WHEN review_text LIKE '%Chocolate Dipped Coffee Mousse Cones%' THEN 122
    WHEN review_text LIKE '%Donut Brioche Skewers%' THEN 123
    WHEN review_text LIKE '%Parisian Chocolate Easter Egg%' THEN 124
    WHEN review_text LIKE '%Lavender Bloom%' THEN 125
    WHEN review_text LIKE '%Aztec\\'s Spicy Chocolate Tower%' THEN 126
    WHEN review_text LIKE '%Smoked Tea Truffle Quintet%' THEN 127
    WHEN review_text LIKE '%Parisian Latte%' THEN 128
    WHEN review_text LIKE '%Triple Chocolate Symphony%' THEN 129
    WHEN review_text LIKE '%Parisian Dream Bonbons%' THEN 130
    WHEN review_text LIKE '%Salted Caramel Nitro Cold Brew%' THEN 131
    WHEN review_text LIKE '%Five Spice Chocolate Celebration Cake%' THEN 132
    WHEN review_text LIKE '%Butterfly Garden%' THEN 133
    WHEN review_text LIKE '%Classic Espresso%' THEN 134
    WHEN review_text LIKE '%Chocolate Dipped Waffle Cone Bites%' THEN 135
    WHEN review_text LIKE '%Golden Night%' THEN 136
    WHEN review_text LIKE '%Parisian Cloud Espresso%' THEN 137
    WHEN review_text LIKE '%Trio of Textures%' THEN 138
    WHEN review_text LIKE '%Parisian Surprise%' THEN 139
    WHEN review_text LIKE '%Doughnut Delight%' THEN 140
    WHEN review_text LIKE '%Rich Indulgence%' THEN 141
    WHEN review_text LIKE '%Midnight in Provence%' THEN 142
    WHEN review_text LIKE '%Chocolate Infused Coffee%' THEN 143
    WHEN review_text LIKE '%Parisian Coffee Truffle Trio%' THEN 144
    WHEN review_text LIKE '%Chocolate Caramel Orb%' THEN 145
    WHEN review_text LIKE '%Classic Affogato%' THEN 146
    WHEN review_text LIKE '%Golden Parisian S\\'mores%' THEN 147
    WHEN review_text LIKE '%Pistachio Cardamom Jewels%' THEN 148
    WHEN review_text LIKE '%Coffee Break Brownie%' THEN 149
    WHEN review_text LIKE '%Spice Tower Mousse Trio%' THEN 150
    WHEN review_text LIKE '%Cardamom Coffee Indulgence%' THEN 151
    WHEN review_text LIKE '%Triple Chocolate Mousse Parfait%' THEN 152
    WHEN review_text LIKE '%Cosmic Cups%' THEN 153
    WHEN review_text LIKE '%Parisian Chocolate Dream%' THEN 154
    WHEN review_text LIKE '%Chocolate Fudge Tower%' THEN 155
    WHEN review_text LIKE '%Aromatic Adventures%' THEN 156
    WHEN review_text LIKE '% Parisian Midnight%' THEN 157
    WHEN review_text LIKE '%Chocolate Chip Cookie Ice Cream Tower%' THEN 158
    WHEN review_text LIKE '%Chocolate Macaron Delights%' THEN 159
    WHEN review_text LIKE '%Parisian Coffee%' THEN 160
    WHEN review_text LIKE '%Eiffel Tower of Chocolate Delights%' THEN 161
    WHEN review_text LIKE '%Emerald of the Seine%' THEN 162
    WHEN review_text LIKE '%Parisian Chocolate Ganache Coffee%' THEN 163
    WHEN review_text LIKE '%Molten White Chocolate Lava Cake%' THEN 164
    WHEN review_text LIKE '%Chocolate Balloon Ride%' THEN 165
    WHEN review_text LIKE '%White Chocolate Dream Latte%' THEN 166
    WHEN review_text LIKE '%Chocolate Concerto%' THEN 167
    WHEN review_text LIKE '%Spice Trader\\'s Collection%' THEN 168
    WHEN review_text LIKE '%Molten Chocolate Espresso%' THEN 169
    WHEN review_text LIKE '%Parisian Chocolate Symphony%' THEN 170
    WHEN review_text LIKE '%Chocolate Waffle Tower%' THEN 171
    WHEN review_text LIKE '%Lavender Midnight Mocha%' THEN 172
    WHEN review_text LIKE '%Decadent Chocolate Hazelnut Symphony%' THEN 173
    WHEN review_text LIKE '%Parisian Hemisphere Trio%' THEN 174
    WHEN review_text LIKE '%Salted Caramel Affogato%' THEN 175
    WHEN review_text LIKE '%Churro Tower of Dreams%' THEN 176
    WHEN review_text LIKE '%Hot Air Balloon Adventure%' THEN 177
    WHEN review_text LIKE '%Lavender Vanilla Bean Latte%' THEN 178
    WHEN review_text LIKE '%Parisian Chocolate Sphere Trio%' THEN 179
    WHEN review_text LIKE '%Chocolate Hot Air Balloon Flight%' THEN 180
    WHEN review_text LIKE '%Midnight Velvet Espresso%' THEN 181
    WHEN review_text LIKE '%Triple Chocolate Spice Deception%' THEN 182
    WHEN review_text LIKE '%Chocolate Hazelnut Symphony%' THEN 183
    WHEN review_text LIKE '%Lavender Espresso Dream%' THEN 184
    WHEN review_text LIKE '%Chocolate Trio%' THEN 185
    WHEN review_text LIKE '%Parisian Twilight Delight%' THEN 186
    WHEN review_text LIKE '%Parisian Mocha %' THEN 187
    WHEN review_text LIKE '%Chocolate Decadence Tower%' THEN 188
    WHEN review_text LIKE '%Parisian Spice Journey%' THEN 189
    WHEN review_text LIKE '%Parisian Chocolate Espresso%' THEN 190
    WHEN review_text LIKE '%Parisian Twilight Mousse%' THEN 191
    WHEN review_text LIKE '%Spice Trader\\'s Chocolate Journey%' THEN 192
    WHEN review_text LIKE '%Lavender Dream%' THEN 193
    WHEN review_text LIKE '%Chocolate Dipped Pretzel Cones with Salted Caramel%' THEN 194
    WHEN review_text LIKE '% Parisian Twilight%' THEN 195
    WHEN review_text LIKE '%Parisian Iced Coffee Dream%' THEN 196
    WHEN review_text LIKE '%Lavender Dreamscape%' THEN 197
    WHEN review_text LIKE '%Landmark Lockets%' THEN 198
    WHEN review_text LIKE '%Midnight in Paris Mocha%' THEN 199
    WHEN review_text LIKE '%Coffee Crunch%' THEN 200
    WHEN review_text LIKE '%Spiced Chocolate Napoleon%' THEN 201
    WHEN review_text LIKE '%Volcano Coffee%' THEN 202
    WHEN review_text LIKE '%Earl Grey Chocolate Tartlet%' THEN 203
    WHEN review_text LIKE '%Edible Chocolate Flames%' THEN 204
    WHEN review_text LIKE '%Cardamom Chocolate Espresso%' THEN 205
    WHEN review_text LIKE '%Chocolate Parisian Hats%' THEN 206
    WHEN review_text LIKE '%Parisian Nights Chocolate Trio%' THEN 207
    WHEN review_text LIKE '%Salted Caramel Hot Chocolate%' THEN 208
    WHEN review_text LIKE '%Triple Chocolate Coffee Dream%' THEN 209
    WHEN review_text LIKE '%Chocolate Hot Air Balloons%' THEN 210
    WHEN review_text LIKE '%Lavender Dreams Hot Chocolate%' THEN 211
    WHEN review_text LIKE '%Chocolate Covered Pretzel Choux au Craquelin%' THEN 212
    WHEN review_text LIKE '%Parisian Promenade%' THEN 213
    WHEN review_text LIKE '%Parisian Hot Chocolate Extravaganza%' THEN 214
    WHEN review_text LIKE '%Layered Chocolate Mousse%' THEN 215
    WHEN review_text LIKE '%Parisian Chocolate Map%' THEN 216
    WHEN review_text LIKE '%Parisian Delight%' THEN 217
    WHEN review_text LIKE '%Chocolate Paris Tower%' THEN 218
    WHEN review_text LIKE '%Parisian Truffle Flight%' THEN 219
    WHEN review_text LIKE '%Parisian Hot Cocoa%' THEN 220
    WHEN review_text LIKE '%Parisian Pâtisserie Perfection%' THEN 221
    WHEN review_text LIKE '%Edible Parisian Garden%' THEN 222
    WHEN review_text LIKE '%Midnight Chocolate Latte%' THEN 223
    WHEN review_text LIKE '%Melting Palette%' THEN 224
    WHEN review_text LIKE '%Parisian Gold Truffles%' THEN 225
    WHEN review_text LIKE '%Lavender White Mocha%' THEN 226
    WHEN review_text LIKE '%Mousse au Chocolat Couvert%' THEN 227
    WHEN review_text LIKE '%Moroccan Spice Journey%' THEN 228
    WHEN review_text LIKE '%Parisian Mocha%' THEN 229
    WHEN review_text LIKE '%Sweet Surrender%' THEN 230
    WHEN review_text LIKE '%Parisian Landmarks Collection%' THEN 231
    WHEN review_text LIKE '%Dark Chocolate Decadence%' THEN 232
    WHEN review_text LIKE '%Trio of Chocolate Lava Cakes%' THEN 233
    WHEN review_text LIKE '%Eastern Indulgence%' THEN 234
    WHEN review_text LIKE '%Black Pepper Ganache Sphere%' THEN 235
    WHEN review_text LIKE '%Dark Chocolate Sea Salt Meringue Pie%' THEN 236
    WHEN review_text LIKE '%Cosmic Trio%' THEN 237
    WHEN review_text LIKE '%Parisian Twilight Latte%' THEN 238
    WHEN review_text LIKE '%Chocolate Churro Dreams%' THEN 239
    WHEN review_text LIKE '%Parisian Dreamscape%' THEN 240
    WHEN review_text LIKE '%Lavender Espresso%' THEN 241
    WHEN review_text LIKE '%Chocolate Decadence%' THEN 242
    WHEN review_text LIKE '%Chai Chocolate Zoo%' THEN 243
    WHEN review_text LIKE '%Parisian Roast%' THEN 244
    WHEN review_text LIKE '%Chocolate Garden of Dreams%' THEN 245
    WHEN review_text LIKE '%Parisian Dreamscape Bonbons%' THEN 246
    WHEN review_text LIKE '%Dark Chocolate Dream Espresso%' THEN 247
    WHEN review_text LIKE '%Parisian Midnight%' THEN 248
    WHEN review_text LIKE '%Celestial Trio%' THEN 249
    WHEN review_text LIKE '%Parisian Chocolate Layered Latte%' THEN 250
    ELSE 999
  END AS menu_id,
  CASE review_sentiment
    WHEN 'Positive' THEN CAST(ROUND(4 + rand() * (5 - 4)) AS INT64)
    WHEN 'Negative' THEN CAST(ROUND(1 + rand() * (2 - 1)) AS INT64)
    WHEN 'Neutral' THEN 3
    ELSE 3
  END AS rating,
  review_datetime AS rating_datetime
  FROM `{bq_dataset}.customer_review`;
""")

for view in bq_temp_view_ddl_array:
  result = RunBQQuery(view)
  print(result)
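
In the menu-item view, `CASE` returns the result of the first `WHEN` that matches. Overlapping names therefore resolve to whichever is listed first. For example, `'Eiffel Tower'` (69) appears before `'Eiffel Tower Mousse'` (120), so a review of the mousse is tagged 69. The minimal Python sketch below contrasts first-match lookup, which is what the view does, with longest-name-first lookup. Longest-name-first is one way to avoid the shadowing; in the generator query, ordering by `LENGTH(menu_name) DESC` would have the same effect. The function names and the two-item sample menu are hypothetical. The substring test matches `LIKE '%name%'` only for names that contain no `%` or `_` wildcards.

```python
from typing import Dict, List, Tuple

UNMATCHED_MENU_ID = 999  # same fallback as the view's ELSE branch


def first_match_menu_id(review_text: str, patterns: List[Tuple[str, int]]) -> int:
    """Mimic CASE WHEN review_text LIKE '%name%' ...: the first listed match wins."""
    for name, menu_id in patterns:
        if name in review_text:  # case-sensitive substring test, like LIKE '%name%'
            return menu_id
    return UNMATCHED_MENU_ID


def longest_match_menu_id(review_text: str, menu: Dict[str, int]) -> int:
    """Try longer names first so no name is shadowed by a shorter one it contains."""
    for name in sorted(menu, key=len, reverse=True):
        if name in review_text:
            return menu[name]
    return UNMATCHED_MENU_ID


patterns = [("Eiffel Tower", 69), ("Eiffel Tower Mousse", 120)]
review = "The Eiffel Tower Mousse was airy."
print(first_match_menu_id(review, patterns))          # tagged 69, as in the view
print(longest_match_menu_id(review, dict(patterns)))  # tagged 120
```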

#### Enable BigQuery Reservation API

Reverse ETL from BigQuery to Spanner requires an Enterprise or Enterprise Plus edition reservation, so first enable the BigQuery Reservation API.

In [None]:
! gcloud services enable bigqueryreservation.googleapis.com

#### Create a Capacity Reservation

The reservation has a baseline of 0 slots and autoscales up to a maximum of 50 slots to reduce cost.

In [None]:
sql = f"""
CREATE RESERVATION
  `{project_id}.region-us.reverse-etl-reservation`
OPTIONS (
  slot_capacity = 0,
  edition = 'ENTERPRISE',
  autoscale_max_slots = 50);
"""

result = RunBQQuery(sql)
result

#### Create Assignment

> NOTE: You may need to wait a minute for the capacity reservation to be available before creating an Assignment.

In [None]:
sql = f"""
CREATE ASSIGNMENT
  `{project_id}.region-us.reverse-etl-reservation.reverse-etl-assignment`
OPTIONS (
  assignee = 'projects/{project_id}',
  job_type = 'QUERY');
"""

result = RunBQQuery(sql)
result

#### Run Reverse ETL

> NOTE: You may need to wait a few minutes for the capacity reservation to take effect before running Reverse ETL. 

In [None]:
# Setup reverse ETL
# https://cloud.google.com/bigquery/docs/export-to-spanner#:~:text=You%20can%20do%20this%20by,and%20high%20throughput%20in%20Spanner.

source_tables = [
  "customer",
  "customer_marketing_profile",
  "customer_review",
  "menu",
  "order",
  "order_item",
  "store"
]

# Add tables from bq_temp_view_name_array to the source_tables array
source_tables.extend(bq_temp_view_name_array)

for table in source_tables:
  RunReverseETL(table)

#### Validate Data

In [None]:
# Validate data copied successfully

for table in source_tables:
  print(f"\n\n\nValidating table cai_{table}...\n")
  sql = f"""SELECT COUNT(*) AS row_count FROM cai_{table}"""
  result = RunSpannerQuery(sql)
  print(f"Row count: {result.iloc[0,0]}\nSample data:")

  sql = f"""SELECT * FROM cai_{table} LIMIT 5"""
  result = RunSpannerQuery(sql)
  print(result)

#### Drop Temporary BQ Views

In [None]:
# Once the data is copied from the BQ views, we can drop the views because we
# won't be using them in BQ. The customer_360 view is kept because the social
# data generation step below queries it in BQ.

for view in bq_temp_view_name_array:
  if view == 'customer_360':
    print(f"Skipping view {view}")
    continue
  print(f"Dropping view {view}...")
  sql = f"""DROP VIEW IF EXISTS `{bq_dataset}.{view}`"""
  result = RunBQQuery(sql)
  print(result)

#### Load Customer 360 Table

In [None]:
# Spanner limits each transaction to 80,000 mutations, and the customer_360
# table has too many rows to insert in a single transaction. As a workaround,
# we insert the rows in batches.

# Define batch size
batch_size = 800

# Get the total number of rows to insert.
sql = f"""SELECT count(*) AS row_count
    FROM `cai_customer_marketing_profile_segments` AS mp
    INNER JOIN `cai_customer_marketing_profile_data` AS cp ON mp.customer_id = cp.customer_id
    INNER JOIN `cai_customer_marketing_profile_loyalty` AS cl ON mp.customer_id = cl.customer_id
    INNER JOIN `cai_customer_marketing_profile` AS cmp ON mp.customer_id = cmp.customer_id
"""
result = RunSpannerQuery(sql)
total_rows = int(result.iloc[0]['row_count'])

# Calculate the number of batches.
num_batches = (total_rows + batch_size - 1) // batch_size

# Insert the data in batches.
for i in range(num_batches):
    print(f"Inserting batch {i + 1} of {num_batches}")

    start_index = i * batch_size
    end_index = (i + 1) * batch_size

    sql = f"""INSERT INTO cai_customer_360 (
          customer_id,
          customer_marketing_insights,
          benefits_sought,
          browsing_behavior,
          loyalty_status,
          occasion_timing,
          purchase_history,
          spending_habits,
          usage_frequency,
          user_status,
          at_risk_customers,
          first_time_customers,
          former_customers,
          inactive_customers,
          loyal_advocates,
          new_leads,
          potential_customers,
          repeat_customers,
          age,
          education,
          ethnicity,
          family_size,
          gender,
          generation,
          income,
          language,
          marital_status,
          occupation,
          city,
          climate,
          country,
          population_density,
          region,
          time_zone,
          urban_rural,
          challenges,
          goals,
          pain_points,
          priorities,
          specific_needs,
          attitudes,
          hobbies,
          interests,
          lifestyle,
          motivations,
          personality,
          social_class,
          customer_values,
          adoption_rate,
          browsers,
          devices,
          internet_connectivity,
          operating_systems,
          social_media_platforms,
          software,
          tech_savviness,
          cost_benefit_analysis,
          perceived_value,
          price_sensitivity,
          willingness_to_pay,
          children,
          chocolate_preferences,
          content_interaction,
          customer_age,
          facebook_bio,
          facebook_engagement,
          facebook_handle,
          instagram_bio,
          instagram_engagement,
          instagram_handle,
          linkedin_bio,
          linkedin_engagement,
          linkedin_handle,
          martial_status,
          solicated_buying_habits,
          sports,
          tiktok_bio,
          tiktok_handle,
          twitter_bio,
          twitter_engagement,
          twitter_handle,
          youtube_bio,
          youtube_handle,
          customer_service_interactions,
          average_amount_spent_per_order,
          last_order_date,
          latest_review_sentiment,
          most_frequent_purchase_location,
          negative_review_percentage,
          neutral_review_percentage,
          positive_review_percentage,
          purchase_locations,
          top_3_favorite_menu_items,
          total_amount_spent,
          total_orders,
          total_reviews
        )
        SELECT
          mp.customer_id,
          cmp.customer_marketing_insights,
          mp.benefits_sought,
          mp.browsing_behavior,
          mp.loyalty_status,
          mp.occasion_timing,
          mp.purchase_history,
          mp.spending_habits,
          mp.usage_frequency,
          mp.user_status,
          mp.at_risk_customers,
          mp.first_time_customers,
          mp.former_customers,
          mp.inactive_customers,
          mp.loyal_advocates,
          mp.new_leads,
          mp.potential_customers,
          mp.repeat_customers,
          mp.age,
          mp.education,
          mp.ethnicity,
          mp.family_size,
          mp.gender,
          mp.generation,
          mp.income,
          mp.language,
          mp.marital_status,
          mp.occupation,
          mp.city,
          mp.climate,
          mp.country,
          mp.population_density,
          mp.region,
          mp.time_zone,
          mp.urban_rural,
          mp.challenges,
          mp.goals,
          mp.pain_points,
          mp.priorities,
          mp.specific_needs,
          mp.attitudes,
          mp.hobbies,
          mp.interests,
          mp.lifestyle,
          mp.motivations,
          mp.personality,
          mp.social_class,
          mp.customer_values,
          mp.adoption_rate,
          mp.browsers,
          mp.devices,
          mp.internet_connectivity,
          mp.operating_systems,
          mp.social_media_platforms,
          mp.software,
          mp.tech_savviness,
          mp.cost_benefit_analysis,
          mp.perceived_value,
          mp.price_sensitivity,
          mp.willingness_to_pay,
          cp.children,
          cp.chocolate_preferences,
          cp.content_interaction,
          cp.customer_age,
          cp.facebook_bio,
          cp.facebook_engagement,
          cp.facebook_handle,
          cp.instagram_bio,
          cp.instagram_engagement,
          cp.instagram_handle,
          cp.linkedin_bio,
          cp.linkedin_engagement,
          cp.linkedin_handle,
          cp.martial_status,
          cp.solicated_buying_habits,
          cp.sports,
          cp.tiktok_bio,
          cp.tiktok_handle,
          cp.twitter_bio,
          cp.twitter_engagement,
          cp.twitter_handle,
          cp.youtube_bio,
          cp.youtube_handle,
          cp.customer_service_interactions,
          cl.average_amount_spent_per_order,
          cl.last_order_date,
          cl.latest_review_sentiment,
          cl.most_frequent_purchase_location,
          cl.negative_review_percentage,
          cl.neutral_review_percentage,
          cl.positive_review_percentage,
          cl.purchase_locations,
          cl.top_3_favorite_menu_items,
          cl.total_amount_spent,
          cl.total_orders,
          cl.total_reviews
          FROM
          `cai_customer_marketing_profile_segments` AS mp
          INNER JOIN `cai_customer_marketing_profile_data` AS cp ON mp.customer_id = cp.customer_id
          INNER JOIN `cai_customer_marketing_profile_loyalty` AS cl ON mp.customer_id = cl.customer_id
          INNER JOIN `cai_customer_marketing_profile` AS cmp ON mp.customer_id = cmp.customer_id
        ORDER BY mp.customer_id  -- Stable ordering so LIMIT/OFFSET batches neither overlap nor skip rows
        LIMIT {batch_size} OFFSET {start_index}
    """
    result = RunSpannerQuery(sql)
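The `batch_size = 800` above can be sanity-checked with a little arithmetic. The INSERT writes 96 columns per row, and Spanner counts one mutation per inserted column value, with secondary indexes adding more. The sketch below assumes `cai_customer_360` has no secondary indexes; `max_rows_per_batch` is a hypothetical helper, not part of the notebook.

```python
# Hypothetical helper: how many rows fit under Spanner's per-transaction mutation limit.
MUTATION_LIMIT = 80_000  # per-transaction limit cited in the comment above


def max_rows_per_batch(columns_per_row: int, index_mutations_per_row: int = 0,
                       limit: int = MUTATION_LIMIT) -> int:
    """Return the largest row count whose mutations stay within `limit`.

    Assumes each inserted column value costs one mutation and each row costs
    `index_mutations_per_row` more for secondary indexes (0 = no indexes).
    """
    mutations_per_row = columns_per_row + index_mutations_per_row
    return limit // mutations_per_row


columns = 96  # columns listed in the INSERT INTO cai_customer_360 statement
print(max_rows_per_batch(columns))  # 833
print(800 * columns)                # 76800 mutations per batch of 800 rows
```

Under these assumptions, 800 rows per batch leaves a small safety margin; adding secondary indexes to the table would shrink it.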

### <font color='#4285f4'>Load or Generate Social Data</font>

The social graph data is generated by the code below. By default (`regenerate_social_data = False`), the notebook loads a pre-generated copy of this data from BQ. To regenerate it yourself, set `regenerate_social_data = True`; this takes a few hours.

In [None]:
# Choose whether to regenerate social data
regenerate_social_data = False

# Load data from BQ if regenerate_social_data = False
if not regenerate_social_data:
  spanner_options = '{"table": "cai_edge_customer_follows_customer"}'

  export_statement = f"""EXPORT DATA OPTIONS (
    uri='https://spanner.googleapis.com/projects/{project_id}/instances/{instance_id}/databases/{database_id}',
    format='CLOUD_SPANNER',
    spanner_options='''{spanner_options}'''
  )
  AS SELECT * FROM `{bq_dataset}.spanner_social_data`;"""

  result = RunBQQuery(export_statement)
  print(result)

# Generate data if regenerate_social_data = True
else:
  # Get customer_ids of influencers
  sql = f"""SELECT customer_id FROM `{bq_dataset}.customer_360`
  WHERE customer_marketing_insights LIKE '%an influencer%'"""
  influencers = RunBQQuery(sql)
  influencers = list(influencers['customer_id'].values)

  # Get customer_ids of non-influencers
  sql = f"""SELECT customer_id FROM `{bq_dataset}.customer_360`
  WHERE customer_marketing_insights NOT LIKE '%an influencer%'"""
  followers = RunBQQuery(sql)
  followers = list(followers['customer_id'].values)

  # Generate follower data for non-influencers
  GenerateFollowerData(
      user_ids=followers, # Users to generate social data for
      follower_ids=followers, # Pool of regular (non-influencer) users
      influencer_ids=influencers, # Pool of influencer users
      min_followers=5, # Min number of regular-user followers per user in user_ids
      max_followers=200, # Max number of regular-user followers per user in user_ids
      min_influencer_followers=0, # Min number of influencer followers per user in user_ids
      max_influencer_followers=10, # Max number of influencer followers per user in user_ids
      mutation_limit=1000, # Max mutations sent to Spanner in a single transaction
  )

  # Generate follower data for influencers
  GenerateFollowerData(
      user_ids=influencers,
      follower_ids=followers,
      influencer_ids=influencers,
      min_followers=1000,
      max_followers=int(len(followers) * .8),
      min_influencer_followers=int(len(influencers) * .25),
      max_influencer_followers=int(len(influencers) * .8),
      mutation_limit=1000
  )

In [None]:
# Explore the social data

# Count of rows in the cai_edge_customer_follows_customer table
sql = f"""SELECT count(*) AS row_count
FROM cai_edge_customer_follows_customer;"""
result = RunSpannerQuery(sql)
print(result)

# Followers per user (each row means customer_id follows followed_customer_id)
sql = f"""SELECT followed_customer_id AS customer_id, COUNT(*) AS follower_count
FROM cai_edge_customer_follows_customer
GROUP BY followed_customer_id
ORDER BY follower_count DESC;"""
result = RunSpannerQuery(sql)
print(result)

# Check that every customer has social data (count customers with no follow edges)
sql = f"""SELECT COUNT(*) as row_count FROM cai_customer c
WHERE customer_id NOT IN (
  SELECT DISTINCT customer_id FROM
  cai_edge_customer_follows_customer
  )
"""
result = RunSpannerQuery(sql)
print(result)

## <font color='#4285f4'>Query Spanner Graph</font>

### Get Personalized Menu Recommendations via Collaborative Filtering

> NOTE: If the query below does not return any results, try a different `customer_id`.

In [None]:
# Collaborative filtering
# Find recommended menu items for a customer (with customer_id = 120) based on the
# preferences of similar customers who also rated the same item poorly.

sql = """GRAPH chocolate_ai_graph
MATCH (c1:cai_customer)-[r1:Rates {rating: 1}]->(m1:cai_menu)<-[r2:Rates {rating: 1}]-(c2:cai_customer)-[r3:Rates {rating: 5}]->(m2:cai_menu)
WHERE c1.customer_id <> c2.customer_id
AND c1.customer_id = 120 -- Check recommendations for different customers by changing this customer_id
AND m1.menu_name = 'Lavender Sea Salt Chocolate'
AND m2.menu_name <> 'Lavender Sea Salt Chocolate'
RETURN c1.customer_id AS current_shopper, r1.rating as disliked_item_rating, m1.menu_name, m2.menu_name AS recommended_menu_item, c2.customer_id AS similar_shopper, r3.rating as recommended_item_rating;"""

result = RunSpannerQuery(sql)
result

In [None]:
# We get lots of recommendations, so let's combine our graph query with traditional SQL
# to count the number of recommendations for each menu_item and recommend the top 3

sql = """SELECT recommended_menu_item, count(*) recommended_count FROM GRAPH_TABLE (
  chocolate_ai_graph
  MATCH (c1:cai_customer)-[r1:Rates {rating: 1}]->(m1:cai_menu)<-[r2:Rates {rating: 1}]-(c2:cai_customer)-[r3:Rates {rating: 5}]->(m2:cai_menu)
  WHERE c1.customer_id <> c2.customer_id
  AND c1.customer_id = 120
  AND m1.menu_name = 'Lavender Sea Salt Chocolate'
  AND m2.menu_name <> 'Lavender Sea Salt Chocolate'
  RETURN c1.customer_id AS current_shopper, r1.rating as disliked_item_rating, m1.menu_name, m2.menu_name AS recommended_menu_item, c2.customer_id AS similar_shopper, r3.rating as recommended_item_rating)
as gt
GROUP BY recommended_menu_item
ORDER BY 2 DESC
LIMIT 3;
"""

result = RunSpannerQuery(sql)
result
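To make the MATCH pattern concrete, here is a plain-Python sketch of the same logic over a hypothetical in-memory ratings dictionary (toy data, not from the notebook): find other customers who also rated the disliked item 1, then count the items those customers rated 5 and keep the top 3.

```python
from collections import Counter

# Hypothetical toy ratings: (customer_id, menu_name) -> rating.
ratings = {
    (120, "Lavender Sea Salt Chocolate"): 1,
    (7, "Lavender Sea Salt Chocolate"): 1,
    (7, "Parisian Roast"): 5,
    (7, "Cosmic Trio"): 5,
    (9, "Lavender Sea Salt Chocolate"): 1,
    (9, "Parisian Roast"): 5,
    (11, "Lavender Sea Salt Chocolate"): 5,  # liked it, so not a "similar" shopper
    (11, "Celestial Trio"): 5,
}


def recommend(ratings, shopper, disliked_item, top_n=3):
    """Mirror (c1)-[1]->(m1)<-[1]-(c2)-[5]->(m2), then GROUP BY m2 ... LIMIT top_n."""
    if ratings.get((shopper, disliked_item)) != 1:
        return []  # c1 must have rated m1 with a 1
    similar = {c for (c, m), r in ratings.items()
               if m == disliked_item and r == 1 and c != shopper}
    counts = Counter(m for (c, m), r in ratings.items()
                     if c in similar and r == 5 and m != disliked_item)
    return counts.most_common(top_n)


print(recommend(ratings, 120, "Lavender Sea Salt Chocolate"))
# [('Parisian Roast', 2), ('Cosmic Trio', 1)]
```

Customer 11 rated the same item 5, so their favorites are not counted; this is the "similar dislikes" signal the graph query encodes.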

### Get Recommendations for Ambiguous Reviews with Full-text Search

In [None]:
# Show menu item with hard-to-spell name
sql = """SELECT * FROM cai_menu
WHERE menu_name = 'Lavender Sea Salt Chocolate'
"""

result = RunSpannerQuery(sql)
result

In [None]:
# There are matches in the graph for "Lavender Sea Salt Chocolate" and customer_id 120

sql = """GRAPH chocolate_ai_graph
MATCH (c1:cai_customer)-[r1:Rates {rating: 1}]->(m1:cai_menu)<-[r2:Rates {rating: 1}]-(c2:cai_customer)-[r3:Rates {rating: 5}]->(m2:cai_menu)
WHERE c1.customer_id <> c2.customer_id
AND c1.customer_id = 120 -- Check recommendations for different customers by changing this customer_id
AND m1.menu_name = 'Lavender Sea Salt Chocolate'
AND m2.menu_name <> 'Lavender Sea Salt Chocolate'
RETURN c1.customer_id AS current_shopper, r1.rating as disliked_item_rating, m1.menu_name, m2.menu_name AS recommended_menu_item, c2.customer_id AS similar_shopper, r3.rating as recommended_item_rating;"""

result = RunSpannerQuery(sql)
result

In [None]:
# BUT, there are no matches in the graph when the menu item is misspelled ("Lavinder" instead of "Lavender")

sql = """GRAPH chocolate_ai_graph
MATCH (c1:cai_customer)-[r1:Rates {rating: 1}]->(m1:cai_menu)<-[r2:Rates {rating: 1}]-(c2:cai_customer)-[r3:Rates {rating: 5}]->(m2:cai_menu)
WHERE c1.customer_id <> c2.customer_id
AND c1.customer_id = 120 -- Check recommendations for different customers by changing this customer_id
AND m1.menu_name = 'Lavinder Sea Salt Chocolate'
AND m2.menu_name <> 'Lavinder Sea Salt Chocolate'
RETURN c1.customer_id AS current_shopper, r1.rating as disliked_item_rating, m1.menu_name, m2.menu_name AS recommended_menu_item, c2.customer_id AS similar_shopper, r3.rating as recommended_item_rating;"""

result = RunSpannerQuery(sql)
result


In [None]:
# Full-text search matches both misspelled and incomplete product names ('lavinder salt' matches 'Lavender Sea Salt Chocolate')

sql = """GRAPH chocolate_ai_graph
MATCH (c1:cai_customer)-[r1:Rates {rating: 1}]->(m1:cai_menu)<-[r2:Rates {rating: 1}]-(c2:cai_customer)-[r3:Rates {rating: 5}]->(m2:cai_menu)
WHERE c1.customer_id <> c2.customer_id
AND c1.customer_id = 120 -- Check recommendations for different customers by changing this customer_id
AND SEARCH_NGRAMS(m1.menu_name_token, 'lavinder salt')
AND NOT SEARCH_NGRAMS(m2.menu_name_token, 'lavinder salt')
RETURN c1.customer_id AS current_shopper, r1.rating as disliked_item_rating, m1.menu_name, m2.menu_name AS recommended_menu_item, c2.customer_id AS similar_shopper, r3.rating as recommended_item_rating;"""

result = RunSpannerQuery(sql)
result
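Why does 'lavinder salt' still match? N-gram search compares short character substrings rather than whole words, so a misspelling only loses the n-grams around the typo. The sketch below illustrates the idea with word-level character trigrams and an assumed rule of at least two shared trigrams. It is a simplified model, not Spanner's tokenizer or `SEARCH_NGRAMS` matching rule; the real n-gram sizes come from how `menu_name_token` was defined earlier in the notebook.

```python
def trigrams(text: str) -> set[str]:
    """Character trigrams of each lowercase word in text."""
    grams = set()
    for word in text.lower().split():
        grams.update(word[i:i + 3] for i in range(len(word) - 2))
    return grams


def ngram_match(value: str, query: str, min_shared: int = 2) -> bool:
    """Assumed rule: match when value and query share at least min_shared trigrams."""
    return len(trigrams(value) & trigrams(query)) >= min_shared


query = "lavinder salt"
print(sorted(trigrams("Lavender Sea Salt Chocolate") & trigrams(query)))
# ['alt', 'der', 'lav', 'nde', 'sal']
print(ngram_match("Lavender Sea Salt Chocolate", query))  # True
print(ngram_match("Parisian Roast", query))               # False
```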


### Get Recommendations When No Product is Mentioned with Vector Search

> NOTE: It may take a minute or two for permissions to be set up after creating the models in the cells above. This should happen automatically, so wait a couple of minutes and retry if you get permission errors.

In [None]:
# Get recommendations based on preferences from other users with similar profiles

reviewer_id = 120

sql = f"""GRAPH chocolate_ai_graph
MATCH (c:cai_customer)-[r:Rates {{rating: 5}}]->(m:cai_menu)
WHERE c.customer_id IN (
    SELECT customer_id
    FROM cai_customer_marketing_profile
    WHERE customer_id <> {reviewer_id}
    ORDER BY COSINE_DISTANCE(
        customer_segmentation_data_embedding,
        (SELECT customer_segmentation_data_embedding FROM cai_customer_marketing_profile WHERE customer_id = {reviewer_id})
      )
    LIMIT 10
  )
RETURN m.menu_name AS recommended_menu_item, c.customer_id AS similar_shopper, r.rating as recommended_item_rating;"""

result = RunSpannerQuery(sql)
result

### Find Potential Brand Partners via Social Graph

In [None]:
# Find influencers who are followed by at-risk customers so that we can consider
# a brand partnership with the influencers and retain the at-risk customers.

sql = """GRAPH chocolate_ai_graph
MATCH (c360_1:cai_customer_360)<-[:HasMarketingProfile]-(c1:cai_customer)<-[:Follows]-(c2:cai_customer)-[:HasMarketingProfile]->(c360_2:cai_customer_360)
WHERE c360_1.loyalty_status LIKE 'Loyal%'
  AND c360_2.loyalty_status = 'At-Risk'
RETURN c1.customer_name, c1.customer_id, count(c2) AS num_at_risk_followers
GROUP BY c1.customer_id, c1.customer_name
ORDER BY num_at_risk_followers DESC
LIMIT 10;"""

result = RunSpannerQuery(sql)
result
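The direction of the `Follows` edge matters here: `c2` (the at-risk customer) follows `c1`. Assuming each row of `cai_edge_customer_follows_customer` means `customer_id` follows `followed_customer_id` (as the visualization query later in this notebook reads it), the same aggregation looks like this in plain Python over hypothetical toy data:

```python
from collections import Counter

# Hypothetical toy data, not from the notebook.
loyalty_status = {1: "Loyal Advocate", 2: "At-Risk", 3: "At-Risk", 4: "Loyal", 5: "New"}
# Each pair is (customer_id, followed_customer_id): the first customer follows the second.
follows = [(2, 1), (3, 1), (2, 4), (5, 1), (3, 5)]


def at_risk_followers(follows, loyalty_status, top_n=10):
    """Count at-risk followers of customers whose loyalty_status starts with 'Loyal'."""
    counts = Counter(
        followed
        for follower, followed in follows
        if loyalty_status.get(followed, "").startswith("Loyal")  # c360_1.loyalty_status LIKE 'Loyal%'
        and loyalty_status.get(follower) == "At-Risk"             # c360_2.loyalty_status = 'At-Risk'
    )
    return counts.most_common(top_n)  # ORDER BY num_at_risk_followers DESC LIMIT top_n


print(at_risk_followers(follows, loyalty_status))  # [(1, 2), (4, 1)]
```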

In [None]:
# Now let's use traditional SQL to enrich the results to determine which
# influencers would make the best partners

sql = """WITH gt AS (
  SELECT * FROM GRAPH_TABLE (chocolate_ai_graph
  MATCH (c360_1:cai_customer_360)<-[:HasMarketingProfile]-(c1:cai_customer)<-[:Follows]-(c2:cai_customer)-[:HasMarketingProfile]->(c360_2:cai_customer_360)
  WHERE c360_1.loyalty_status LIKE 'Loyal%'
    AND c360_2.loyalty_status = 'At-Risk'
  RETURN c1.customer_name, c1.customer_id, count(c2) AS num_at_risk_followers
  GROUP BY c1.customer_id, c1.customer_name
  ORDER BY num_at_risk_followers DESC
  LIMIT 10)
), follower_count AS (
    SELECT followed_customer_id, COUNT(followed_customer_id) AS total_follower_count
    FROM cai_edge_customer_follows_customer
    WHERE followed_customer_id IN (
      SELECT customer_id FROM gt
    )
    GROUP BY followed_customer_id
  )
  SELECT gt.customer_name AS influencer
    , gt.customer_id AS influencer_id
    , gt.num_at_risk_followers
    , tfc.total_follower_count
    , c360.loyalty_status AS influencer_loyalty_status
    , c360.age
    , c360.education
    , c360.gender
    , c360.marital_status
    , c360.occupation
    , c360.children
    , c360.facebook_bio
    , c360.facebook_engagement
    , c360.facebook_handle
    , c360.instagram_bio
    , c360.instagram_engagement
    , c360.instagram_handle
    , c360.linkedin_bio
    , c360.linkedin_engagement
    , c360.linkedin_handle
    , c360.tiktok_bio
    , c360.tiktok_handle
    , c360.twitter_bio
    , c360.twitter_engagement
    , c360.twitter_handle
    , c360.youtube_bio
    , c360.youtube_handle
  FROM gt
  JOIN cai_customer_360 AS c360 ON gt.customer_id = c360.customer_id
  JOIN follower_count AS tfc ON gt.customer_id = tfc.followed_customer_id
  ORDER BY gt.num_at_risk_followers DESC;
"""

result = RunSpannerQuery(sql)
result

### <font color='#4285f4'>Visualize the Social Graph</font>

In [None]:
# Helper function for visualization with pyvis
def visualize_graph(query_result):
    """
    Visualizes the results of a Spanner graph query with influencers and followers using pyvis.

    Args:
        query_result: A pandas DataFrame containing the query results.
    """

    net = Network(height='750px', width='100%', bgcolor='#222222', font_color='white', notebook=True, filter_menu=True, cdn_resources='in_line')

    # Convert DataFrame to a list of tuples for easier handling
    graph_data = list(query_result[['influencer_id', 'influencer_name', 'follower_id', 'follower_name']].itertuples(index=False, name=None))

    # Add nodes and edges
    for influencer_id, influencer_name, follower_id, follower_name in graph_data:
        # Add nodes with labels and titles
        net.add_node(follower_id, label=f"Follower: {follower_name} ({follower_id})", title=f"Follower: {follower_name} ({follower_id})", color='red')
        net.add_node(influencer_id, label=f"Influencer: {influencer_name} ({influencer_id})", title=f"Influencer: {influencer_name} ({influencer_id})", color='green')

        # Add edge
        net.add_edge(influencer_id, follower_id, title="Follows")

    # Customize visualization options (optional)
    net.force_atlas_2based(overlap=1)
    net.show_buttons(filter_=['physics']) # Useful for tuning settings before finalizing graph
    #net.toggle_stabilization(True)
    net.toggle_physics(True)

    return net

In [None]:
# Find influencers who are followed by at-risk customers so that we can consider
# a brand partnership with the influencers and retain the at-risk customers.
sql = """GRAPH chocolate_ai_graph
MATCH (c360_1:cai_customer_360)<-[:HasMarketingProfile]-(c1:cai_customer)<-[:Follows]-(c2:cai_customer)-[:HasMarketingProfile]->(c360_2:cai_customer_360)
WHERE c360_1.loyalty_status LIKE 'Loyal%'
  AND c360_2.loyalty_status = 'At-Risk'
RETURN c1.customer_name, c1.customer_id, count(c2) AS num_at_risk_followers
GROUP BY c1.customer_id, c1.customer_name
ORDER BY num_at_risk_followers DESC
LIMIT 10;"""

result = RunSpannerQuery(sql)
influencer_ids = list(result['customer_id'])
# Build a SQL IN list such as (12, 34, 56); int() avoids numpy scalar reprs like np.int64(12)
in_string = "(" + ", ".join(str(int(i)) for i in set(influencer_ids)) + ")"



In [None]:
# Get detailed graph nodes and edges for visualization
sql = f"""SELECT
  ce.followed_customer_id AS influencer_id,
  ic.customer_name AS influencer_name,
  ce.customer_id AS follower_id,
  fc.customer_name AS follower_name
FROM cai_edge_customer_follows_customer ce
JOIN cai_customer_360 c360 ON ce.customer_id = c360.customer_id
JOIN cai_customer ic ON ce.followed_customer_id = ic.customer_id
JOIN cai_customer fc ON ce.customer_id = fc.customer_id
WHERE c360.loyalty_status = 'At-Risk'
AND followed_customer_id IN {in_string}
"""

result = RunSpannerQuery(sql)
result

# Visualize the results
# https://pyvis.readthedocs.io/en/latest/tutorial.html#using-pyvis-within-jupyter-notebook
net = visualize_graph(result)
net.show("customer_graph.html")
display(HTML('customer_graph.html'))

## <font color='#4285f4'>Query Spanner as an External Dataset in BigQuery</font>

An external dataset is a connection between BigQuery and an external data source at the dataset level. It lets you query transactional data in Spanner databases with GoogleSQL without moving data from Spanner to BigQuery storage.

The tables in an external dataset are automatically populated from the tables in the corresponding external data source. You can query these tables directly in BigQuery, but you cannot make modifications, additions, or deletions. However, any updates that you make in the external data source are automatically reflected in BigQuery.

When you use external datasets, Spanner Data Boost is always used, so you don't have to enable it manually. Data Boost is a fully managed, serverless feature that provides compute resources independent of your provisioned Spanner instance, so analytics queries, federated queries, and data exports run with near-zero impact on existing workloads. It is most useful for complex ad hoc queries or for processing large amounts of data, and running federated queries with Data Boost can significantly lower CPU consumption and, in some cases, query latency.

You can read more about [external datasets in BigQuery](https://cloud.google.com/bigquery/docs/spanner-external-datasets) and [Spanner Data Boost](https://cloud.google.com/spanner/docs/databoost/databoost-overview).

### Create External Schema

In [None]:
# Create the external schema
spanner_external_schema_name = 'chocolate_ai_spanner_external_schema'

# This uses predefined roles for Spanner. To use a custom database role instead, see the doc below.
# https://cloud.google.com/bigquery/docs/connect-to-spanner#bq:~:text=%22-,database_role,-%22%3A%20(Optional)%20If
sql = f"""CREATE EXTERNAL SCHEMA {spanner_external_schema_name}
  OPTIONS (
    external_source = 'google-cloudspanner:///projects/{project_id}/instances/{instance_id}/databases/{database_id}',
    location = '{bigquery_location}');"""

result = RunBQQuery(sql)
result

In [None]:
# Confirm the external schema is created
sql = f"""SELECT *
FROM `{project_id}.region-{bigquery_location}.INFORMATION_SCHEMA.SCHEMATA`
WHERE schema_name = '{spanner_external_schema_name}'"""

result = RunBQQuery(sql)
result

In [None]:
# Test querying Spanner data through BigQuery
sql = f"""SELECT *
FROM `{project_id}.{spanner_external_schema_name}.cai_customer` LIMIT 5
"""

result = RunBQQuery(sql)
result

### Query Real-time Orders and Historical Orders Separately

In [None]:
# Set timestamps to separate current orders in Spanner from historical orders in BigQuery
current_datetime = datetime.now(timezone.utc)

bq_from_datetime = current_datetime - timedelta(days=365)
bq_from_datetime = bq_from_datetime.strftime("%Y-%m-%d %H:%M:%S%z")

bq_to_datetime = current_datetime - timedelta(days=30)
bq_to_datetime = bq_to_datetime.strftime("%Y-%m-%d %H:%M:%S%z")

In [None]:
# Query last 30 days of order data directly from Spanner
sql = f"""SELECT DATE_TRUNC(CAST(order_completion_datetime AS DATE), WEEK) AS week,
COUNT(*)
FROM cai_order
WHERE order_completion_datetime BETWEEN '{bq_to_datetime}' AND '{current_datetime}'
GROUP BY week
ORDER BY 1
"""

result = RunSpannerQuery(sql)
result

In [None]:
# Query preceding 11 months of data from BigQuery
sql = f"""SELECT DATE_TRUNC(order_completion_datetime, WEEK) AS week,
  COUNT(*)
FROM `{bq_dataset}.order`
WHERE order_completion_datetime BETWEEN '{bq_from_datetime}' AND '{bq_to_datetime}'
GROUP BY week
ORDER BY week;
"""
result = RunBQQuery(sql)
result

### Query Real-time Orders and Historical Orders Together

In [None]:
# Query Spanner and BigQuery Together with External Dataset
sql = f"""WITH union_result AS (
    SELECT DATE_TRUNC(CAST(order_completion_datetime AS DATE), WEEK) AS week,
        COUNT(*) AS order_count
  FROM   `{project_id}.{spanner_external_schema_name}.cai_order` span_o
  WHERE  span_o.order_completion_datetime BETWEEN '{bq_to_datetime}' AND '{current_datetime}'
  GROUP  BY week
  UNION ALL
  SELECT DATE_TRUNC(CAST(order_completion_datetime AS DATE), WEEK) AS week,
        COUNT(*) AS order_count
  FROM   `{bq_dataset}.order`
  WHERE  order_completion_datetime BETWEEN '{bq_from_datetime}' AND '{bq_to_datetime}'
  GROUP  BY week
  )
  SELECT week, SUM(order_count) AS order_count
  FROM union_result
  GROUP BY week
  ORDER BY week;
"""

result = RunBQQuery(sql)
result
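The outer `GROUP BY week` matters because the 30-day cutoff rarely lands on a week boundary, so the week containing `bq_to_datetime` can appear in both halves of the `UNION ALL`. A plain-Python sketch of the same merge, using hypothetical weekly counts:

```python
from collections import Counter
from datetime import date

# Hypothetical weekly order counts keyed by week start (BigQuery WEEK starts on Sunday).
spanner_weeks = {date(2024, 5, 5): 40, date(2024, 5, 12): 55}   # last 30 days
bigquery_weeks = {date(2024, 4, 28): 60, date(2024, 5, 5): 12}  # preceding 11 months


def combine_weekly_counts(*sources):
    """Equivalent of UNION ALL, then SUM(order_count) ... GROUP BY week ORDER BY week."""
    total = Counter()
    for weekly in sources:
        total.update(weekly)  # adds counts for weeks present in more than one source
    return sorted(total.items())


for week, orders in combine_weekly_counts(spanner_weeks, bigquery_weeks):
    print(week, orders)
# 2024-04-28 60
# 2024-05-05 52
# 2024-05-12 55
```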

## <font color='#4285f4'>Run Vector Search in Spanner</font>

### Test the Models

Test the models created earlier in the notebook. 

> NOTE: It may take a minute or two for permissions to be set up after creating the models. This should happen automatically, so wait a couple of minutes and retry if you get permission errors.

In [None]:
# Test the embedding model
sql = """SELECT embeddings.values from ML.PREDICT(MODEL EmbeddingsModel,
  (SELECT customer_name as content FROM cai_customer where customer_id = 2555)
)"""

result = RunSpannerQuery(sql)
result


In [None]:
# Test the LLM model
sql = """SELECT *
FROM ML.PREDICT(
MODEL LLMModel,
(   SELECT
'What is so great about chocolate?' AS prompt),
STRUCT(256 AS maxOutputTokens))"""

result = RunSpannerQuery(sql)
result

### Run an Exact Nearest Neighbor (ENN) Vector Search Query in Spanner

In [None]:
# Search the cai_customer_marketing_profile for 'Young professional'
# This allows you to perform flexible semantic search on customer
# marketing insight data to better target specific customers with
# relevant marketing materials.

# Define your search_phrase
search_phrase = "Young professional"

# Get an embedding for your search_phrase
sql = f"""SELECT embeddings.values
  FROM ML.PREDICT(
    MODEL EmbeddingsModel,
    (SELECT '{search_phrase}' as content))
"""

result = RunSpannerQuery(sql)
embedding = result.iloc[0,0]

# Run the ENN search
sql = f"""SELECT customer_id,
  COSINE_DISTANCE(
    customer_marketing_insights_embedding,
    {embedding}
  ) as dist,
  customer_marketing_insights
FROM cai_customer_marketing_profile
ORDER BY dist
LIMIT 10;"""

result = RunSpannerQuery(sql)
result

### OPTIONAL: Add an ANN Vector Index

The `COSINE_DISTANCE()` function used above performs an exact nearest neighbor (ENN) search: it computes the distance between the query vector and every vector in the table. This performs fine on our small dataset, but it does not scale to larger datasets with millions or billions of vectors.
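An exact search is a full scan: score every stored vector, sort, and keep the top k, so the work grows linearly with the row count. A NumPy sketch of that idea, using random vectors as a stand-in for the 768-dimensional embeddings (an assumption for illustration; this is not how Spanner executes the query internally):

```python
import numpy as np


def exact_nearest_neighbors(query, vectors, k=10):
    """Brute-force cosine search, like ORDER BY COSINE_DISTANCE(...) LIMIT k."""
    q = query / np.linalg.norm(query)
    v = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
    distances = 1.0 - v @ q          # cosine distance = 1 - cosine similarity, O(n * d) work
    order = np.argsort(distances)[:k]
    return order, distances[order]


rng = np.random.default_rng(0)
vectors = rng.normal(size=(1_000, 768))   # stand-in for stored embeddings
ids, dists = exact_nearest_neighbors(vectors[42], vectors, k=3)
print(ids[0])  # 42: a vector is its own nearest neighbor
```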

We can significantly improve the performance of our vector query using approximate nearest neighbor (ANN) search in Spanner with the `APPROX_COSINE_DISTANCE()` function, delivering:
* Scale and speed: fast, high-recall search scaling to more than 10B vectors
* Operational simplicity: no need to copy your data to a specialized vector DB
* Consistency: the results are always fresh, reflecting the latest updates  

However, we need to make a few changes to our instance and table before we can create the index:
* Scale up the instance (ANN indexing is only supported on instances with at least 1 node)
* Create and load a new embedding column, specifying `vector_length=>768` in the column definition.
* Re-write our query to use the `APPROX_COSINE_DISTANCE()` function.

#### Scale Up Spanner Instance

In [None]:
# Note: ANN search index feature requires at least 1 full node while in preview,
#       otherwise you'll get the following error:
#       {'code': 12, 'message': 'VECTOR INDEX is not available in granular instances in preview'}
# Ref: https://cloud.google.com/spanner/docs/find-approximate-nearest-neighbors#:~:text=Spanner%20accelerates%20ANN%20vector%20searches,data%20and%20facilitate%20faster%20searches.

# Scale up the Spanner instance
result = ScaleSpannerInstance(processing_units = 1000)
result


#### Create and Load New Embedding Column

In [None]:
ddl_array = []

# Create and load a new embedding column, specifying vector_length=>768 in the column definition.
ddl_array.append("""ALTER TABLE cai_customer_marketing_profile ADD COLUMN IF NOT EXISTS customer_marketing_insights_embedding_span ARRAY<FLOAT64>(vector_length=>768)
""")

result = RunSpannerDDL(ddl_array)
result


In [None]:
# There are too many rows in the cai_customer_marketing_profile table to update
# in one batch due to Spanner mutation limits (80,000) per transaction. We loop
# through batches of updates here as a workaround.

# Define batch size
batch_size = 1000

# Get the total number of rows to update.
sql = f"""SELECT count(*) AS row_count
    FROM `cai_customer_marketing_profile`
"""
result = RunSpannerQuery(sql)
total_rows = int(result.iloc[0]['row_count'])

# Calculate the number of batches.
num_batches = (total_rows + batch_size - 1) // batch_size

# Update the rows in batches.
for i in range(num_batches):
    print(f"Updating batch {i + 1} of {num_batches}")

    start_index = i * batch_size

    sql = f"""
        UPDATE cai_customer_marketing_profile
        SET customer_marketing_insights_embedding_span = customer_marketing_insights_embedding
        WHERE customer_id IN (
            SELECT customer_id
            FROM cai_customer_marketing_profile
            ORDER BY customer_id  -- Order by a unique key so batches don't overlap
            LIMIT {batch_size} OFFSET {start_index}
        )
    """

    result = RunSpannerQuery(sql)
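
The batching arithmetic above is integer ceiling division: `(total_rows + batch_size - 1) // batch_size` batches, each starting at offset `i * batch_size`. For 2,500 rows and a batch size of 1,000, that gives 3 batches at offsets 0, 1,000, and 2,000. Here is a small pure-Python sketch of the offsets and the per-batch statement. The helper names are hypothetical, and the sketch assumes `customer_id` is a unique key, so `ORDER BY customer_id` yields the same order on every iteration and batches neither overlap nor skip rows.

```python
def batch_offsets(total_rows: int, batch_size: int) -> list[int]:
    """OFFSET value for each batch; there are ceil(total_rows / batch_size) batches."""
    num_batches = (total_rows + batch_size - 1) // batch_size  # integer ceiling division
    return [i * batch_size for i in range(num_batches)]


def batch_update_sql(offset: int, batch_size: int) -> str:
    """Same UPDATE the notebook runs, restricted to one window of customer_ids."""
    return f"""UPDATE cai_customer_marketing_profile
SET customer_marketing_insights_embedding_span = customer_marketing_insights_embedding
WHERE customer_id IN (
    SELECT customer_id FROM cai_customer_marketing_profile
    ORDER BY customer_id LIMIT {batch_size} OFFSET {offset})"""


if __name__ == "__main__":
    for n, offset in enumerate(batch_offsets(2500, 1000), start=1):
        print(f"batch {n}: OFFSET {offset}")
```

Keeping each batch at 1,000 rows is intended to keep every transaction well under the mutation limit mentioned in the comment, assuming each updated row contributes only a few mutations.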

#### Create ANN Index


In [None]:
ddl_array = []

ddl_array.append(f"""CREATE VECTOR INDEX customer_marketing_insights_embedding_idx
  ON cai_customer_marketing_profile(customer_marketing_insights_embedding_span)
  WHERE customer_marketing_insights_embedding_span IS NOT NULL
  OPTIONS (distance_type = 'COSINE', tree_depth = 2, num_leaves = 1000)
""")

result = RunSpannerDDL(ddl_array)
result
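
The three `OPTIONS` in the DDL above control the index shape. `distance_type` should match the distance function the query uses (`COSINE` for `APPROX_COSINE_DISTANCE()`), `tree_depth` sets the number of levels in the index tree, and `num_leaves` sets how many leaf partitions the vectors are split into. The hypothetical helper below assembles the same statement from parameters. The accepted values it checks (`tree_depth` of 2 or 3, and the three distance types) are assumptions based on the Spanner documentation at the time of writing.

```python
VALID_DISTANCE_TYPES = {"COSINE", "EUCLIDEAN", "DOT_PRODUCT"}  # assumed Spanner values


def vector_index_ddl(index: str, table: str, column: str, num_leaves: int,
                     tree_depth: int = 2, distance_type: str = "COSINE") -> str:
    """Build a CREATE VECTOR INDEX statement shaped like the one above (hypothetical helper)."""
    if tree_depth not in (2, 3):
        raise ValueError("tree_depth must be 2 or 3")
    if num_leaves < 1:
        raise ValueError("num_leaves must be positive")
    if distance_type not in VALID_DISTANCE_TYPES:
        raise ValueError(f"unsupported distance_type: {distance_type}")
    return (
        f"CREATE VECTOR INDEX {index}\n"
        f"  ON {table}({column})\n"
        # Skip NULL embeddings; the ANN query repeats this filter in its WHERE clause.
        f"  WHERE {column} IS NOT NULL\n"
        f"  OPTIONS (distance_type = '{distance_type}', "
        f"tree_depth = {tree_depth}, num_leaves = {num_leaves})"
    )


if __name__ == "__main__":
    print(vector_index_ddl("customer_marketing_insights_embedding_idx",
                           "cai_customer_marketing_profile",
                           "customer_marketing_insights_embedding_span",
                           num_leaves=1000))
```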

#### Run an Approximate Nearest Neighbor (ANN) Search

In [None]:
# Define your search_phrase
search_phrase = "Young professional"

# Get an embedding for your search_phrase
sql = f"""SELECT embeddings.values
  FROM ML.PREDICT(
    MODEL EmbeddingsModel,
    (SELECT '{search_phrase}' as content))
"""

result = RunSpannerQuery(sql)
embedding = result.iloc[0,0]

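# Search cai_customer_marketing_profile for search_phrase using the vector index.
# num_leaves_to_search = 1000 searches every leaf of the index (num_leaves = 1000),
# which maximizes recall; lower values trade some recall for lower latency.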
sql = f"""SELECT customer_id,
  APPROX_COSINE_DISTANCE(
    {embedding},
    customer_marketing_insights_embedding_span,
    options=>JSON'{{"num_leaves_to_search": 1000}}'
  ) AS distance,
  customer_marketing_insights,
  customer_marketing_insights_embedding_span
FROM cai_customer_marketing_profile@{{FORCE_INDEX=customer_marketing_insights_embedding_idx}}
WHERE customer_marketing_insights_embedding_span IS NOT NULL
ORDER BY distance
LIMIT 10;"""

result = RunSpannerQuery(sql)
result
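
Both queries in this cell build SQL by interpolating Python values into f-strings. That works when `embedding` is a plain Python list, but a value whose `str()` is not a valid SQL literal (for example a NumPy array, which prints without commas) or a `search_phrase` containing a single quote would break the statement. Here is a minimal sketch of two hypothetical helpers that render these values as GoogleSQL literals.

```python
import math
from typing import Iterable


def to_sql_float_array(values: Iterable[float]) -> str:
    """Render numbers as a GoogleSQL FLOAT64 array literal, e.g. [0.5, -1.0]."""
    # float() also converts NumPy scalars, whose repr() is not a plain number.
    floats = [float(v) for v in values]
    if not all(math.isfinite(v) for v in floats):
        raise ValueError("embedding contains NaN or infinity")
    # repr() gives the shortest string that round-trips each float exactly.
    return "[" + ", ".join(repr(v) for v in floats) + "]"


def to_sql_string(text: str) -> str:
    """Quote text as a single-quoted GoogleSQL string, escaping backslashes and quotes."""
    return "'" + text.replace("\\", "\\\\").replace("'", "\\'") + "'"
```

With these, the embedding query could use `(SELECT {to_sql_string(search_phrase)} AS content)` and the search could interpolate `{to_sql_float_array(embedding)}`. If `RunSpannerQuery()` supports query parameters, passing the values as parameters would avoid string building altogether.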

#### Drop ANN Index

In [None]:
# Drop the ANN index before scaling back down, since vector indexes require
# at least 1 full node while in preview
ddl_array = []

ddl_array.append("""DROP INDEX IF EXISTS customer_marketing_insights_embedding_idx""")

result = RunSpannerDDL(ddl_array)
result

#### Scale Spanner Back Down

In [None]:
# Scale Spanner instance back down to save on cost
result = ScaleSpannerInstance(processing_units=100)
result

### <font color='#4285f4'>Have Gemini Explain Query Results</font>

In [None]:
# Search the cai_customer_marketing_profile for 'Young professional'
# This is an exact nearest neighbor search, so this query can take ~60-120 seconds

# Define your search_phrase
search_phrase = "Young professional"

# Get an embedding for your search_phrase
sql = f"""SELECT embeddings.values
  FROM ML.PREDICT(
    MODEL EmbeddingsModel,
    (SELECT '{search_phrase}' as content))
"""

result = RunSpannerQuery(sql)
embedding = result.iloc[0,0]

# Run the ENN search with Gemini explanation
sql = f"""WITH vector_query AS (
  SELECT customer_id,
    COSINE_DISTANCE(
      customer_marketing_insights_embedding,
      {embedding}
    ) AS dist,
    customer_marketing_insights
  FROM cai_customer_marketing_profile
  ORDER BY dist
  LIMIT 5
)
SELECT
    content AS explanation,
    vq.customer_id,
    vq.dist,
    vq.customer_marketing_insights
FROM ML.PREDICT(
    MODEL LLMModel,
    (SELECT
        'Explain how the following vector query result is relevant to this search phrase: "{search_phrase}" Vector query result: ' || customer_marketing_insights AS prompt,
        customer_id  -- Stable key passed through ML.PREDICT for the join below
    FROM vector_query),
    STRUCT(256 AS maxOutputTokens)) AS ml
JOIN vector_query AS vq ON ml.customer_id = vq.customer_id;
"""

result = RunSpannerQuery(sql)
result
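
The query above follows a three-step pattern: build one prompt per vector hit, send the prompts through `ML.PREDICT()`, and join each model response back to its hit by a key that `ML.PREDICT()` passes through. Here is a pure-Python sketch of the same flow. `generate` is a hypothetical stand-in for the LLM call, and `customer_id` is assumed to be unique.

```python
from typing import Callable

PROMPT_TEMPLATE = (
    "Explain how the following vector query result is relevant to this search phrase: "
    '"{phrase}" Vector query result: {insight}'
)


def explain_hits(search_phrase: str, hits: list[dict],
                 generate: Callable[[str], str]) -> list[dict]:
    """Attach an LLM explanation to each vector hit, keyed by customer_id."""
    # Step 1: one prompt per hit, carried alongside its key (like a pass-through column).
    prompts = {
        hit["customer_id"]: PROMPT_TEMPLATE.format(
            phrase=search_phrase, insight=hit["customer_marketing_insights"])
        for hit in hits
    }
    # Step 2: call the model once per prompt.
    explanations = {cid: generate(prompt) for cid, prompt in prompts.items()}
    # Step 3: join responses back onto the hits by key, keeping distance order.
    return [{**hit, "explanation": explanations[hit["customer_id"]]} for hit in hits]
```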

## <font color='#4285f4'>Clean Up</font>

In [None]:
# Delete external dataset
sql = f"""DROP SCHEMA IF EXISTS `chocolate_ai_spanner_external_schema`;"""

result = RunBQQuery(sql)
result

In [None]:
# Delete instance
uri = f"https://spanner.googleapis.com/v1/projects/{project_id}/instances/chocolate-ai-{project_id}"
http_verb = "DELETE"

response = restAPIHelper(uri, http_verb, {})
response

## <font color='#4285f4'>Reference Links</font>

- [Spanner Graph Overview](https://cloud.google.com/products/spanner/graph?e=48754805&hl=en)
- [Codelab: Getting started with Spanner Graph](https://codelabs.developers.google.com/codelabs/spanner-graph-getting-started#5)
- [Querying Spanner Graph](https://cloud.google.com/spanner/docs/graph/queries-overview)
- [Spanner Full-text Search](https://cloud.google.com/spanner/docs/full-text-search)
- [Introducing BigQuery External Datasets for Spanner](https://cloud.google.com/blog/products/data-analytics/introducing-bigquery-external-datasets-for-spanner?e=48754805)
- [Spanner Data Boost](https://cloud.google.com/spanner/docs/databoost/databoost-overview)
- [Spanner vector search](https://cloud.google.com/blog/products/databases/how-spanner-vector-search-supports-generative-ai-apps?e=48754805)
- [Spanner ANN vector search](https://cloud.google.com/blog/products/databases/spanner-now-supports-approximate-nearest-neighbor-search?e=48754805)
- [Codelab: Getting started with Spanner Vector Search](https://codelabs.developers.google.com/codelabs/spanner-getting-started-vector-search#4)
- [Invoke Gemini from Spanner with ML.PREDICT()](https://cloud.google.com/spanner/docs/ml-tutorial)
- [Pyvis for graph visualization](https://pyvis.readthedocs.io/en/latest/)
- [Force Graph for graph visualization](https://www.npmjs.com/package/force-graph)
- [Spanner REST API Quickstart](https://cloud.google.com/spanner/docs/getting-started/rest)
- [Spanner JSON functions](https://cloud.google.com/spanner/docs/reference/standard-sql/json_functions)
- [Intro to BigQuery Views](https://cloud.google.com/bigquery/docs/views-intro)