### <font color='#4285f4'>Overview</font>

This demonstration simulates a geofencing campaign around four Chocolate AI stores in Paris. We simulate 100 or more customers moving throughout the city. As these customers enter a designated 1 km radius around any of the stores, a continuous query triggers an alert to a Pub/Sub topic, representing a targeted special offer sent to their mobile devices. This showcases how real-time location data, processed through Kafka, Dataflow, and BigQuery, can drive targeted marketing campaigns.

*We assume they are running our application and have location services turned on.*


Process Flow:

1.  **Infrastructure Setup:**
    *   Provision a Managed Service for Apache Kafka and create a Kafka Topic.
    *   Initiate a Dataflow pipeline to stream data from Kafka to a BigQuery table (`customer_geo_location`).
    *   Create a BigQuery reservation with sufficient capacity for continuous queries (50 slots minimum, each continuous query utilizes ~2 slots).

2.  **Data Generation (Simulation):**
    *   Generate 100 (or more) simulated people within the city of Paris.
        *   Each person is assigned a random starting and ending location within a predefined bounding box (e.g., subset of Paris).
        *   Each person moves at a different, randomly assigned walking rate.
    *   For each person, at every second:
        *   Publish each customer's prior latitude/longitude (for trajectory analysis), current latitude/longitude, along with a timestamp, to the Kafka topic.
        *   The demo includes some fields prefixed with "debug_" to make the streaming data easier to understand.

3.  **Data Ingestion and Processing:**
    *   Dataflow reads data from the Kafka topic and writes it to the BigQuery table (`customer_geo_location`).
    *   Implement two continuous queries on the `customer_geo_location` table:
        *   Query 1: Determine whether a customer crosses the 1 km geo boundary of any of the 4 stores and insert the result into a BigQuery table.
        *   Query 2: Determine whether a customer crosses the 1 km geo boundary of any of the 4 stores and publish the result to Pub/Sub.
        *   Call the Gemini API in real time to generate marketing text based on the user's context.
        *   Two continuous queries are used to demonstrate both techniques in the demo.
        
4. **OPTIONAL:**
    * a. You can then read Pub/Sub and generate a push notification to the customer's cell phone (Chocolate AI App)
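
The geofence test behind both continuous queries can be illustrated in plain Python. This is a minimal sketch, not the SQL the demo runs: it assumes an "entry" means the prior position was outside the 1 km radius and the current position is inside it, and the store coordinates and the function names `haversine_km` and `entered_geofence` are hypothetical.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance between two latitude/longitude points, in kilometers."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = phi2 - phi1
    d_lambda = math.radians(lon2 - lon1)
    a = (math.sin(d_phi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def entered_geofence(prior, current, store, radius_km=1.0):
    """True only when the prior point is outside the radius and the current point is inside it."""
    prior_km = haversine_km(prior[0], prior[1], store[0], store[1])
    current_km = haversine_km(current[0], current[1], store[0], store[1])
    return prior_km > radius_km >= current_km


# Hypothetical store location (not one of the demo's actual stores)
store = (48.8566, 2.3522)
prior_position = (48.8766, 2.3522)    # roughly 2.2 km north of the store
current_position = (48.8616, 2.3522)  # roughly 0.6 km north of the store
print(entered_geofence(prior_position, current_position, store))
```

Comparing the prior and current distances means a customer who stays inside the radius does not trigger a new alert on every location update.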

Cost:
* Very High (~ $5 per hour): Kafka, Dataflow and BigQuery Reservation.  
  - You can run this for an hour or two at low cost, but delete the resources when done.
* Medium: Remember to stop your Colab Enterprise Notebook Runtime

Author:
* Adam Paternostro

In [None]:
# Architecture Diagram
from IPython.display import Image
Image(url='https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Artifacts/Campaign-Performance-Geofencing-Simulation-Architecture.png', width=1200)

### <font color='#4285f4'>Video Walkthrough</font>

[![Video](https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Videos/adam-paternostro-video.png)](https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Videos/Campaign-Performance-Geofencing-Simulation.mp4)


In [None]:
from IPython.display import HTML

HTML("""
<video width="800" height="600" controls>
  <source src="https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Videos/Campaign-Performance-Geofencing-Simulation.mp4" type="video/mp4">
  Your browser does not support the video tag.
</video>
""")

### <font color='#4285f4'>License</font>


```
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```

Author: Adam Paternostro

### <font color='#4285f4'>Pip installs</font>

In [None]:
# PIP Installs
import sys

# https://kafka-python.readthedocs.io/en/master/index.html
!{sys.executable} -m pip install kafka-python

# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html
!{sys.executable} -m pip install confluent-kafka

### <font color='#4285f4'>Initialize</font>

In [None]:
import json
import random
import time
import datetime
import base64

import google.auth
import google.auth.transport.urllib3
import urllib3

In [None]:
# Set these (run this cell to verify the output)

# WARNING: Hardcoded for now
# For testing you need (These will be automated with Terraform)
# network (vpc-main), kafka-subnet, dataflow-subnet
# service principals: dataflow-service-account [Optional: kafka-service-principal for using a service principal]
# buckets: ${project_id} (for AVRO schema), dataflow-staging-us-central1-756740881369 (for dataflow temp files w/o soft delete on)
# BigQuery dataset: chocolate_ai [You need to create this]
# BigQuery table: customer_geo_location [This is created for you]

bigquery_location = "${bigquery_location}"
region = "${region}"
kafka_cluster_name = "chocolate-ai-kafka-cluster-01"
kafka_topic_name = "customer-location-topic-01"
dataflow_bucket = "${dataflow_staging_bucket}" # should not have logical delete on
dataflow_service_account = "${dataflow_service_account}" # Needs Role: roles/managedkafka.client
bigquery_dataset_name = "${bigquery_chocolate_ai_dataset}"
bigquery_streaming_destination_table = "customer_geo_location"
kafka_subnet = "kafka-subnet"
dataflow_subnet = "dataflow-subnet"

# kafka_service_principal_name = "kafka-service-principal" # No longer needed since we use the logged-in user
# kafka_service_principal_email = "kafka-service-principal-email" # No longer needed since we use the logged-in user

# Get some values using gcloud
project_id = !(gcloud config get-value project)
user = !(gcloud auth list --filter=status:ACTIVE --format="value(account)")

if len(project_id) == 0:
  raise RuntimeError(f"project_id is not set: {project_id}")
project_id = project_id[0]

if len(user) != 1:
  raise RuntimeError(f"user is not set: {user}")
user = user[0]

print(f"project_id = {project_id}")
print(f"user = {user}")

### <font color='#4285f4'>Helper Methods</font>

#### restAPIHelper
Calls the Google Cloud REST API using the current user's credentials.

In [None]:
def restAPIHelper(url: str, http_verb: str, request_body: dict) -> dict:
  """Calls the Google Cloud REST API passing in the current user's credentials"""

  import requests
  import google.auth
  import google.auth.transport.requests  # must be imported explicitly; "import google.auth" alone does not load it
  import json

  # Get an access token based upon the current user
  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request()
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
    "Content-Type" : "application/json",
    "Authorization" : "Bearer " + access_token
  }

  if http_verb == "GET":
    response = requests.get(url, headers=headers)
  elif http_verb == "POST":
    response = requests.post(url, json=request_body, headers=headers)
  elif http_verb == "PUT":
    response = requests.put(url, json=request_body, headers=headers)
  elif http_verb == "PATCH":
    response = requests.patch(url, json=request_body, headers=headers)
  elif http_verb == "DELETE":
    response = requests.delete(url, headers=headers)
  else:
    raise RuntimeError(f"Unknown HTTP verb: {http_verb}")

  if response.status_code == 200:
    return json.loads(response.content)
  else:
    error = f"Error restAPIHelper -> Status: '{response.status_code}' Text: '{response.text}'"
    raise RuntimeError(error)

#### createApacheKafkaForBigQueryCluster
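Creates the cluster if it does not exist and returns the cluster's resource path. Pass that path to `waitForApacheKafkaForBigQueryCluster` to wait until the cluster is active.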

In [None]:
def createApacheKafkaForBigQueryCluster():
  """Creates an Apache Kafka For BigQuery Cluster."""

  # First find the cluster if it exists
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/list

  url = f"https://managedkafka.googleapis.com/v1/projects/{project_id}/locations/{region}/clusters"

  # Gather existing clusters
  json_result = restAPIHelper(url, "GET", None)
  print(f"createApacheKafkaForBigQueryCluster (GET) json_result: {json_result}")

  # Test to see if cluster exists, if so return
  if "clusters" in json_result:
    for item in json_result["clusters"]:
      print(f"Apache Kafka for BigQuery: {item['name']}")
      # "projects/${project_id}/locations/us-central1/clusters/kafka-cluster"
      if item["name"] == f"projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}":
        print("Apache Kafka for BigQuery already exists")
        return f"projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}"

  # Create Apache Kafka For BigQuery Cluster
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/create
  print("Creating Apache Kafka For BigQuery Cluster")

  url = f"https://managedkafka.googleapis.com/v1/projects/{project_id}/locations/{region}/clusters?clusterId={kafka_cluster_name}"

  # Larger Apache Kafka Cluster
  # vcpuCount: 32 -> You can probably use fewer CPUs since they are mainly idle
  # memoryBytes: 34359738368 -> RAM was at 50% when doing 11,000 customers

  request_body = {
      "capacityConfig": {
        "vcpuCount": "3",
        "memoryBytes": "3221225472"
      },
      "gcpConfig": {
          "accessConfig": {
              "networkConfigs": {
                  "subnet": f"projects/{project_id}/regions/{region}/subnetworks/{kafka_subnet}"
                  }
            }
        }
    }

  json_result = restAPIHelper(url, "POST", request_body)

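  name = json_result["name"]  # name of the long-running create operation
  done = json_result.get("done", False)  # the field may be absent while the operation is still running
  print("Apache Kafka for BigQuery creation started: ", name)
  return f"projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}"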

#### waitForApacheKafkaForBigQueryCluster
Loops until cluster is created
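
The wait loops in this notebook (for the cluster and later for the Dataflow job) share one pattern: poll a status, return when it matches, give up after a fixed number of attempts. A minimal sketch of that pattern follows; `wait_until` and its parameters are hypothetical names and are not used by the notebook's own functions.

```python
import time


def wait_until(fetch_state, target_state, max_attempts=100, delay_seconds=30, sleep=time.sleep):
    """Polls fetch_state() until it returns target_state; returns the number of attempts used."""
    for attempt in range(1, max_attempts + 1):
        state = fetch_state()
        if state == target_state:
            return attempt
        # Pause before the next poll
        sleep(delay_seconds)
    raise RuntimeError(f"State '{target_state}' not reached after {max_attempts} attempts")


# Simulated status sequence standing in for repeated REST calls
states = iter(["CREATING", "CREATING", "ACTIVE"])
attempts = wait_until(lambda: next(states), "ACTIVE", delay_seconds=0)
print(f"ACTIVE after {attempts} attempts")
```

Passing the status lookup as a function keeps the retry logic independent of which REST endpoint is being polled.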

In [None]:
def waitForApacheKafkaForBigQueryCluster(operation):
  """
  Waits for an Apache Kafka For BigQuery Cluster to become ACTIVE.

  operation:
    The resource path to poll. The caller passes the cluster path returned by
    createApacheKafkaForBigQueryCluster:
    projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}
  """

  url = f"https://managedkafka.googleapis.com/v1/{operation}"
  max_retries = 100
  attempt = 0

  while True:
    # Get the current status of the resource
    json_result = restAPIHelper(url, "GET", None)
    print(f"waitForApacheKafkaForBigQueryCluster (GET) json_result: {json_result}")

    # Return once the cluster reports that it is active
    if "state" in json_result:
      if json_result["state"] == "ACTIVE":
        print("Apache Kafka for BigQuery Cluster created")
        return None

    # Wait for 30 seconds before polling again
    attempt += 1
    if attempt > max_retries:
      raise RuntimeError("Apache Kafka for BigQuery Cluster not created")
    time.sleep(30)

#### deleteApacheKafkaForBigQueryCluster
Deletes the cluster if it exists.

In [None]:
def deleteApacheKafkaForBigQueryCluster():
  """Deletes an Apache Kafka For BigQuery Cluster."""

  # First find the cluster if it exists
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/list

  url = f"https://managedkafka.googleapis.com/v1/projects/{project_id}/locations/{region}/clusters"

  # Gather existing clusters
  json_result = restAPIHelper(url, "GET", None)
  print(f"deleteApacheKafkaForBigQueryCluster (GET) json_result: {json_result}")
  found = False

  # Test to see if the cluster exists, if so then delete it
  if "clusters" in json_result:
    for item in json_result["clusters"]:
      print(f"Apache Kafka for BigQuery: {item['name']}")
      # "projects/${project_id}/locations/us-central1/clusters/kafka-cluster"
      if item["name"] == f"projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}":
        print("Apache Kafka for BigQuery exists")
        found = True
        break

  if not found:
    print("Apache Kafka for BigQuery does not exist")
    return None

  # Delete Apache Kafka For BigQuery Cluster
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/delete
  print("Deleting Apache Kafka For BigQuery Cluster")

  url = f"https://managedkafka.googleapis.com/v1/projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}"

  json_result = restAPIHelper(url, "DELETE", request_body={})

  print("Apache Kafka for BigQuery deleted")

#### createApacheKafkaForBigQueryTopic
Creates the topic if it does not exist.

In [None]:
def createApacheKafkaForBigQueryTopic():
  """Creates an Apache Kafka For BigQuery Topic."""

  # First find the topic if it exists
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters.topics/list

  url = f"https://managedkafka.googleapis.com/v1/projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}/topics"

  # Gather existing topics
  json_result = restAPIHelper(url, "GET", None)
  print(f"createApacheKafkaForBigQueryTopic (GET) json_result: {json_result}")

  # Test to see if topic exists, if so return
  if "topics" in json_result:
    for item in json_result["topics"]:
      print(f"Apache Kafka for BigQuery Topic: {item['name']}")
      # "projects/${project_id}/locations/us-central1/clusters/kafka-cluster"
      if item["name"] == f"projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}/topics/{kafka_topic_name}":
        print("Apache Kafka for BigQuery Topic already exists")
        return None


  # Create Apache Kafka For BigQuery Topic
  # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters.topics/create
  print("Creating Apache Kafka For BigQuery Topic")

  url = f"https://managedkafka.googleapis.com/v1/projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}/topics?topicId={kafka_topic_name}"

  # partition_count 32 -> for larger cluster
  request_body = {
      "partition_count"    : 6,
      "replication_factor" : 3
    }

  json_result = restAPIHelper(url, "POST", request_body)

  name = json_result["name"]
  print("Apache Kafka for BigQuery Topic created: ", name)
  return None

#### RunQuery
Runs a BigQuery SQL statement

In [None]:
def RunQuery(sql):
  import time
  from google.cloud import bigquery
  client = bigquery.Client()

  if (sql.startswith("SELECT") or sql.startswith("WITH")):
      df_result = client.query(sql).to_dataframe()
      return df_result
  else:
    job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)
    query_job = client.query(sql, job_config=job_config)

    # Check on the progress by getting the job's updated state.
    query_job = client.get_job(
        query_job.job_id, location=query_job.location
    )
    print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    while query_job.state != "DONE":
      time.sleep(2)
      query_job = client.get_job(
          query_job.job_id, location=query_job.location
          )
      print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    if query_job.error_result is None:
      return True
    else:
      raise Exception(query_job.error_result)

In [None]:
def GetMaxNextValue(fully_qualified_table_name, field_name):
  from google.cloud import bigquery
  client = bigquery.Client()
  sql = f"""
  SELECT IFNULL(MAX({field_name}),0) AS result
    FROM `{fully_qualified_table_name}`
  """
  # print(sql)
  df_result = client.query(sql).to_dataframe()
  # display(df_result)
  return int(df_result['result'].iloc[0]) + 1

### <font color='#4285f4'>Create Apache Kafka for BigQuery Cluster</font>

Create the cluster and the topic.

In [None]:
# To see your clusters: https://console.cloud.google.com/managedkafka/clusterList

cluster_path = createApacheKafkaForBigQueryCluster()

if cluster_path is not None:
  waitForApacheKafkaForBigQueryCluster(cluster_path)

createApacheKafkaForBigQueryTopic()

### <font color='#4285f4'>Create BigQuery tables</font>

In [None]:
%%bigquery

#DROP TABLE `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location`;

CREATE TABLE IF NOT EXISTS `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location`
(
    customer_geo_location_id    STRING   OPTIONS(description="Primary key."),
    customer_id                 INT64    OPTIONS(description="Foreign Key: Customer"),
    event_timestamp_millis      INT64    OPTIONS(description="Unix epoch timestamp (in milliseconds) from the customer's device."),
    prior_latitude              FLOAT64  OPTIONS(description="The prior latitude of the customer location."),
    prior_longitude             FLOAT64  OPTIONS(description="The prior longitude of the customer location."),
    current_latitude            FLOAT64  OPTIONS(description="The current latitude of the customer location."),
    current_longitude           FLOAT64  OPTIONS(description="The current longitude of the customer location."),

    debug_destination_latitude  FLOAT64  OPTIONS(description="The destination latitude of the customer location.  We would not know this, but this is for debugging/demo purposes."),
    debug_destination_longitude FLOAT64  OPTIONS(description="The destination longitude of the customer location.  We would not know this, but this is for debugging/demo purposes."),
    debug_walking_speed_mps     FLOAT64  OPTIONS(description="The speed at which the person is walking.  We could compute this, but this is for debugging/demo purposes."),
    debug_map_url               STRING   OPTIONS(description="Opens a Google Map link so we can see where the customer is on a map.")
)
CLUSTER BY customer_id, event_timestamp_millis;

In [None]:
%%bigquery

#DROP TABLE `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location_results`;

CREATE TABLE IF NOT EXISTS `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location_results`
(
    customer_geo_location_id             STRING    OPTIONS(description="Primary key."),
    customer_id                          INT64     OPTIONS(description="Foreign Key: Customer"),
    genai_message                        STRING    OPTIONS(description="Gemini sample marketing message."),
    current_latitude                     FLOAT64   OPTIONS(description="The current latitude of the customer location."),
    current_longitude                    FLOAT64   OPTIONS(description="The current longitude of the customer location."),
    prior_distance_to_store_kilometers   FLOAT64   OPTIONS(description="The prior distance from the customer to a store."),
    current_distance_to_store_kilometers FLOAT64   OPTIONS(description="The current distance from the customer to a store."),
    store_id                             INT64     OPTIONS(description="The store id we are measuring distance from."),
    store_name                           STRING    OPTIONS(description="The store name we are measuring distance from."),
    debug_map_url                        STRING    OPTIONS(description="Opens a Google Map link so we can see where the customer is on a map."),
    event_timestamp_millis               INT64     OPTIONS(description="Unix epoch timestamp (in milliseconds) from the customer's device."),
    geo_boundry_entry_timestamp          TIMESTAMP OPTIONS(description="The timestamp the data was processed by the continuous query."),
)
CLUSTER BY customer_id, event_timestamp_millis;

### <font color='#4285f4'>Create Avro Schema for Kafka Messages</font>


Used to parse the data into separate fields.  The schema must match your BigQuery table and will be used by the DataFlow job to separate the fields into columns within the table.

NOTE: This is currently not used until schema registry is in Kafka.
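
Each record carries the same fields as the `customer_geo_location` table. As a rough sketch of what one simulated one-second update might look like as a JSON message, assuming a flat-earth approximation for short walking steps: `step_toward`, `build_message`, the sample coordinates, and the map URL format are hypothetical and not the notebook's actual simulation code.

```python
import json
import math
import time
import uuid


def step_toward(lat, lon, dest_lat, dest_lon, speed_mps, seconds=1.0):
    """Moves from (lat, lon) toward the destination by speed_mps * seconds meters."""
    meters_per_deg_lat = 111_320.0  # approximate length of one degree of latitude
    meters_per_deg_lon = meters_per_deg_lat * math.cos(math.radians(lat))
    north_m = (dest_lat - lat) * meters_per_deg_lat
    east_m = (dest_lon - lon) * meters_per_deg_lon
    remaining_m = math.hypot(north_m, east_m)
    step_m = speed_mps * seconds
    if remaining_m <= step_m:
        return dest_lat, dest_lon  # arrived
    fraction = step_m / remaining_m
    return lat + (dest_lat - lat) * fraction, lon + (dest_lon - lon) * fraction


def build_message(customer_id, prior, current, destination, speed_mps):
    """Builds a dict whose keys match the customer_geo_location columns."""
    return {
        "customer_geo_location_id": str(uuid.uuid4()),
        "customer_id": customer_id,
        "event_timestamp_millis": int(time.time() * 1000),
        "prior_latitude": prior[0],
        "prior_longitude": prior[1],
        "current_latitude": current[0],
        "current_longitude": current[1],
        "debug_destination_latitude": destination[0],
        "debug_destination_longitude": destination[1],
        "debug_walking_speed_mps": speed_mps,
        # Assumed URL format for illustration only
        "debug_map_url": f"https://www.google.com/maps/search/?api=1&query={current[0]},{current[1]}",
    }


# Hypothetical start and destination inside Paris
prior = (48.8500, 2.3400)
destination = (48.8600, 2.3500)
current = step_toward(prior[0], prior[1], destination[0], destination[1], speed_mps=1.4)
message = build_message(42, prior, current, destination, 1.4)
print(json.dumps(message, indent=2))
```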

In [None]:
avro_schema = {
    'namespace': 'com.databeans.customer_geo_location',
    'type': 'record',
    'name': 'customer_geo_location',
    'fields': [
        {'name': 'customer_geo_location_id', 'type': 'string'},
        {'name': 'customer_id', 'type': 'int'},
        {'name': 'event_timestamp_millis', 'type': 'long'},
        {'name': 'prior_latitude', 'type': 'double'},
        {'name': 'prior_longitude', 'type': 'double'},
        {'name': 'current_latitude', 'type': 'double'},
        {'name': 'current_longitude', 'type': 'double'},
        {'name': 'debug_destination_latitude', 'type': 'double'},
        {'name': 'debug_destination_longitude', 'type': 'double'},
        {'name': 'debug_walking_speed_mps', 'type': 'double'},
        {'name': 'debug_map_url', 'type': 'string'}
    ]
}

with open('customer_geo_location.avsc', 'w') as out:
    json.dump(avro_schema, out, indent=4)

In [None]:
# Save to storage so dataflow job can see it
!gsutil cp customer_geo_location.avsc gs://${chocolate_ai_bucket}/customer_geo_location.avsc

### <font color='#4285f4'>DataFlow - Stream Data from Apache Kafka for BigQuery to a BigQuery Table (streaming ingestion)</font>


- <font color='red'>**WARNING:**</font> This will create a new job every time this is run.  The notebook will only stop the latest job, so please check the DataFlow UI to cancel any additional jobs.

#### createDataflowJobApacheKafkaToBigQuery
Creates the DataFlow Job (verifies the job name, must be a new name for each job)

In [None]:
def createDataflowJobApacheKafkaToBigQuery(jobName):
  """Creates a DataFlow job that streams data from Apache Kafka for BigQuery into a BigQuery table"""

  # First find the job if it exists
  # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list

  url = f"https://dataflow.googleapis.com/v1b3/projects/{project_id}/jobs?location={region}"

  # Gather existing job
  json_result = restAPIHelper(url, "GET", None)
  print(f"createDataflowJobApacheKafkaToBigQuery (GET) json_result: {json_result}")

  # Test to see if job exists, if so return
  if "jobs" in json_result:
    for item in json_result["jobs"]:
      print(f"DataFlow Job Name: {item['name']}")
      if item["name"] == jobName:
        print(f"DataFlow job already exists with state of {item['currentState']}.  Try a new name.")
        return None

  # Create DataFlow Job
  # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch
  print("Creating DataFlow Job from Flex Template")

  url = f"https://dataflow.googleapis.com/v1b3/projects/{project_id}/locations/{region}/flexTemplates:launch"

  # Continuous Queries needs useStorageWriteApiAtLeastOnce = True
  # https://cloud.google.com/dataflow/docs/guides/templates/provided/kafka-to-bigquery#optional-parameters
  #numStorageWriteApiStreams : Specifies the number of write streams, this parameter must be set. Default is 0.
  #storageWriteApiTriggeringFrequencySec : Specifies the triggering frequency in seconds, this parameter must be set. Default is 5 seconds.
  #useStorageWriteApiAtLeastOnce : This parameter takes effect only if "Use BigQuery Storage Write API" is enabled. If enabled the at-least-once semantics will be used for Storage Write API, otherwise exactly-once semantics will be used. Defaults to: false.

  request_body = {
    "launch_parameter": {
        "jobName": jobName,
        "containerSpecGcsPath": "gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex",
        "parameters": {
            "readBootstrapServerAndTopic": f"projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}/topics/{kafka_topic_name}",
            "persistKafkaKey": "false",
            "writeMode": "SINGLE_TABLE_NAME",
            "numStorageWriteApiStreams": "2",
            "useStorageWriteApiAtLeastOnce": "true",
            "storageWriteApiTriggeringFrequencySec": "5",
            "enableCommitOffsets": "false",
            "kafkaReadOffset": "latest",
            "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS",
            "messageFormat": "JSON",
            "useBigQueryDLQ": "false",
            "stagingLocation": f"gs://{dataflow_bucket}/staging",
            "autoscalingAlgorithm": "NONE",
            "serviceAccount": dataflow_service_account,
            "usePublicIps": "false",
            "labels": "{\"goog-dataflow-provided-template-type\":\"flex\",\"goog-dataflow-provided-template-name\":\"kafka_to_bigquery_flex\",\"goog-dataflow-provided-template-version\":\"2024-07-16-00_rc00\"}",
            "outputTableSpec": f"{project_id}:{bigquery_dataset_name}.{bigquery_streaming_destination_table}"
        },
        "environment": {
            "numWorkers": 2,
            "tempLocation": f"gs://{dataflow_bucket}/tmp",
            "subnetwork": f"regions/{region}/subnetworks/{dataflow_subnet}",
            "enableStreamingEngine": True,
            "additionalExperiments": [
                "enable_streaming_engine"
            ],
            "additionalUserLabels": {}
        }
    }
}

  json_result = restAPIHelper(url, "POST", request_body)

  job = json_result["job"]
  print("DataFlow job created: ", job)
  return job

#### stopDataflowJobApacheKafkaToBigQuery
Stops a DataFlow job.  Looks up the ID based upon the name.
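
The name-to-ID lookup can be tried without calling the API. The sketch below assumes the list response has the shape the functions in this section read (`jobs`, `name`, `id`, `currentState`); `find_job_id` and the sample job IDs are hypothetical.

```python
def find_job_id(jobs_response, job_name, required_state=None):
    """Returns the id of the first job with the given name (and state, if required), else None."""
    for job in jobs_response.get("jobs", []):
        if job.get("name") != job_name:
            continue
        if required_state is not None and job.get("currentState") != required_state:
            continue
        return job.get("id")
    return None


# Made-up response for illustration; real ids come from the Dataflow jobs list call
sample_response = {
    "jobs": [
        {"id": "job-aaa", "name": "kafka-stream-old", "currentState": "JOB_STATE_CANCELLED"},
        {"id": "job-bbb", "name": "kafka-stream-new", "currentState": "JOB_STATE_RUNNING"},
    ]
}

print(find_job_id(sample_response, "kafka-stream-new", required_state="JOB_STATE_RUNNING"))
```

Filtering on the state avoids sending a cancel request for a job that has already stopped.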

In [None]:
def stopDataflowJobApacheKafkaToBigQuery(jobName):
  """Stops the DataFlow job that streams data from Apache Kafka for BigQuery into a BigQuery table"""

  # First find the job if it exists
  # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list

  url = f"https://dataflow.googleapis.com/v1b3/projects/{project_id}/jobs?location={region}"

  # Gather existing jobs
  json_result = restAPIHelper(url, "GET", None)
  print(f"stopDataflowJobApacheKafkaToBigQuery (GET) json_result: {json_result}")
  found = False

  # Test to see if job exists, if so return
  if "jobs" in json_result:
    for item in json_result["jobs"]:
      print(f"DataFlow Job Name: {item['name']} - {item['currentState']}")
      if item["name"] == jobName and item["currentState"] == "JOB_STATE_RUNNING":
        jobId = item["id"]
        found = True
        break

  if not found:
    print("DataFlow job not found or is not running.")
    return

  # Stop DataFlow Job
  # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/update
  print("Stopping DataFlow Job ")

  url=f"https://dataflow.googleapis.com/v1b3/projects/{project_id}/locations/{region}/jobs/{jobId}"
  print(url)

  request_body = { "requestedState" : "JOB_STATE_CANCELLED" }

  json_result = restAPIHelper(url, "PUT", request_body)

  #job = json_result["job"]
  print("DataFlow Job Stopped")
  return

#### waitForDataFlowJobToStart
Waits for a job to start

In [None]:
def waitForDataFlowJobToStart(jobName):
  """Waits for job to turn to running"""

  # First find the job if it exists
  # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list

  url = f"https://dataflow.googleapis.com/v1b3/projects/{project_id}/jobs?location={region}"

  # Gather existing jobs
  json_result = restAPIHelper(url, "GET", None)
  print(f"waitForDataFlowJobToStart (GET) json_result: {json_result}")
  found = False

  # Look up the job id by name
  if "jobs" in json_result:
    for item in json_result["jobs"]:
      print(f"DataFlow Job Name: {item['name']} - {item['currentState']}")
      if item["name"] == jobName:
        jobId = item["id"]
        found = True
        break

  if not found:
    print("DataFlow job not found.")
    return

  # Gets the job status
  # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.jobs/get
  print("Getting DataFlow Job ")
  url=f"https://dataflow.googleapis.com/v1b3/projects/{project_id}/locations/{region}/jobs/{jobId}"
  print(url)

  max_retries = 100
  attempt = 0

  while True:
    # Get Job
    json_result = restAPIHelper(url, "GET", None)

    # Return once the job reports that it is running
    if "currentState" in json_result:
      print(f"waitForDataFlowJobToStart (GET) currentState: {json_result['currentState']}")
      if json_result["currentState"] == "JOB_STATE_RUNNING":
        print("DataFlow Job is now running")
        return None
    else:
      print(f"waitForDataFlowJobToStart (GET) json_result: {json_result}")


    # Wait for 30 seconds before polling again
    attempt += 1
    if attempt > max_retries:
      raise RuntimeError("DataFlow Job did not reach the running state")
    time.sleep(30)

#### Run the DataFlow Job

In [None]:
# The job can take a few minutes to start.  Click the link to see the progress:
# https://console.cloud.google.com/dataflow/jobs

jobName= f"kafka-stream-{datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}"
createDataflowJobApacheKafkaToBigQuery(jobName)

In [None]:
print(f"https://console.cloud.google.com/dataflow/jobs?project={project_id}")

In [None]:
waitForDataFlowJobToStart(jobName)

### <font color='#4285f4'>Token Provider / Confluent Provider</font>


Use the logged-in user's credentials (instead of a service account).

*   https://cloud.google.com/managed-kafka/docs/authentication-kafka#oauthbearer
*   https://github.com/googleapis/managedkafka
* https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-configuration
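
The token that the `TokenProvider` class in this section produces is three base64url-encoded parts joined by dots: a header, a set of JWT-style claims, and the Google OAuth access token. The sketch below builds and decodes such a string with placeholder values so the layout can be inspected without real credentials; the access token and timestamps are made up, and `b64_decode` is a hypothetical helper.

```python
import base64
import json


def b64_encode(source: str) -> str:
    """URL-safe base64 without padding, as used for each token part."""
    return base64.urlsafe_b64encode(source.encode("utf-8")).decode("utf-8").rstrip("=")


def b64_decode(part: str) -> str:
    """Restores the stripped padding and decodes one token part."""
    padding = "=" * (-len(part) % 4)
    return base64.urlsafe_b64decode(part + padding).decode("utf-8")


header = json.dumps({"typ": "JWT", "alg": "GOOG_OAUTH2_TOKEN"})
claims = json.dumps({"exp": 1735689600.0, "iss": "Google", "iat": 1735686000.0, "sub": "user_adc"})
fake_access_token = "ya29.not-a-real-token"  # placeholder, never print a real token

kafka_token = ".".join([b64_encode(header), b64_encode(claims), b64_encode(fake_access_token)])

# Split the token back into its three parts and decode each one
parts = kafka_token.split(".")
decoded = [b64_decode(part) for part in parts]
for label, value in zip(["header", "claims", "access token"], decoded):
    print(f"{label}: {value}")
```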




In [None]:
import google.auth
import google.auth.transport.urllib3
import urllib3
import json
import base64
import datetime
import time
# Base class that kafka-python expects for OAUTHBEARER token providers
from kafka.sasl.oauth import AbstractTokenProvider

class TokenProvider(AbstractTokenProvider):

  def __init__(self, **config):
    # Application Default Credentials with the cloud-platform scope
    self.credentials, _project = google.auth.default(scopes=['https://www.googleapis.com/auth/cloud-platform'])
    self.http_client = urllib3.PoolManager()
    self.HEADER = json.dumps(dict(typ='JWT', alg='GOOG_OAUTH2_TOKEN'))

  def valid_credentials(self):
    if not self.credentials.valid:
        # Use the correct Request object for urllib3
        self.credentials.refresh(google.auth.transport.urllib3.Request(self.http_client))
    return self.credentials

  def get_jwt(self, creds):
    # Service account credentials expose service_account_email.
    # User credentials from ADC do not, so fall back to a placeholder subject.
    subject = getattr(creds, 'service_account_email', 'user_adc')
    return json.dumps(
        dict(
            exp=creds.expiry.timestamp(),
            iss='Google',
            iat=datetime.datetime.now(datetime.timezone.utc).timestamp(),
            # scope='kafka',  # left out here; verify whether your setup requires it
            sub=subject,
        )
    )

  def b64_encode(self, source):
    return (
        base64.urlsafe_b64encode(source.encode('utf-8'))
        .decode('utf-8')
        .rstrip('=')
    )

  def get_kafka_access_token(self, creds):
    # Builds the three-part OAUTHBEARER token string for Google Managed Kafka:
    #   base64url(header) . base64url(JWT claims) . base64url(Google OAuth access token)
    # Check Google's authentication documentation (linked above) if the format changes.
    return '.'.join([
      self.b64_encode(self.HEADER),
      self.b64_encode(self.get_jwt(creds)),
      self.b64_encode(creds.token)
    ])

  # This method now fulfills the AbstractTokenProvider requirement for kafka-python
  def token(self):
    try:
      # print("TokenProvider.token() called")
      creds = self.valid_credentials()
      return self.get_kafka_access_token(creds)
    except Exception as e:
      print(f"Error in TokenProvider.token: {e}")
      # Consider logging the traceback for detailed debugging
      # import traceback
      # traceback.print_exc()
      raise # Re-raise the exception

  # This method provides the format needed for confluent-kafka-python's callback
  def confluent_token(self):
    try:
      # print("TokenProvider.confluent_token() called")
      creds = self.valid_credentials()
      token_str = self.get_kafka_access_token(creds)

      # google-auth stores creds.expiry as a naive datetime in UTC, so attach the
      # UTC timezone before converting; otherwise timestamp() assumes local time.
      utc_expiry = creds.expiry.replace(tzinfo=datetime.timezone.utc)

      # confluent-kafka's oauth_cb expects the expiry as seconds since the epoch (float)
      expiry_timestamp_seconds = utc_expiry.timestamp()

      return token_str, expiry_timestamp_seconds
    except Exception as e:
      print(f"Error in TokenProvider.confluent_token: {e}")
      # import traceback
      # traceback.print_exc()
      raise # Re-raise the exception


In [None]:
# confluent-kafka does not accept a TokenProvider object; it calls a function (oauth_cb) instead
def ConfluentTokenProvider(args, config):
  """Returns the (token, expiry) tuple for the Confluent producer's oauth_cb callback"""
  t = TokenProvider()
  return t.confluent_token()


# Print any Confluent errors (for debugging)
def ConfluentErrorProvider(e):
  print(e)
  raise Exception(e)

In [None]:
# For debugging - WARNING: Never save the output of these calls in the notebook (they print a live token)!
# t = TokenProvider()
# t.token()

# ConfluentTokenProvider(None, None)
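
The token that `TokenProvider.token()` returns is three base64url segments joined with dots. The sketch below rebuilds that layout from made-up values (no credentials involved) and decodes it again, which can help when checking what the broker receives. `build_token`, `decode_segment`, and the sample values are illustrative names and data, not part of the notebook.

```python
import base64
import datetime
import json

def b64_encode(source: str) -> str:
    # URL-safe base64 without '=' padding, mirroring TokenProvider.b64_encode
    return base64.urlsafe_b64encode(source.encode('utf-8')).decode('utf-8').rstrip('=')

def decode_segment(segment: str) -> str:
    # Restore the stripped padding before decoding
    padded = segment + '=' * (-len(segment) % 4)
    return base64.urlsafe_b64decode(padded).decode('utf-8')

def build_token(access_token: str, expiry: datetime.datetime, subject: str) -> str:
    # Same three-part layout as get_kafka_access_token: header.claims.token
    header = json.dumps(dict(typ='JWT', alg='GOOG_OAUTH2_TOKEN'))
    claims = json.dumps(dict(
        exp=expiry.timestamp(),
        iss='Google',
        iat=datetime.datetime.now(datetime.timezone.utc).timestamp(),
        sub=subject,
    ))
    return '.'.join([b64_encode(header), b64_encode(claims), b64_encode(access_token)])

# Made-up values for illustration only; never print a real token
fake_expiry = datetime.datetime(2030, 1, 1, tzinfo=datetime.timezone.utc)
sample = build_token('not-a-real-token', fake_expiry, 'user_adc')

header_part, claims_part, token_part = sample.split('.')
print(json.loads(decode_segment(header_part)))
print(json.loads(decode_segment(claims_part))['sub'])
print(decode_segment(token_part))
```

Decoding the segments this way is only a debugging aid; the broker performs its own validation.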

#### Helper Methods (Fake Data and Callback)

In [None]:
# This can be used to do a callback when you publish a message
# Since we might publish a lot of messages, this is commented out in the Producer code

def delivery_callback(err, msg):
    if err:
        print('ERROR: Message failed delivery: {}'.format(err))
    else:
        print("Produced event to topic {topic}: value = {value:12}".format(topic=msg.topic(),  value=msg.value().decode('utf-8')))

### <font color='#4285f4'>Latitude / Longitude Helper Methods</font>

##### haversine_distance

In [None]:
import math

def haversine_distance(lat1, lon1, lat2, lon2):
  """
  Calculates the haversine distance between two points on a sphere.

  Args:
    lat1: Latitude of the first point in decimal degrees.
    lon1: Longitude of the first point in decimal degrees.
    lat2: Latitude of the second point in decimal degrees.
    lon2: Longitude of the second point in decimal degrees.

  Returns:
    The distance between the two points in kilometers.
  """

  # Earth's radius in kilometers
  R = 6371

  # Convert degrees to radians
  lat1 = math.radians(lat1)
  lon1 = math.radians(lon1)
  lat2 = math.radians(lat2)
  lon2 = math.radians(lon2)

  # Haversine formula
  dlat = lat2 - lat1
  dlon = lon2 - lon1
  a = math.sin(dlat / 2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2)**2
  c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
  distance = R * c

  return distance
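
As a quick sanity check, the sketch below repeats the haversine formula in compact form and measures the distance from Hôtel des Invalides to the Le Bon Marché store, using coordinates that appear later in this notebook. Inputs are decimal degrees. `haversine_km` is an illustrative name, and the spherical Earth radius of 6,371 km is the same assumption made above.

```python
import math

def haversine_km(lat1, lon1, lat2, lon2):
    # Convert decimal degrees to radians before applying the formula
    lat1, lon1, lat2, lon2 = map(math.radians, (lat1, lon1, lat2, lon2))
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 6371 * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

invalides = (48.85744164370035, 2.3128668119381186)
le_bon_marche = (48.850829751346133, 2.3245967340236109)

distance_km = haversine_km(*invalides, *le_bon_marche)
print(f"Invalides to Le Bon Marché: {distance_km:.2f} km")
```

The result is a little over 1 km, so a person starting at Hôtel des Invalides begins just outside that store's 1 km geofence.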

In [None]:
# Create 10 people who are in London
# Have them walk towards the location location_1_lat_min, london_lat_max = 51.5174328, -0.1219854
# When they are within 1 km send them an alert
# Only send them the alert once

# have people walk at different speeds
# The average walking speed for most adults is around 4.8 kilometers per hour

##### bounding_box

In [None]:
import math

def bounding_box(latitude, longitude, distance_km):
  """
  Calculates the bounding box coordinates for a given latitude, longitude, and distance.

  Args:
    latitude: Latitude of the center point in decimal degrees.
    longitude: Longitude of the center point in decimal degrees.
    distance_km: Distance in kilometers for the bounding box.

  Returns:
    A tuple containing the minimum and maximum latitude and longitude values
    (min_lat, max_lat, min_lon, max_lon).
  """

  # Earth's radius in kilometers
  earth_radius_km = 6371

  # Convert latitude and longitude to radians
  lat_rad = math.radians(latitude)
  lon_rad = math.radians(longitude)

  # Calculate angular radius
  angular_radius = distance_km / earth_radius_km

  # Calculate bounding box coordinates
  min_lat = math.degrees(lat_rad - angular_radius)
  max_lat = math.degrees(lat_rad + angular_radius)

  # Handle potential issues with longitude calculations near the poles
  if abs(lat_rad) > math.pi / 2 - angular_radius:
    # Adjust longitude range to cover the entire circle
    min_lon = -180
    max_lon = 180
  else:
    min_lon = math.degrees(lon_rad - angular_radius / math.cos(lat_rad))
    max_lon = math.degrees(lon_rad + angular_radius / math.cos(lat_rad))

  return min_lat, max_lat, min_lon, max_lon
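
To see what the box looks like in practice, the following sketch recomputes it for the 10 km distance around Hôtel des Invalides used later in this notebook and converts the spans back to kilometers. `bounding_box_sketch` is an illustrative, simplified copy that leaves out the near-pole special case, which does not apply to Paris.

```python
import math

EARTH_RADIUS_KM = 6371
CENTER_LAT, CENTER_LON = 48.85744164370035, 2.3128668119381186  # Hôtel des Invalides

def bounding_box_sketch(latitude, longitude, distance_km):
    # Angular size of the distance as seen from the Earth's center
    angular_radius = distance_km / EARTH_RADIUS_KM
    lat_rad = math.radians(latitude)
    # Meridians converge towards the poles, so widen the longitude delta by 1/cos(lat)
    lon_delta = angular_radius / math.cos(lat_rad)
    return (math.degrees(lat_rad - angular_radius),
            math.degrees(lat_rad + angular_radius),
            longitude - math.degrees(lon_delta),
            longitude + math.degrees(lon_delta))

min_lat, max_lat, min_lon, max_lon = bounding_box_sketch(CENTER_LAT, CENTER_LON, 10)

# Convert the spans back to kilometers to confirm the box is 20 km on each side
lat_span_km = math.radians(max_lat - min_lat) * EARTH_RADIUS_KM
lon_span_km = math.radians(max_lon - min_lon) * EARTH_RADIUS_KM * math.cos(math.radians(CENTER_LAT))
print(f"lat: {min_lat:.4f} to {max_lat:.4f} ({lat_span_km:.1f} km)")
print(f"lon: {min_lon:.4f} to {max_lon:.4f} ({lon_span_km:.1f} km)")
```

Because the result is a square rather than a circle, points near its corners are farther than 10 km from the center.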

### <font color='#4285f4'>Kafka (Open Source and Confluent) Producers</font>

##### simulate_walk_open_source_kafka_producer (Open Source Kafka Producer)

In [None]:
import time
from geopy.distance import geodesic
import urllib.parse
import uuid

def simulate_walk_open_source_kafka_producer(customer_id, customer_name, starting_latitude, starting_longitude, \
                  ending_latitude, ending_longitude, speed_meters_per_second=1.4, debug_messages = False):
    """
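    Simulates a walk from a starting point to an ending point and publishes
    one location message per second to the Kafka topic (kafka-python producer).

    Args:
        customer_id (int): Id of the customer; also used as the Kafka message key.
        customer_name (str): Name of the person walking.
        starting_latitude (float): Starting latitude in degrees.
        starting_longitude (float): Starting longitude in degrees.
        ending_latitude (float): Ending latitude in degrees.
        ending_longitude (float): Ending longitude in degrees.
        speed_meters_per_second (float, optional): Walking speed in meters per second. Defaults to 1.4.
        debug_messages (bool, optional): If True, print progress every 100 messages. Defaults to False.

    Prints information about the walk at regular intervals.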
    """
    from kafka import KafkaProducer
    # Kafka Producer configuration with OAUTHBEARER authentication
    config = {
        'bootstrap_servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',
        'security_protocol': 'SASL_SSL',
        'sasl_mechanism': 'OAUTHBEARER',
        'sasl_oauth_token_provider': TokenProvider(),
        'reconnect_backoff_ms': 500,
        'reconnect_backoff_max_ms': 10000   
    }


    producer = KafkaProducer(**config)  # Use keyword unpacking for clear configuration

    # Calculate total distance and time
    start = (starting_latitude, starting_longitude)  # geopy expects (latitude, longitude)
    end = (ending_latitude, ending_longitude)
    total_distance = geodesic(start, end).meters
    total_time = total_distance / speed_meters_per_second

    # Generate data points
    generate_data_time = 1  # seconds
    num_points = int(total_time / generate_data_time) + 1  # Include start and end points

    prior_latitude = starting_latitude
    prior_longitude = starting_longitude

    for i in range(num_points):
        # Normalize fraction for even distribution; guard against dividing by zero on a single-point walk
        fraction = i / (num_points - 1) if num_points > 1 else 1.0
        lat = starting_latitude + fraction * (ending_latitude - starting_latitude)
        lon = starting_longitude + fraction * (ending_longitude - starting_longitude)
        distance_to_destination = haversine_distance(lat, lon, ending_latitude, ending_longitude)
        map_url = f"https://www.google.com/maps/place/{lat},{lon}/@{lat},{lon},17z"

        # Kafka
        # Log
        # user, event_time, current_lat, current_long, starting_lat, starting_long, dest_lat, dest_long, walking_speed_meters_per_second, distance_to_destination, map_url
        message_data = {
            "customer_geo_location_id" : f"{uuid.uuid4()}",
            "customer_id": customer_id,
            "event_timestamp_millis": int(time.time() * 1000),
            "prior_latitude": prior_latitude,
            "prior_longitude": prior_longitude,
            "current_latitude": lat,
            "current_longitude": lon,
            "debug_destination_latitude": ending_latitude,
            "debug_destination_longitude": ending_longitude,
            "debug_walking_speed_mps": speed_meters_per_second,
            "debug_map_url" : f"{map_url}"
        }

        # Save for next iteration
        prior_latitude = lat
        prior_longitude = lon

        # Serialize data to bytes
        serialized_data = json.dumps(message_data).encode('utf-8')

        # Define the key based on your needs (e.g., customer_id)
        key = str(customer_id).encode('utf-8')

        # Produce the message with key
        # producer.send(kafka_topic_name, key=key, value=serialized_data) # callback=delivery_callback
        max_retries = 5
        retry_delay = 1  # seconds

        for attempt in range(max_retries):
            try:
                producer.send(kafka_topic_name, key=key, value=serialized_data)
                break  # Exit the loop if successful
            except Exception as e:
                if attempt < max_retries - 1:
                    print(f"Failed to send message to Kafka: {e}. Retrying in {retry_delay} seconds.")
                    time.sleep(retry_delay)
                else:
                    raise  # Raise the exception if all retries fail

        if i == 1:
          print(f"{customer_name} distance to go ({distance_to_destination:.2f} km): {map_url}")
          print(f"message_data: {message_data}")

        if i % 100 == 0:
          if debug_messages:
            print(f"{customer_name} distance to go ({distance_to_destination:.2f} km): {map_url}")
            print(f"message_data: {message_data}")
          producer.flush()

        time.sleep(generate_data_time)

    producer.flush()

    print(f"{customer_id} - {customer_name} walk complete.")

##### simulate_walk_confluent_kafka_producer (Confluent Kafka Producer)

In [None]:
import time
from geopy.distance import geodesic
import urllib.parse
import uuid

def simulate_walk_confluent_kafka_producer(customer_id, customer_name, starting_latitude, starting_longitude, \
                  ending_latitude, ending_longitude, speed_meters_per_second=1.4, debug_messages = False):
    """
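    Simulates a walk from a starting point to an ending point and publishes
    one location message per second to the Kafka topic (confluent-kafka producer).

    Args:
        customer_id (int): Id of the customer; also used as the Kafka message key.
        customer_name (str): Name of the person walking.
        starting_latitude (float): Starting latitude in degrees.
        starting_longitude (float): Starting longitude in degrees.
        ending_latitude (float): Ending latitude in degrees.
        ending_longitude (float): Ending longitude in degrees.
        speed_meters_per_second (float, optional): Walking speed in meters per second. Defaults to 1.4.
        debug_messages (bool, optional): If True, print progress every 100 messages. Defaults to False.

    Prints information about the walk at regular intervals.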
    """
    import confluent_kafka
    import functools

    # Kafka Producer configuration with SASL_PLAIN authentication
    # This requires a service principal key (json file) which must be base64 encoded
    # !gsutil cp gs://bucket-name/your-key.json sa.key.json <- This assumes you exported the key to a bucket
    # secret = !(cat sa.key.json | base64 -w 0)
    # secret = secret[0]
    #config = {
    #    'bootstrap.servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',
    #    'sasl.username':     f'kafka-sp@{project_id}.iam.gserviceaccount.com',
    #    'sasl.password':     secret,
    #    'security.protocol': 'SASL_SSL',
    #    'sasl.mechanisms':   'PLAIN',
    #    'acks':              'all'
    #}


    # Kafka Producer configuration with OAUTHBEARER authentication
    # https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-configuration
    # https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/oauth_producer.py
    config = {
      'bootstrap.servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog', # No port needed usually
      'security.protocol': 'SASL_SSL',
      'sasl.mechanisms': 'OAUTHBEARER',
      'oauth_cb': functools.partial(ConfluentTokenProvider, None), # <-- Uses the callback function
      'error_cb' : functools.partial(ConfluentErrorProvider),
      'acks': 'all'    
    }


    producer = confluent_kafka.Producer(config)

    # Calculate total distance and time
    start = (starting_latitude, starting_longitude)  # geopy expects (latitude, longitude)
    end = (ending_latitude, ending_longitude)
    total_distance = geodesic(start, end).meters
    total_time = total_distance / speed_meters_per_second

    # Generate data points
    generate_data_time = 1  # seconds
    num_points = int(total_time / generate_data_time) + 1  # Include start and end points

    prior_latitude = starting_latitude
    prior_longitude = starting_longitude

    for i in range(num_points):
        # Normalize fraction for even distribution; guard against dividing by zero on a single-point walk
        fraction = i / (num_points - 1) if num_points > 1 else 1.0
        lat = starting_latitude + fraction * (ending_latitude - starting_latitude)
        lon = starting_longitude + fraction * (ending_longitude - starting_longitude)
        distance_to_destination = haversine_distance(lat, lon, ending_latitude, ending_longitude)
        map_url = f"https://www.google.com/maps/place/{lat},{lon}/@{lat},{lon},17z"

        # Kafka
        # Log
        # user, event_time, current_lat, current_long, starting_lat, starting_long, dest_lat, dest_long, walking_speed_meters_per_second, distance_to_destination, map_url
        message_data = {
            "customer_geo_location_id" : f"{uuid.uuid4()}",
            "customer_id": customer_id,
            "event_timestamp_millis": int(time.time() * 1000),
            "prior_latitude": prior_latitude,
            "prior_longitude": prior_longitude,
            "current_latitude": lat,
            "current_longitude": lon,
            "debug_destination_latitude": ending_latitude,
            "debug_destination_longitude": ending_longitude,
            "debug_walking_speed_mps": speed_meters_per_second,
            "debug_map_url" : f"{map_url}"
        }

        # Save for next iteration
        prior_latitude = lat
        prior_longitude = lon


        # Serialize data to bytes
        serialized_data = json.dumps(message_data).encode('utf-8')

        # Define the key based on your needs (e.g., customer_id)
        key = str(customer_id).encode('utf-8')

        # Produce the message with key
        producer.produce(kafka_topic_name, key=key, value=serialized_data) # callback=delivery_callback

        if i == 1:
          print(f"{customer_name} distance to go ({distance_to_destination:.2f} km): {map_url}")
          print(f"message_data: {message_data}")

        if i % 100 == 0:
          if debug_messages:
            print(f"{customer_name} distance to go ({distance_to_destination:.2f} km): {map_url}")
            print(f"message_data: {message_data}")
          producer.flush()


        time.sleep(generate_data_time)

    producer.flush()

    print(f"{customer_id} - {customer_name} walk complete.")
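
Both producers share the same path logic: a straight-line interpolation between the start and end coordinates, with one point per second and the previous point carried along as the "prior" position. The sketch below isolates that logic without Kafka or sleeping so it can be inspected on its own. `interpolate_walk` is an illustrative helper, and the rounded coordinates and 1,130 m distance are sample values, not part of the notebook.

```python
def interpolate_walk(start_lat, start_lon, end_lat, end_lon,
                     total_distance_m, speed_mps, interval_s=1):
    """Yield (prior_lat, prior_lon, lat, lon) tuples, one per interval."""
    total_time_s = total_distance_m / speed_mps
    num_points = int(total_time_s / interval_s) + 1  # include start and end points
    prior_lat, prior_lon = start_lat, start_lon
    for i in range(num_points):
        # Guard against a single-point walk to avoid dividing by zero
        fraction = i / (num_points - 1) if num_points > 1 else 1.0
        lat = start_lat + fraction * (end_lat - start_lat)
        lon = start_lon + fraction * (end_lon - start_lon)
        yield prior_lat, prior_lon, lat, lon
        # The current point becomes the prior point of the next message
        prior_lat, prior_lon = lat, lon

points = list(interpolate_walk(48.8574, 2.3129, 48.8508, 2.3246,
                               total_distance_m=1130, speed_mps=30))
print(f"{len(points)} points")
print("first:", points[0])
print("last: ", points[-1])
```

Carrying the prior position inside each message is what lets the geofence queries compare "before" and "after" distances on a single row.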

### <font color='#4285f4'>Create People Function</font>

In [None]:
import random
import threading
import time

def create_people(starting_customer_id, number_of_people, producer_method_name = simulate_walk_open_source_kafka_producer) -> None:
  """
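  Creates people who will walk towards various locations.  This will generate a thread for each person
  and simulate them walking.

  Args:
    starting_customer_id: The customer id assigned to the first person; ids increase by one per person.
    number_of_people: The number of people to generate.
    producer_method_name: The function used to simulate the walk (open source or Confluent producer).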

  Returns:
    None
  """
  location_1_latitude, location_1_longitude = 48.852066767829761, 2.3464926959635504  # Rue Galande
  location_2_latitude, location_2_longitude = 48.850829751346133, 2.3245967340236109  # Le Bon Marché
  location_3_latitude, location_3_longitude = 48.867691580985458, 2.3376027993295176  # Square Louvois
  location_4_latitude, location_4_longitude = 48.871015939679289, 2.302960997513936   # Av. des Champs-Élysées

  # Create a box where we will generate random people starting locations.
  # This is 10 km from Hôtel des Invalides
  # 48.85744164370035, 2.3128668119381186
  bounding_box_min_lat, bounding_box_max_lat, bounding_box_min_lon, bounding_box_max_lon = bounding_box(48.85744164370035, 2.3128668119381186, 10)

  people = []
  for i in range(number_of_people):
    print(f"Generating Person: {i+starting_customer_id}")
    destination_latitude = 0
    destination_longitude = 0

    if random.random() < 0.10:
      if i % 4 == 0:
        destination_latitude = location_1_latitude
        destination_longitude = location_1_longitude
      elif i % 4 == 1:
        destination_latitude = location_2_latitude
        destination_longitude = location_2_longitude
      elif i % 4 == 2:
        destination_latitude = location_3_latitude
        destination_longitude = location_3_longitude
      else:
        destination_latitude = location_4_latitude
        destination_longitude = location_4_longitude
    else:
        destination_latitude = random.uniform(bounding_box_min_lat, bounding_box_max_lat)
        destination_longitude = random.uniform(bounding_box_min_lon, bounding_box_max_lon)

    person_dict = {
          "customer_id": i+starting_customer_id,
          "name": f"person {i+starting_customer_id}",
          "starting_latitude":  random.uniform(bounding_box_min_lat, bounding_box_max_lat),
          "starting_longitude": random.uniform(bounding_box_min_lon, bounding_box_max_lon),
          "destination_latitude": destination_latitude,
          "destination_longitude": destination_longitude,
          # The average walking speed of a person is approximately 1.4 meters per second.  We might have people on bikes or scooters (possibly cars in traffic)
          "walking_speed_meters_per_second": round(random.uniform(1, 3),2)
    }
    distance_to_destination = haversine_distance(person_dict["starting_latitude"], person_dict["starting_longitude"],\
                                                 person_dict["destination_latitude"], person_dict["destination_longitude"])
    #print(distance_to_destination)
    people.append(person_dict)

  threads = []
  for item in people:
    threads.append(threading.Thread(target=producer_method_name, args=(
        item["customer_id"], item["name"], \
        item["starting_latitude"], item["starting_longitude"], \
        item["destination_latitude"], item["destination_longitude"], \
        item["walking_speed_meters_per_second"], False)))

  current_thread_count = 0
  throttling = 200
  for thread in threads:
    if current_thread_count <= throttling:
      thread.start()
      time.sleep(.5) # give kafka some time to catch up
    else:
      thread.start()
      time.sleep(30) # start a new customer every 30 seconds (when running this for more than 200 customers)
    current_thread_count += 1

  # Wait for all the threads
  for thread in threads:
    thread.join()

### <font color='#4285f4'>Create Simulation and Test 4 People</font>

##### Clear the table so we can see our results (easily)

In [None]:
%%bigquery

TRUNCATE TABLE `chocolate_ai.customer_geo_location`;
TRUNCATE TABLE `chocolate_ai.customer_geo_location_results`;

##### Test 4 "people" records using the Open Source and Confluent Producers

In [None]:
# Create 4 people all starting at the same location.
# Have them each walk towards a store
# This takes a couple of minutes to run

print()
print("-----------------------------------------------------------------------------")
print("Run the SQL in the next cell in a BigQuery query window while this is running")
print("-----------------------------------------------------------------------------")
print()

# All 4 people will start here
starting_latitude,   starting_longitude   = 48.85744164370035, 2.3128668119381186   # Hôtel des Invalides

# They will each walk towards one of our stores (from the stores table)
# The 4 people will be walking toward one of the stores (on purpose, for testing)
location_1_latitude, location_1_longitude = 48.852066767829761, 2.3464926959635504  # Rue Galande
location_2_latitude, location_2_longitude = 48.850829751346133, 2.3245967340236109  # Le Bon Marché
location_3_latitude, location_3_longitude = 48.867691580985458, 2.3376027993295176  # Square Louvois
location_4_latitude, location_4_longitude = 48.871015939679289, 2.302960997513936   # Av. des Champs-Élysées

# Test 4 people (walking fast so we can see the results fairly quick)
threads = []

print("Open Source Kafka Producer: Rue Galande")
threads.append(threading.Thread(target=simulate_walk_open_source_kafka_producer, args=(10001, "Person: Rue Galande", \
                starting_latitude, starting_longitude, \
                location_1_latitude, location_1_longitude, \
                25, True)))

print("Confluent Kafka Producer: Le Bon Marché")
threads.append(threading.Thread(target=simulate_walk_confluent_kafka_producer, args=(10002, "Person: Le Bon Marché", \
                starting_latitude, starting_longitude, \
                location_2_latitude, location_2_longitude, \
                30, True)))

print("Open Source Kafka Producer: Square Louvois")
threads.append(threading.Thread(target=simulate_walk_open_source_kafka_producer, args=(10003, "Person: Square Louvois", \
                starting_latitude, starting_longitude, \
                location_3_latitude, location_3_longitude, \
                35, True)))

print("Confluent Kafka Producer: Av. des Champs-Élysées")
threads.append(threading.Thread(target=simulate_walk_confluent_kafka_producer, args=(10004, "Person: Av. des Champs-Élysées", \
                starting_latitude, starting_longitude, \
                location_4_latitude, location_4_longitude, \
                40, True)))

for thread in threads:
  thread.start()

for thread in threads:
  thread.join()

In [None]:
%%bigquery

-- It might take a second to see the data

-- This will show each customer and their proximity to each store
WITH raw_data AS (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY event_timestamp_millis DESC) AS ranking
  FROM `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location`
)
, store_data AS (
  -- This is hardcoded for now until we can join
  SELECT 1 AS store_id,
         'Le Bon Marché' AS store_name,
         '24 Rue de Sèvres, 75007 Paris' AS store_address,
         48.850829751346133 AS store_latitude,
         2.3245967340236109 AS store_longitude
  UNION ALL
  SELECT 2 AS store_id,
         'Av. des Champs-Élysées' AS store_name,
         '75 Av. des Champs-Élysées, 75008 Paris' AS store_address,
         48.871015939679289 AS store_latitude,
         2.302960997513936 AS store_longitude
  UNION ALL
  SELECT 3 AS store_id,
         'Rue Galande' AS store_name,
         '77 Rue Galande, 75005 Paris' AS store_address,
         48.852066767829761 AS store_latitude,
         2.3464926959635504 AS store_longitude
  UNION ALL
  SELECT 4 AS store_id,
         'Square Louvois' AS store_name,
         '69 Rue de Richelieu, 75002 Paris' AS store_address,
         48.867691580985458 AS store_latitude,
         2.3376027993295176 AS store_longitude
)
, geo_data AS (
  SELECT *,
        -- Using the prior lat/long see if they were outside the geofence
        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(store_data.store_longitude, store_data.store_latitude)
                   ) AS prior_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(store_data.store_longitude, store_data.store_latitude)
                   ) / 1000 AS prior_distance_to_store_kilometers,

        -- Using the current lat/long see if they are inside the geofence
        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(store_data.store_longitude, store_data.store_latitude)
                   ) AS current_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(store_data.store_longitude, store_data.store_latitude)
                   )  / 1000 AS current_distance_to_store_kilometers

  FROM raw_data
       CROSS JOIN store_data
)
, results AS (
-- This tests to see if they were outside the circle and then inside
-- We only want to send them one notification when they break the geofence
-- This uses the prior lat/long and compares with the current lat/long
-- Ideally we would say "NOT EXISTS" in our customer_geo_location_results table
  SELECT *,
         CASE WHEN prior_distance_to_store_meters > 1000 AND current_distance_to_store_meters <= 1000
              THEN TRUE
              ELSE FALSE
          END AS entered_geofence
  FROM geo_data
)
SELECT customer_geo_location_id,
       entered_geofence,
       customer_id,
       current_latitude,
       current_longitude,
       prior_distance_to_store_kilometers,
       current_distance_to_store_meters,
       store_id,
       store_name,
       debug_map_url,
       TO_JSON(STRUCT(CAST(TIMESTAMP_MILLIS(event_timestamp_millis) AS STRING) AS event_timestamp)) AS _ATTRIBUTES
  FROM results
 WHERE (ranking = 1 OR entered_geofence = TRUE)
 ORDER BY customer_id, store_id;
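
The `entered_geofence` test in the query above can also be checked locally. This is a minimal sketch of the same rule in Python, assuming a spherical haversine distance in place of `ST_DISTANCE`, so values may differ slightly from BigQuery. `distance_meters`, `entered_geofence`, and the sample points are illustrative.

```python
import math

def distance_meters(lat1, lon1, lat2, lon2):
    # Haversine distance on a sphere of radius 6,371 km, inputs in decimal degrees
    lat1, lon1, lat2, lon2 = map(math.radians, (lat1, lon1, lat2, lon2))
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 6371000 * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

def entered_geofence(prior, current, store, radius_meters=1000):
    # True only on the step that crosses from outside the radius to inside it
    return (distance_meters(*prior, *store) > radius_meters
            and distance_meters(*current, *store) <= radius_meters)

store = (48.850829751346133, 2.3245967340236109)  # Le Bon Marché
outside = (store[0] + 0.0091, store[1])  # just over 1 km north of the store
inside = (store[0] + 0.0089, store[1])   # just under 1 km north of the store
closer = (store[0] + 0.0080, store[1])   # further inside the geofence

print(entered_geofence(outside, inside, store))  # the crossing step
print(entered_geofence(inside, closer, store))   # already inside, no new alert
```

Comparing the prior and current positions means a customer who stays inside the radius does not trigger a second alert on every subsequent message.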

### <font color='#4285f4'>Start your BigQuery Continuous Queries (Manual Intervention Required)</font>

#### Create a reservation for the continuous query
- NOTE: You will need to wait several minutes for this to take effect

In [None]:
# Create a continuous query reservation and assignment

user_input = input("Do you want to start BigQuery Reservations? This will START billing and will continue until you remove these in the Clean Up code (Y/n)?")
if user_input == "Y":
  sql = f"""CREATE RESERVATION `{project_id}.region-{bigquery_location}.continuous-query-reservation`
            OPTIONS (edition = "enterprise",
                     slot_capacity = 50);
  """
  RunQuery(sql)

  sql = f"""CREATE ASSIGNMENT `{project_id}.region-{bigquery_location}.continuous-query-reservation.continuous-query-reservation-assignment`
            OPTIONS(assignee = "projects/{project_id}",
                    job_type = "CONTINUOUS");
  """
  RunQuery(sql)


#### Run each of the below queries


1. Copy the SQL to a BigQuery SQL Window
2. Under the More menu, select Continuous Query
3. Under the Query settings, under Continuous query, select kafka-continuous-query for the service account
4. Run the Query (it will take a minute to start)

##### Query 1 (Continuous Query to Pub/Sub)

```
----------------------------------------------------------------------------------------------------------------
-- Insert the customers who break the geo-boundary into Pub/Sub and then a process (e.g. Cloud Function) will send them an alert
----------------------------------------------------------------------------------------------------------------

-- Algorithm:
--  Get the data from the customer_geo_location table (this just gets the current row)
--  The stores are hardcoded, but you could query the store table.  This will be changed when continuous queries can join to tables.
--  Calculate the distance the customer is from the store
--    Calculate their prior distance using the prior lat/long (in the future we could use the LAG function)
--    Calculate their current distance using their current lat/long
--  Determine if the prior distance was greater than 1 km and if their current distance is less than (or equal to) 1 km.  If true, then they entered the geo-boundary.
--  Select the final results
--    Select when cross geo-boundary is true

EXPORT DATA OPTIONS(uri="https://pubsub.googleapis.com/projects/${project_id}/topics/bq-continuous-query", format="cloud_pubsub") AS
WITH raw_data AS (
  SELECT *
    FROM  APPENDS(TABLE `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location`,
            -- Configure the APPENDS TVF start_timestamp to specify when you want to
            -- start processing data using your continuous query.
            -- Here we start processing data from fifteen minutes before the current time.
            CURRENT_TIMESTAMP() - INTERVAL 15 MINUTE)
)

, geo_data AS (
    -- The stores are hardcoded until we can perform joins within a continuous query
  SELECT *,
        1 AS store_id,
        'Le Bon Marché' AS store_name,
        '24 Rue de Sèvres, 75007 Paris' AS store_address,

        -- Using the prior lat/long see if they were outside the geofence
        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(2.3245967340236109, 48.850829751346133)
                   ) AS prior_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(2.3245967340236109, 48.850829751346133)
                   ) / 1000 AS prior_distance_to_store_kilometers,

        -- Using the current lat/long see if they are inside the geofence
        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(2.3245967340236109, 48.850829751346133)
                   ) AS current_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(2.3245967340236109, 48.850829751346133)
                   )  / 1000 AS current_distance_to_store_kilometers
    FROM raw_data
   /*
   UNION ALL

   SELECT *,
          2 AS store_id,
         'Av. des Champs-Élysées' AS store_name,
         '75 Av. des Champs-Élysées, 75008 Paris' AS store_address,

        -- Using the prior lat/long see if they were outside the geofence
        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(2.302960997513936, 48.871015939679289)
                   ) AS prior_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(2.302960997513936, 48.871015939679289)
                   ) / 1000 AS prior_distance_to_store_kilometers,

        -- Using the current lat/long see if they are inside the geofence
        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(2.302960997513936, 48.871015939679289)
                   ) AS current_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(2.302960997513936, 48.871015939679289)
                   )  / 1000 AS current_distance_to_store_kilometers

    FROM raw_data                   

   UNION ALL                   

   SELECT *,
          3 AS store_id,
         'Rue Galande' AS store_name,
         '77 Rue Galande, 75005 Paris' AS store_address,

        -- Using the prior lat/long see if they were outside the geofence
        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(2.3464926959635504, 48.852066767829761)
                   ) AS prior_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(2.3464926959635504, 48.852066767829761)
                   ) / 1000 AS prior_distance_to_store_kilometers,

        -- Using the current lat/long see if they are inside the geofence
        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(2.3464926959635504, 48.852066767829761)
                   ) AS current_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(2.3464926959635504, 48.852066767829761)
                   )  / 1000 AS current_distance_to_store_kilometers
    FROM raw_data                   

   UNION ALL                   

   SELECT *,
          4 AS store_id,
         'Square Louvois' AS store_name,
         '69 Rue de Richelieu, 75002 Paris' AS store_address,

        -- Using the prior lat/long see if they were outside the geofence
        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(2.3376027993295176, 48.867691580985458)
                   ) AS prior_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(2.3376027993295176, 48.867691580985458)
                   ) / 1000 AS prior_distance_to_store_kilometers,

        -- Using the current lat/long see if they are inside the geofence
        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(2.3376027993295176, 48.867691580985458)
                   ) AS current_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(2.3376027993295176, 48.867691580985458)
                   )  / 1000 AS current_distance_to_store_kilometers
  FROM raw_data
  */
)

, results AS (
-- This tests to see if they were outside the circle and then inside
-- We only want to send them one notification when they break the geofence
-- This uses the prior lat/long and compares with the current lat/long
-- Ideally we would say "NOT EXISTS" in our customer_geo_location_results table
  SELECT *,
         CASE WHEN prior_distance_to_store_meters > 1000 AND current_distance_to_store_meters <= 1000
              THEN TRUE
              ELSE FALSE
          END AS entered_geofence
  FROM geo_data
)

, marketing_message AS (
  -- Call Gemini to generate a marketing message
  SELECT *
    FROM ML.GENERATE_TEXT(MODEL`${project_id}.${bigquery_chocolate_ai_dataset}.gemini_model`,
          (SELECT customer_geo_location_id,
                             customer_id,
                             current_latitude,
                             current_longitude,
                             prior_distance_to_store_kilometers,
                             current_distance_to_store_kilometers,
                             store_id,
                             store_name,
                             debug_map_url,
                             entered_geofence,
                             event_timestamp_millis,
                             CONCAT("Create a marketing message for a user who has just entered a geofencing boundary for the company Chocolate AI.\n",
                                    "The customer is ", current_distance_to_store_kilometers, " kilometers away from our ", store_name, " store.\n",
                                    "Make the message catchy and convince them it is worth the walk.\n",
                                    "Tell them to use coupon code 'CLOSE-BY' to get 25% off and a free piece of chocolate."
                              ) AS prompt
            FROM results),
          STRUCT(.8 AS temperature, .8 AS top_p)
          )    
)

SELECT TO_JSON_STRING(STRUCT(customer_geo_location_id,
                             customer_id,
                             `${project_id}.${bigquery_chocolate_ai_dataset}.gemini_model_result_as_string`(ml_generate_text_result) AS message,
                             current_latitude,
                             current_longitude,
                             prior_distance_to_store_kilometers,
                             current_distance_to_store_kilometers,
                             store_id,
                             store_name,
                             debug_map_url)) AS message,
      TO_JSON(STRUCT(CAST(TIMESTAMP_MILLIS(event_timestamp_millis) AS STRING) AS event_timestamp)) AS _ATTRIBUTES
  FROM marketing_message
  WHERE entered_geofence = true;
```

##### Query 2 (Continuous Query to BigQuery table)

```
----------------------------------------------------------------------------------------------------------------
-- Insert the customers who break the geo-boundary into another BigQuery table
----------------------------------------------------------------------------------------------------------------

-- Algorithm:
--  Get the data from the customer_geo_location table (this just gets the current row)
--  The stores are hardcoded, but you could query the store table.  This will change when continuous queries can join to tables.
--  Calculate the distance the customer is from the store
--    Calculate their prior distance using the prior lat/long (in the future we could use the LAG function)
--    Calculate their current distance using their current lat/long
--  Determine if the prior distance was greater than 1 km and if their current distance is less than or equal to 1 km.  If true, then they entered the geo-boundary.
--  Insert the final results into the customer_geo_location_results table where entered_geofence is true

INSERT INTO `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location_results`
   (customer_geo_location_id,
    customer_id,
    genai_message,
    current_latitude,
    current_longitude,
    prior_distance_to_store_kilometers,
    current_distance_to_store_kilometers,
    store_id,
    store_name,
    debug_map_url,
    event_timestamp_millis,
    geo_boundry_entry_timestamp)
WITH raw_data AS (
  SELECT *
    FROM  APPENDS(TABLE `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location`,
            -- Configure the APPENDS TVF start_timestamp to specify when you want to
            -- start processing data using your continuous query.
            -- Here we start processing data from 15 minutes before the current time.
            CURRENT_TIMESTAMP() - INTERVAL 15 MINUTE)
)

, geo_data AS (
    -- The stores are hardcoded until we can perform joins within a continuous query
   SELECT *,
          1 AS store_id,
         'Rue Galande' AS store_name,
         '77 Rue Galande, 75005 Paris' AS store_address,

        -- Using the prior lat/long see if they were outside the geofence
        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(2.3464926959635504, 48.852066767829761)
                   ) AS prior_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(2.3464926959635504, 48.852066767829761)
                   ) / 1000 AS prior_distance_to_store_kilometers,

        -- Using the current lat/long see if they are inside the geofence
        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(2.3464926959635504, 48.852066767829761)
                   ) AS current_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(2.3464926959635504, 48.852066767829761)
                   )  / 1000 AS current_distance_to_store_kilometers
    FROM raw_data

   /*

   UNION ALL                   

  SELECT *,
        2 AS store_id,
        'Le Bon Marché' AS store_name,
        '24 Rue de Sèvres, 75007 Paris' AS store_address,

        -- Using the prior lat/long see if they were outside the geofence
        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(2.3245967340236109, 48.850829751346133)
                   ) AS prior_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(2.3245967340236109, 48.850829751346133)
                   ) / 1000 AS prior_distance_to_store_kilometers,

        -- Using the current lat/long see if they are inside the geofence
        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(2.3245967340236109, 48.850829751346133)
                   ) AS current_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(2.3245967340236109, 48.850829751346133)
                   )  / 1000 AS current_distance_to_store_kilometers
    FROM raw_data                  

   UNION ALL                   

   SELECT *,
          3 AS store_id,
         'Square Louvois' AS store_name,
         '69 Rue de Richelieu, 75002 Paris' AS store_address,

        -- Using the prior lat/long see if they were outside the geofence
        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(2.3376027993295176, 48.867691580985458)
                   ) AS prior_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(2.3376027993295176, 48.867691580985458)
                   ) / 1000 AS prior_distance_to_store_kilometers,

        -- Using the current lat/long see if they are inside the geofence
        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(2.3376027993295176, 48.867691580985458)
                   ) AS current_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(2.3376027993295176, 48.867691580985458)
                   )  / 1000 AS current_distance_to_store_kilometers
    FROM raw_data

   UNION ALL
   
   SELECT *,
          4 AS store_id,
         'Av. des Champs-Élysées' AS store_name,
         '75 Av. des Champs-Élysées, 75008 Paris' AS store_address,

        -- Using the prior lat/long see if they were outside the geofence
        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(2.302960997513936, 48.871015939679289)
                   ) AS prior_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(2.302960997513936, 48.871015939679289)
                   ) / 1000 AS prior_distance_to_store_kilometers,

        -- Using the current lat/long see if they are inside the geofence
        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(2.302960997513936, 48.871015939679289)
                   ) AS current_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(2.302960997513936, 48.871015939679289)
                   )  / 1000 AS current_distance_to_store_kilometers

    FROM raw_data                   

  */
)

, results AS (
-- This tests to see if they were outside the circle and then inside
-- We only want to send them one notification when they break the geofence
-- This uses the prior lat/long and compares with the current lat/long
-- Ideally we would say "NOT EXISTS" in our customer_geo_location_results table
  SELECT *,
         CASE WHEN prior_distance_to_store_meters > 1000 AND current_distance_to_store_meters <= 1000
              THEN TRUE
              ELSE FALSE
          END AS entered_geofence
  FROM geo_data
)

, marketing_message AS (
  -- Call Gemini to generate a marketing message
  SELECT *
    FROM ML.GENERATE_TEXT(MODEL`${project_id}.${bigquery_chocolate_ai_dataset}.gemini_model`,
          (SELECT customer_geo_location_id,
                             customer_id,
                             current_latitude,
                             current_longitude,
                             prior_distance_to_store_kilometers,
                             current_distance_to_store_kilometers,
                             store_id,
                             store_name,
                             debug_map_url,
                             entered_geofence,
                             event_timestamp_millis,
                             CONCAT("Create a marketing message for a user who has just entered a geofencing boundary for the company Chocolate AI.\n",
                                    "The customer is ", current_distance_to_store_kilometers, " kilometers away from our ", store_name, " store.\n",
                                    "Make the message catchy and convince them it is worth the walk.\n",
                                    "Tell them to use coupon code 'CLOSE-BY' to get 25% off and a free piece of chocolate."
                              ) AS prompt
            FROM results),
          STRUCT(.8 AS temperature, .8 AS top_p)
          )    
)

SELECT customer_geo_location_id,
       customer_id,
       `${project_id}.${bigquery_chocolate_ai_dataset}.gemini_model_result_as_string`(ml_generate_text_result) AS genai_message,
       current_latitude,
       current_longitude,
       prior_distance_to_store_kilometers,
       current_distance_to_store_kilometers,
       store_id,
       store_name,
       debug_map_url,
       event_timestamp_millis,
       CURRENT_TIMESTAMP() AS geo_boundry_entry_timestamp
  FROM marketing_message
 WHERE entered_geofence = true;
```
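
Both continuous queries apply the same rule: a customer has entered the geofence when the prior position is more than 1 km from the store and the current position is within 1 km. The following is a minimal Python sketch of that rule, not the notebook's implementation. It approximates `ST_DISTANCE` with the haversine formula, and the Earth radius constant and the function names `haversine_meters` and `entered_geofence` are assumptions made for illustration, so distances may differ slightly from BigQuery's results.

```python
import math

# Mean Earth radius in meters; an assumption for this sketch
EARTH_RADIUS_METERS = 6_371_008.8
GEOFENCE_RADIUS_METERS = 1000

# Rue Galande store coordinates, copied from the queries above
STORE_LONGITUDE = 2.3464926959635504
STORE_LATITUDE = 48.852066767829761


def haversine_meters(lon1, lat1, lon2, lat2):
    """Great-circle distance in meters between two (longitude, latitude) points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = phi2 - phi1
    d_lambda = math.radians(lon2 - lon1)
    a = (math.sin(d_phi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2)
    return 2 * EARTH_RADIUS_METERS * math.asin(math.sqrt(a))


def entered_geofence(prior_lon, prior_lat, current_lon, current_lat):
    """True only when the prior point is outside the radius and the current point is inside."""
    prior_distance = haversine_meters(prior_lon, prior_lat, STORE_LONGITUDE, STORE_LATITUDE)
    current_distance = haversine_meters(current_lon, current_lat, STORE_LONGITUDE, STORE_LATITUDE)
    return prior_distance > GEOFENCE_RADIUS_METERS and current_distance <= GEOFENCE_RADIUS_METERS
```

Because the rule compares two consecutive positions, a customer who stays inside the radius does not trigger it again on later events; only the crossing event does.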

### <font color='#4285f4'>Start a thread and create many people walking</font> (this will generate lots of data)


In [None]:
# This starts 100 threads in this notebook (running threads in a notebook is not ideal, but this is for demo purposes)
# Depending on the number of people you simulate, the threads might overwhelm the Apache Kafka cluster.
# To handle more messages, increase the Apache Kafka cluster size.  Also, create a larger notebook runtime in Colab Enterprise to handle more threads.
# If you stop this cell, the threads will continue to run.  The output will show in other cells if you print within the thread.

# While this cell is running, execute the queries in the following cells to monitor the progress of the jobs
# This cell might output errors if it gets overwhelmed with all the threads running.  In real life these would all be separate clients.

number_of_customers = 100

user_input = input(f"Do you want to simulate {number_of_customers} people walking using the Kafka Producer (Y/n)?")
if user_input == "Y":
  # Create some people and send using Open Source Producer
  starting_customer_id = GetMaxNextValue("chocolate_ai.customer_geo_location", "customer_id")
  create_people(starting_customer_id, number_of_customers, simulate_walk_open_source_kafka_producer)

# Commented out (so you do not run all the threads for both providers at the same time, you will get conflicts of customer ids)
# user_input = input(f"Do you want to simulate {number_of_customers} people walking using the Confluent Producer (Y/n)?")
# if user_input == "Y":
#  # Create some people and send using Confluent Kafka Producer
#  starting_customer_id = GetMaxNextValue("chocolate_ai.customer_geo_location", "customer_id")
#  create_people(starting_customer_id, number_of_customers, simulate_walk_confluent_kafka_producer)

# NOTES:
# 1. If you are running this for a long time, you should right click this cell and clear the output.
# 2. The people walk straight towards the destination, so they will walk over water and through buildings.
#    Google Maps routing could be called for a realistic walking route.
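
The straight-line movement described in the notes can be illustrated with linear interpolation between a start and an end point. The sketch below is hypothetical: `straight_line_walk` and `build_event` are names invented here and are not the notebook's `create_people` or `simulate_walk_*` functions, which are defined elsewhere. The event keys reuse the column names the queries read, and using those same names for the Kafka payload is an assumption.

```python
def straight_line_walk(start, end, steps):
    """Yield (prior, current) positions along a straight line from start to end.

    start and end are (latitude, longitude) tuples; steps is the number of moves.
    """
    start_lat, start_lon = start
    end_lat, end_lon = end
    prior = start
    for step in range(1, steps + 1):
        fraction = step / steps
        current = (
            start_lat + (end_lat - start_lat) * fraction,
            start_lon + (end_lon - start_lon) * fraction,
        )
        yield prior, current
        prior = current


def build_event(customer_id, prior, current, event_timestamp_millis):
    """Shape one location event using the column names the queries read."""
    return {
        "customer_id": customer_id,
        "prior_latitude": prior[0],
        "prior_longitude": prior[1],
        "current_latitude": current[0],
        "current_longitude": current[1],
        "event_timestamp_millis": event_timestamp_millis,
    }


# Hypothetical walk between two points in Paris, one event per simulated second
start_millis = 1_700_000_000_000
events = [
    build_event(1, prior, current, start_millis + index * 1000)
    for index, (prior, current) in enumerate(
        straight_line_walk((48.8600, 2.3300), (48.8520, 2.3465), steps=5)
    )
]
```

Carrying the prior position in every event is what lets the continuous queries detect a boundary crossing from a single row, without a window function such as `LAG`.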

#### Copy these to BigQuery and run outside the notebook while the "people" threads are running

In [None]:
%%bigquery

-- Show the most recent records inserted into our BigQuery table from Apache Kafka
SELECT * FROM `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location` ORDER BY event_timestamp_millis DESC LIMIT 100;

In [None]:
%%bigquery

-- The number of events per second (add day if you are running across days)
SELECT CAST(TIMESTAMP_MILLIS(event_timestamp_millis) AS DATE)        AS Date,
       EXTRACT(HOUR   FROM TIMESTAMP_MILLIS(event_timestamp_millis)) AS Hour,
       EXTRACT(MINUTE FROM TIMESTAMP_MILLIS(event_timestamp_millis)) AS Minute,
       EXTRACT(SECOND FROM TIMESTAMP_MILLIS(event_timestamp_millis)) AS Second,
       COUNT(*) AS Cnt
  FROM `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location`
GROUP BY ALL
ORDER BY 1 DESC, 2 DESC, 3 DESC, 4 DESC
LIMIT 100;

In [None]:
%%bigquery

-- This will show each customer and their proximity to each store

-- Algorithm:
--  Get the data from the customer_geo_location table for the current date
--  Gather our stores so we can compare each customer to the distance to each
--  Calculate the distance the customer is from the store
--    Calculate their prior distance using the prior lat/long (in the future we could use the LAG function)
--    Calculate their current distance using their current lat/long
--  Determine if the prior distance was greater than 1 km and if their current distance is less than or equal to 1 km.  If true, then they entered the geo-boundary.
--  Select the final results
--    Select when entered_geofence is true
--    Select the current distance from each store (we will get 4 records for each customer and 1 for when they cross the boundary)

WITH raw_data AS (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY event_timestamp_millis DESC) AS ranking
    FROM `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location`
   WHERE CAST(TIMESTAMP_MILLIS(event_timestamp_millis) AS DATE) = CURRENT_DATE()
)
, store_data AS (
  SELECT * FROM `chocolate_ai.store`
)
, geo_data AS (
  SELECT *,
        -- Using the prior lat/long see if they were outside the geofence
        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(store_data.store_longitude, store_data.store_latitude)
                   ) AS prior_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),
                    ST_GEOGPOINT(store_data.store_longitude, store_data.store_latitude)
                   ) / 1000 AS prior_distance_to_store_kilometers,

        -- Using the current lat/long see if they are inside the geofence
        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(store_data.store_longitude, store_data.store_latitude)
                   ) AS current_distance_to_store_meters,

        ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),
                    ST_GEOGPOINT(store_data.store_longitude, store_data.store_latitude)
                   )  / 1000 AS current_distance_to_store_kilometers

  FROM raw_data
       CROSS JOIN store_data
)
, results AS (
-- This tests to see if they were outside the circle and then inside
-- We only want to send them one notification when they break the geofence
-- This uses the prior lat/long and compares with the current lat/long
-- Ideally we would say "NOT EXISTS" in our customer_geo_location_results table
  SELECT *,
         CASE WHEN prior_distance_to_store_meters > 1000 AND current_distance_to_store_meters <= 1000
              THEN TRUE
              ELSE FALSE
          END AS entered_geofence
  FROM geo_data
)
SELECT customer_geo_location_id,
       entered_geofence,
       customer_id,
       current_latitude,
       current_longitude,
       prior_distance_to_store_kilometers,
       current_distance_to_store_kilometers,
       store_id,
       store_name,
       debug_map_url,
       TO_JSON(STRUCT(CAST(TIMESTAMP_MILLIS(event_timestamp_millis) AS STRING) AS event_timestamp)) AS _ATTRIBUTES
  FROM results
 WHERE (ranking = 1 OR entered_geofence = TRUE)
 ORDER BY customer_id, store_id;

In [None]:
%%bigquery

-- See the processed records by the continuous query (we insert into a BigQuery table: customer_geo_location_results)
SELECT customer_geo_location_results.*,
  FROM `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location_results` AS customer_geo_location_results
 ORDER BY geo_boundry_entry_timestamp DESC;


In [None]:
# See the records in Pub/Sub
# Create a new subscription to see the records
print(f"https://console.cloud.google.com/cloudpubsub/subscription/list?project={project_id}")

In [None]:
%%bigquery

-- Want to know how many slots the continuous query is using?

-- Get the number of slots the continuous query is using (good for estimating your workload)
-- Review the field: job_avg_slots which will be about 1 to 3

-- USER ACTION TO DO: Change the below job_id
-- 1. Go to your Continuous query window
-- 2. Click on "Job Information" tab in the bottom panel
-- 3. Copy the last part of the "Job ID"
SELECT job.creation_time,
      job.project_id,
      job.project_number,
      job.user_email,
      job.job_id,
      job.job_type,
      job.statement_type,
      job.priority,
      job.start_time,
      job.end_time,
      job.query,
      job.state,
      job.reservation_id,
      job.total_bytes_processed,
      job.total_slot_ms,
      job.error_result.reason     AS error_result_reason,
      job.error_result.location   AS error_result_location,
      job.error_result.debug_info AS error_result_debug_info,
      job.error_result.message    AS error_result_message,

      -- Average slot utilization per job is calculated by dividing
      -- total_slot_ms by the millisecond duration of the job
      CAST(SAFE_DIVIDE(job.total_slot_ms,(TIMESTAMP_DIFF(IFNULL(job.end_time,CURRENT_TIMESTAMP()), job.start_time, MILLISECOND))) AS FLOAT64) AS job_avg_slots

  FROM `${project_id}`.`region-us`.INFORMATION_SCHEMA.JOBS AS job
        CROSS JOIN UNNEST(job.job_stages) as unnest_job_stages
        CROSS JOIN UNNEST(job.timeline) AS unnest_timeline
  WHERE job.job_id = 'bquxjob_544ac149_191ec5b33ed'
GROUP BY ALL;
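
The `job_avg_slots` column is simple arithmetic: total slot-milliseconds divided by the job's elapsed milliseconds, with the current time standing in for `end_time` while the job is still running. A minimal Python sketch of the same calculation follows; the function name and the `now` parameter are assumptions added here so the example is deterministic.

```python
from datetime import datetime, timedelta, timezone


def job_avg_slots(total_slot_ms, start_time, end_time=None, now=None):
    """Average slots used by a job, or None when no time has elapsed.

    Mirrors SAFE_DIVIDE(total_slot_ms, elapsed_ms): a running job has no
    end_time, so the current time is used instead.
    """
    if end_time is None:
        end_time = now if now is not None else datetime.now(timezone.utc)
    elapsed_ms = (end_time - start_time).total_seconds() * 1000
    if elapsed_ms == 0:
        return None  # SAFE_DIVIDE returns NULL instead of raising on division by zero
    return total_slot_ms / elapsed_ms


# A job that consumed 1,200,000 slot-ms over 10 minutes averaged 2 slots
started = datetime(2024, 1, 1, tzinfo=timezone.utc)
average = job_avg_slots(1_200_000, started, started + timedelta(minutes=10))
```

For a continuous query the job never finishes on its own, so the average is always measured against the time elapsed so far.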

### <font color='#4285f4'>Kafka (Open Source and Confluent) Consumers</font>

This sample code shows how to consume messages using the open source and Confluent Python libraries.  It covers authentication and reading the messages.

#### <font color='#4285f4'>Open Source Kafka Consumer</font>

In [None]:
def openSourceKafkaConsumer():
  import json
  from kafka import KafkaConsumer

  # Kafka Consumer configuration with SASL_PLAIN authentication
  # This requires a service principal key (json file) which must be base64 encoded
  # !gsutil cp gs://bucket-name/your-key.json sa.key.json <- This assumes you exported the key to a bucket
  # secret = !(cat sa.key.json | base64 -w 0)
  # secret = secret[0]
  #config = {
  #    'bootstrap_servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',
  #    'security_protocol': 'SASL_SSL',  # Use SASL_PLAINTEXT for username/password
  #    'sasl_mechanism': 'PLAIN',
  #    'sasl_plain_username': f'kafka-sp@{project_id}.iam.gserviceaccount.com',
  #    'sasl_plain_password': secret,
  #    'group_id':          'kafka-group-id',
  #    'auto_offset_reset': 'earliest'
  #}

  # Kafka Consumer configuration with OAUTHBEARER authentication
  config = {
      'bootstrap_servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',
      'security_protocol': 'SASL_SSL',
      'sasl_mechanism': 'OAUTHBEARER',
      'sasl_oauth_token_provider': TokenProvider(),
      'group_id': 'kafka-group-id',
      'auto_offset_reset': 'earliest'
  }

  # Create Consumer instance
  consumer = KafkaConsumer(**config)  # Use keyword unpacking for clear configuration

  # Subscribe to topic
  consumer.subscribe([kafka_topic_name])

  i = 0
  max_items = 50

  # Poll for new messages from Kafka and print them.
  try:
      while i < max_items:
          # kafka-python's poll() takes its timeout in milliseconds
          records = consumer.poll(timeout_ms=1000)
          for partition, partition_messages in records.items():
              for message in partition_messages:
                  try:
                      print(f"Consumed record with key {message.key} and value {message.value}")
                      # Process the message here (e.g., parse JSON, store data)
                      message_data = json.loads(message.value)
                      print(message_data)
                  except Exception as e:
                      print(f"Error processing message: {e}")
                  # Count the message after processing so exactly max_items are printed
                  i += 1
                  if i >= max_items:
                      print(f"Reached max items ({max_items})")
                      break
              if i >= max_items:
                  break

  except KeyboardInterrupt:
      pass
  finally:
      # Leave group and commit final offsets
      consumer.close()


In [None]:
openSourceKafkaConsumer()
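
The consumer above passes each message value straight to `json.loads`. A slightly more defensive version might validate the payload before using it. The sketch below is an illustration only: `parse_location_message` and `REQUIRED_FIELDS` are hypothetical names, and the field list assumes the Kafka payload uses the same names as the `customer_geo_location` columns read by the queries.

```python
import json

# Assumed payload fields, taken from the column names the queries read
REQUIRED_FIELDS = (
    "customer_id",
    "prior_latitude",
    "prior_longitude",
    "current_latitude",
    "current_longitude",
    "event_timestamp_millis",
)


def parse_location_message(raw_value):
    """Decode a Kafka message value (bytes or str) into a dict, or return None if unusable."""
    try:
        record = json.loads(raw_value)
    except (TypeError, ValueError):
        # TypeError: value is None or another unsupported type
        # ValueError: invalid JSON or undecodable bytes
        return None
    if not isinstance(record, dict):
        return None
    if any(field not in record for field in REQUIRED_FIELDS):
        return None
    return record
```

Returning `None` rather than raising keeps a single malformed message from stopping the polling loop; the caller can count or log skipped messages as needed.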

#### <font color='#4285f4'>Confluent Kafka Consumer</font>

In [None]:
def confluentKafkaConsumer():
  from confluent_kafka import Consumer
  import functools

  # Kafka Consumer configuration with SASL_PLAIN authentication
  # This requires a service principal key (json file) which must be base64 encoded
  # !gsutil cp gs://bucket-name/your-key.json sa.key.json <- This assumes you exported the key to a bucket
  # secret = !(cat sa.key.json | base64 -w 0)
  # secret = secret[0]
  #config = {
  #    # User-specific properties that you must set
  #    'bootstrap.servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',
  #    'sasl.username':     f'kafka-sp@{project_id}.iam.gserviceaccount.com',
  #    'sasl.password':     secret,
  #    'security.protocol': 'SASL_SSL',
  #    'sasl.mechanisms':   'PLAIN',
  #    'group.id':          'kafka-group-id',
  #    'auto.offset.reset': 'earliest'
  #}

  # Kafka Consumer configuration with OAUTHBEARER authentication
  config = {
      'bootstrap.servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',
      'security.protocol': 'SASL_SSL',
      'sasl.mechanisms': 'OAUTHBEARER',
      'oauth_cb': functools.partial(ConfluentTokenProvider, None),
      'error_cb' : functools.partial(ConfluentErrorProvider),
      'group.id': 'kafka-group-id',
      'auto.offset.reset': 'earliest'
  }

  # Create Consumer instance
  consumer = Consumer(config)

  # Subscribe to topic
  consumer.subscribe([kafka_topic_name])

  i = 0
  max_items = 50

  # Poll for new messages from Kafka and print them.
  try:
      while True:
          msg = consumer.poll(1.0)
          if msg is None:
              # Initial message consumption may take up to
              # `session.timeout.ms` for the consumer group to
              # rebalance and start consuming
              print("Waiting...")
          elif msg.error():
              print(f"ERROR: {msg.error()}")
          else:
              # Extract the (optional) key and value, and print.
              i += 1
              print(msg.value().decode('utf-8'))
              if i >= max_items:
                  print(f"Reached max items ({max_items})")
                  break
  except KeyboardInterrupt:
      pass
  finally:
      # Leave group and commit final offsets
      consumer.close()

In [None]:
confluentKafkaConsumer()

### <font color='#4285f4'>Clean Up</font>

In [None]:
# Note: if you do not know your job id, or overwrote the value, open the link below and cancel the job manually
# https://console.cloud.google.com/dataflow/jobs

user_input = input(f"Do you want to stop your Dataflow job {jobName} (Y/n)?")
if user_input == "Y":
  stopDataflowJobApacheKafkaToBigQuery(jobName)

In [None]:
user_input = input("Do you want to delete your Apache Kafka for BigQuery (Y/n)?")
if user_input == "Y":
  deleteApacheKafkaForBigQueryCluster()

In [None]:
user_input = input("Do you want to delete your BigQuery Reservations. This will STOP billing! (Y/n)?")
if user_input == "Y":
  sql = f"DROP ASSIGNMENT `{project_id}.region-{bigquery_location}.continuous-query-reservation.continuous-query-reservation-assignment`;"
  RunQuery(sql)
  sql = f"DROP RESERVATION `{project_id}.region-{bigquery_location}.continuous-query-reservation`;"
  RunQuery(sql)

### <font color='#4285f4'>Reference Links</font>

- [Python Kafka Library](https://kafka-python.readthedocs.io/en/master/index.html)
- [Confluent Python Library](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html)
- [Google Apache Kafka for BigQuery - Authentication](https://cloud.google.com/managed-kafka/docs/authentication-kafka#oauthbearer)
- [Google Apache Kafka for BigQuery - OAuth Sample Code](https://cloud.google.com/managed-kafka/docs/authentication-kafka#oauthbearer)
- [Google Apache Kafka for BigQuery - Sample Code](https://github.com/googleapis/managedkafka)
- [Confluent - Sample OAuth Code](https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/oauth_producer.py)
- [Confluent - Kafka Config for Python](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-configuration)