colab-enterprise/Campaign-Performance-Geofencing-Simulation.ipynb (3,049 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "2vvAx-5Hubji"
},
"source": [
"### <font color='#4285f4'>Overview</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "KxiZrGf6ubjk"
},
"source": [
"This demonstration simulates a geofencing campaign around four Chocolate AI stores in Paris. We simulate 100 or more customers moving throughout the city. As these customers enter a designated 1 km radius around any of the stores, a continuous query triggers an alert to a Pub/Sub topic, representing a targeted special offer sent to their mobile devices. This showcases how real-time location data, processed through Kafka, Dataflow, and BigQuery, can drive targeted marketing campaigns.\n",
"\n",
"*We assume they are running our application and have location services turned on.*\n",
"\n",
"\n",
"Process Flow:\n",
"\n",
"1. **Infrastructure Setup:**\n",
" * Provision a Managed Service for Apache Kafka and create a Kafka Topic.\n",
" * Initiate a Dataflow pipeline to stream data from Kafka to a BigQuery table (`customer_geo_location`).\n",
" * Create a BigQuery reservation with sufficient capacity for continuous queries (50 slots minimum, each continuous query utilizes ~2 slots).\n",
"\n",
"2. **Data Generation (Simulation):**\n",
" * Generate 100 (or more) simulated people within the city of Paris.\n",
" * Each person is assigned a random starting and ending location within a predefined bounding box (e.g., subset of Paris).\n",
" * Each person moves at a different, randomly assigned walking rate.\n",
" * For each person, at every second:\n",
" * Publish each customer's prior latitude/longitude (for trajectory analysis), current latitude/longitude, along with a timestamp, to the Kafka topic.\n",
" * The demo includes some fields prefixed with \"debugging_\" to facilitate better understanding of the streaming data.\n",
"\n",
"3. **Data Ingestion and Processing:**\n",
" * Dataflow reads data from the Kafka topic and writes it to the BigQuery table (`customer_geo_location`).\n",
" * Implement two continuous queries on the `customer_geo_location` table:\n",
" * Query 1: Calculate if the customers breaks the 1 km geo boundary of any of the 4 stores and insert into a BigQuery table.\n",
" * Query 2: Calculate if the customers breaks the 1 km geo boundary of any of the 4 stores and publish to Pub/Sub.\n",
" * Call the Gemini API in realtime to generate marketing text based on the user's context.\n",
" * Two continuous queries are used to demonstrate both techniques in the demo.\n",
" \n",
"4. **OPTIONAL:**\n",
" * a. You can then read Pub/Sub and generate a push notification to the customer's cell phone (Chocolate AI App)\n",
"\n",
"Cost:\n",
"* Very High (~ $5 per hour): Kafka, Dataflow and BigQuery Reservation. \n",
" - You can run this for an hour or two at low cost, but delete the resources when done.\n",
"* Medium: Remember to stop your Colab Enterprise Notebook Runtime\n",
"\n",
"Author:\n",
"* Adam Paternostro"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "T8lq_wItubjk"
},
"outputs": [],
"source": [
"# Architecture Diagram\n",
"from IPython.display import Image\n",
"Image(url='https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Artifacts/Campaign-Performance-Geofencing-Simulation-Architecture.png', width=1200)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### <font color='#4285f4'>Video Walkthrough</font>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"[](https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Videos/Campaign-Performance-Geofencing-Simulation.mp4)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from IPython.display import HTML\n",
"\n",
"HTML(\"\"\"\n",
"<video width=\"800\" height=\"600\" controls>\n",
" <source src=\"https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Videos/Campaign-Performance-Geofencing-Simulation.mp4\" type=\"video/mp4\">\n",
" Your browser does not support the video tag.\n",
"</video>\n",
"\"\"\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "HMsUvoF4BP7Y"
},
"source": [
"### <font color='#4285f4'>License</font>\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "jQgQkbOvj55d"
},
"source": [
"```\n",
"# Copyright 2024 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License.\n",
"```\n",
"\n",
"Author: Adam Paternostro"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "m65vp54BUFRi"
},
"source": [
"### <font color='#4285f4'>Pip installs</font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "5MaWM6H5i6rX"
},
"outputs": [],
"source": [
"# PIP Installs\n",
"import sys\n",
"\n",
"# https://kafka-python.readthedocs.io/en/master/index.html\n",
"!{sys.executable} -m pip install kafka-python\n",
"\n",
"# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html\n",
"!{sys.executable} -m pip install confluent-kafka"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "UmyL-Rg4Dr_f"
},
"source": [
"### <font color='#4285f4'>Initialize</font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "xOYsEVSXp6IP"
},
"outputs": [],
"source": [
"import json\n",
"import random\n",
"import time\n",
"import datetime\n",
"import base64\n",
"\n",
"import google.auth\n",
"import google.auth.transport.urllib3\n",
"import urllib3"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "wMlHl3bnkFPZ"
},
"outputs": [],
"source": [
"# Set these (run this cell to verify the output)\n",
"\n",
"# WARNING: Hardcoded for now\n",
"# For testing you need (These will be automated with Terraform)\n",
"# network (vpc-main), kafka-subnet, dataflow-subnet\n",
"# service principals: dataflow-service-account [Optional: kafka-service-principal for using a service principal]\n",
"# buckets: ${project_id} (for AVRO schema), dataflow-staging-us-central1-756740881369 (for dataflow temp files w/o soft delete on)\n",
"# BigQuery dataset: chocolate_ai [You need to create this]\n",
"# BigQuery table: customer_geo_location [This is created for you]\n",
"\n",
"bigquery_location = \"${bigquery_location}\"\n",
"region = \"${region}\"\n",
"kafka_cluster_name = \"chocolate-ai-kafka-cluster-01\"\n",
"kafka_topic_name = \"customer-location-topic-01\"\n",
"dataflow_bucket = \"${dataflow_staging_bucket}\" # should not have logical delete on\n",
"dataflow_service_account = \"${dataflow_service_account}\" # Needs Role: roles/managedkafka.client\n",
"bigquery_dataset_name = \"${bigquery_chocolate_ai_dataset}\"\n",
"bigquery_streaming_destination_table = \"customer_geo_location\"\n",
"kafka_subnet = \"kafka-subnet\"\n",
"dataflow_subnet = \"dataflow-subnet\"\n",
"\n",
"# kafka_service_principal_name = \"kafka-service-principal\" # No longer need since using logged in user\n",
"# kafka_service_principal_email = \"kafka-service-principal-email\" # No longer need since using logged in user\n",
"\n",
"# Get some values using gcloud\n",
"project_id = !(gcloud config get-value project)\n",
"user = !(gcloud auth list --filter=status:ACTIVE --format=\"value(account)\")\n",
"\n",
"if len(project_id) == 0:\n",
" raise RuntimeError(f\"project_id is not set: {project_id}\")\n",
"project_id = project_id[0]\n",
"\n",
"if len(user) != 1:\n",
" raise RuntimeError(f\"user is not set: {user}\")\n",
"user = user[0]\n",
"\n",
"print(f\"project_id = {project_id}\")\n",
"print(f\"user = {user}\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "sZ6m_wGrK0YG"
},
"source": [
"### <font color='#4285f4'>Helper Methods</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "JbOjdSP1kN9T"
},
"source": [
"#### restAPIHelper\n",
"Calls the Google Cloud REST API using the current users credentials."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "40wlwnY4kM11"
},
"outputs": [],
"source": [
"def restAPIHelper(url: str, http_verb: str, request_body: str) -> str:\n",
" \"\"\"Calls the Google Cloud REST API passing in the current users credentials\"\"\"\n",
"\n",
" import requests\n",
" import google.auth\n",
" import json\n",
"\n",
" # Get an access token based upon the current user\n",
" creds, project = google.auth.default()\n",
" auth_req = google.auth.transport.requests.Request()\n",
" creds.refresh(auth_req)\n",
" access_token=creds.token\n",
"\n",
" headers = {\n",
" \"Content-Type\" : \"application/json\",\n",
" \"Authorization\" : \"Bearer \" + access_token\n",
" }\n",
"\n",
" if http_verb == \"GET\":\n",
" response = requests.get(url, headers=headers)\n",
" elif http_verb == \"POST\":\n",
" response = requests.post(url, json=request_body, headers=headers)\n",
" elif http_verb == \"PUT\":\n",
" response = requests.put(url, json=request_body, headers=headers)\n",
" elif http_verb == \"PATCH\":\n",
" response = requests.patch(url, json=request_body, headers=headers)\n",
" elif http_verb == \"DELETE\":\n",
" response = requests.delete(url, headers=headers)\n",
" else:\n",
" raise RuntimeError(f\"Unknown HTTP verb: {http_verb}\")\n",
"\n",
" if response.status_code == 200:\n",
" return json.loads(response.content)\n",
" #image_data = json.loads(response.content)[\"predictions\"][0][\"bytesBase64Encoded\"]\n",
" else:\n",
" error = f\"Error restAPIHelper -> ' Status: '{response.status_code}' Text: '{response.text}'\"\n",
" raise RuntimeError(error)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "lkj0u39akoZu"
},
"source": [
"#### createApacheKafkaForBigQueryCluster\n",
"Creates the cluster if it does not exist. Waits for it to be created."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "OvhCPD8vkMzV"
},
"outputs": [],
"source": [
"def createApacheKafkaForBigQueryCluster():\n",
" \"\"\"Creates a Apache Kafka For BigQuery Cluster.\"\"\"\n",
"\n",
" # First find the cluster if it exists\n",
" # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/list\n",
"\n",
" url = f\"https://managedkafka.googleapis.com/v1/projects/{project_id}/locations/{region}/clusters\"\n",
"\n",
" # Gather existing clusters\n",
" json_result = restAPIHelper(url, \"GET\", None)\n",
" print(f\"createApacheKafkaForBigQueryCluster (GET) json_result: {json_result}\")\n",
"\n",
" # Test to see if cluster exists, if so return\n",
" if \"clusters\" in json_result:\n",
" for item in json_result[\"clusters\"]:\n",
" print(f\"Apache Kafka for BigQuery: {item['name']}\")\n",
" # \"projects/${project_id}/locations/us-central1/clusters/kafka-cluster\"\n",
" if item[\"name\"] == f\"projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}\":\n",
" print(\"Apache Kafka for BigQuery already exists\")\n",
" return f\"projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}\"\n",
"\n",
" # Create Apache Kafka For BigQuery Cluster\n",
" # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/create\n",
" print(\"Creating Apache Kafka For BigQuery Cluster\")\n",
"\n",
" url = f\"https://managedkafka.googleapis.com/v1/projects/{project_id}/locations/{region}/clusters?clusterId={kafka_cluster_name}\"\n",
"\n",
" # Larger Apache Kafka Cluster\n",
" # vcpuCount: 32 -> You can probably use less CPUs since they are mainly ideal\n",
" # memoryBytes: 34359738368 -> RAM was at 50% when doing 11,000 customers\n",
"\n",
" request_body = {\n",
" \"capacityConfig\": {\n",
" \"vcpuCount\": \"3\",\n",
" \"memoryBytes\": \"3221225472\"\n",
" },\n",
" \"gcpConfig\": {\n",
" \"accessConfig\": {\n",
" \"networkConfigs\": {\n",
" \"subnet\": f\"projects/{project_id}/regions/{region}/subnetworks/{kafka_subnet}\"\n",
" }\n",
" }\n",
" }\n",
" }\n",
"\n",
" json_result = restAPIHelper(url, \"POST\", request_body)\n",
"\n",
" name = json_result[\"name\"]\n",
" done = json_result[\"done\"]\n",
" print(\"Apache Kafka for BigQuery created: \", name)\n",
" return f\"projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "fo38SP8YuFdb"
},
"source": [
"#### waitForApacheKafkaForBigQueryCluster\n",
"Loops until cluster is created"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "viQtCUXXoxih"
},
"outputs": [],
"source": [
"def waitForApacheKafkaForBigQueryCluster(operation):\n",
" \"\"\"\n",
" Waits for an Apache Kafka For BigQuery Cluster to be Created.\n",
"\n",
" opertion:\n",
" projects/${project_id}/locations/us-central1/operations/operation-1723064212031-61f1e264889a9-9e3a863b-90613855\n",
" \"\"\"\n",
"\n",
" url = f\"https://managedkafka.googleapis.com/v1/{operation}\"\n",
" max_retries = 100\n",
" attempt = 0\n",
"\n",
" while True:\n",
" # Gather existing connections\n",
" json_result = restAPIHelper(url, \"GET\", None)\n",
" print(f\"waitForApacheKafkaForBigQueryCluster (GET) json_result: {json_result}\")\n",
"\n",
" # Test to see if connection exists, if so return\n",
" if \"state\" in json_result:\n",
" if json_result[\"state\"] == \"ACTIVE\":\n",
" print(\"Apache Kafka for BigQuery Cluster created\")\n",
" return None\n",
"\n",
" # Wait for 10 seconds\n",
" attempt += 1\n",
" if attempt > max_retries:\n",
" raise RuntimeError(\"Apache Kafka for BigQuery Cluster not created\")\n",
" time.sleep(30)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "WUwyHLMhuYws"
},
"source": [
"#### deleteApacheKafkaForBigQueryCluster\n",
"Delete the cluster if exists"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "LYli-CSOkMuN"
},
"outputs": [],
"source": [
"def deleteApacheKafkaForBigQueryCluster():\n",
" \"\"\"Deletes a Apache Kafka For BigQuery Cluster.\"\"\"\n",
"\n",
" # First find the cluster if it exists\n",
" # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/list\n",
"\n",
" url = f\"https://managedkafka.googleapis.com/v1/projects/{project_id}/locations/{region}/clusters\"\n",
"\n",
" # Gather existing clusters\n",
" json_result = restAPIHelper(url, \"GET\", None)\n",
" print(f\"createApacheKafkaForBigQueryCluster (GET) json_result: {json_result}\")\n",
" found = False\n",
"\n",
" # Test to see if cluster, if so then delete\n",
" if \"clusters\" in json_result:\n",
" for item in json_result[\"clusters\"]:\n",
" print(f\"Apache Kafka for BigQuery: {item['name']}\")\n",
" # \"projects/${project_id}/locations/us-central1/clusters/kafka-cluster\"\n",
" if item[\"name\"] == f\"projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}\":\n",
" print(\"Apache Kafka for BigQuery exists\")\n",
" found = True\n",
" break\n",
"\n",
" if found == False:\n",
" print(\"Apache Kafka for BigQuery does not exist\")\n",
" return None\n",
"\n",
" # Create Apache Kafka For BigQuery Cluster\n",
" # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters/delete\n",
" print(\"Deleting Apache Kafka For BigQuery Cluster\")\n",
"\n",
" url = f\"https://managedkafka.googleapis.com/v1/projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}\"\n",
"\n",
" json_result = restAPIHelper(url, \"DELETE\", request_body={})\n",
"\n",
" print(\"Apache Kafka for BigQuery deleted\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "YCX1j-VKuecW"
},
"source": [
"#### createApacheKafkaForBigQueryTopic\n",
"Create the topic if not exists"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "k0PlAgsNkMrl"
},
"outputs": [],
"source": [
"def createApacheKafkaForBigQueryTopic():\n",
" \"\"\"Creates a Apache Kafka For BigQuery Topic.\"\"\"\n",
"\n",
" # First find the topic if it exists\n",
" # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters.topics/list\n",
"\n",
" url = f\"https://managedkafka.googleapis.com/v1/projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}/topics\"\n",
"\n",
" # Gather existing clusters\n",
" json_result = restAPIHelper(url, \"GET\", None)\n",
" print(f\"createApacheKafkaForBigQueryCluster (GET) json_result: {json_result}\")\n",
"\n",
" # Test to see if cluster exists, if so return\n",
" if \"topics\" in json_result:\n",
" for item in json_result[\"topics\"]:\n",
" print(f\"Apache Kafka for BigQuery Topic: {item['name']}\")\n",
" # \"projects/${project_id}/locations/us-central1/clusters/kafka-cluster\"\n",
" if item[\"name\"] == f\"projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}/topics/{kafka_topic_name}\":\n",
" print(\"Apache Kafka for BigQuery Topic already exists\")\n",
" return None\n",
"\n",
"\n",
" # Create Apache Kafka For BigQuery Topic\n",
" # https://cloud.google.com/managed-kafka/docs/reference/rest/v1/projects.locations.clusters.topics/create\n",
" print(\"Creating Apache Kafka For BigQuery Topic\")\n",
"\n",
" url = f\"https://managedkafka.googleapis.com/v1/projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}/topics?topicId={kafka_topic_name}\"\n",
"\n",
" # partition_count 32 -> for larger cluster\n",
" request_body = {\n",
" \"partition_count\" : 6,\n",
" \"replication_factor\" : 3\n",
" }\n",
"\n",
" json_result = restAPIHelper(url, \"POST\", request_body)\n",
"\n",
" name = json_result[\"name\"]\n",
" print(\"Apache Kafka for BigQuery Topic created: \", name)\n",
" return None"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "i9fQa--IBhKb"
},
"source": [
"#### RunQuery\n",
"Runs a BigQuery SQL statement"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "Qj8GYx7BBh0d"
},
"outputs": [],
"source": [
"def RunQuery(sql):\n",
" import time\n",
" from google.cloud import bigquery\n",
" client = bigquery.Client()\n",
"\n",
" if (sql.startswith(\"SELECT\") or sql.startswith(\"WITH\")):\n",
" df_result = client.query(sql).to_dataframe()\n",
" return df_result\n",
" else:\n",
" job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)\n",
" query_job = client.query(sql, job_config=job_config)\n",
"\n",
" # Check on the progress by getting the job's updated state.\n",
" query_job = client.get_job(\n",
" query_job.job_id, location=query_job.location\n",
" )\n",
" print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n",
"\n",
" while query_job.state != \"DONE\":\n",
" time.sleep(2)\n",
" query_job = client.get_job(\n",
" query_job.job_id, location=query_job.location\n",
" )\n",
" print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n",
"\n",
" if query_job.error_result == None:\n",
" return True\n",
" else:\n",
" raise Exception(query_job.error_result)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "PG-F0PfzAZl1"
},
"outputs": [],
"source": [
"def GetMaxNextValue(fully_qualified_table_name, field_name):\n",
" from google.cloud import bigquery\n",
" client = bigquery.Client()\n",
" sql = f\"\"\"\n",
" SELECT IFNULL(MAX({field_name}),0) AS result\n",
" FROM `{fully_qualified_table_name}`\n",
" \"\"\"\n",
" # print(sql)\n",
" df_result = client.query(sql).to_dataframe()\n",
" # display(df_result)\n",
" return int(df_result['result'].iloc[0]) + 1"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "A84k91wbujTD"
},
"source": [
"### <font color='#4285f4'>Create Apache Kafka for BigQuery Cluster</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "g9qfeFo-Lksx"
},
"source": [
"Create the cluster and the topic."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "i1AIo1Z5uLKm"
},
"outputs": [],
"source": [
"# To see your clusters: https://console.cloud.google.com/managedkafka/clusterList\n",
"\n",
"opertion = createApacheKafkaForBigQueryCluster()\n",
"\n",
"if opertion is not None:\n",
" waitForApacheKafkaForBigQueryCluster(opertion)\n",
"\n",
"createApacheKafkaForBigQueryTopic()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "GNs0zbGokXNQ"
},
"source": [
"### <font color='#4285f4'>Create BigQuery tables</font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "2swqVyBGqmxJ"
},
"outputs": [],
"source": [
"%%bigquery\n",
"\n",
"#DROP TABLE `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location`;\n",
"\n",
"CREATE TABLE IF NOT EXISTS `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location`\n",
"(\n",
" customer_geo_location_id STRING OPTIONS(description=\"Primary key.\"),\n",
" customer_id INT64 OPTIONS(description=\"Foreign Key: Customer\"),\n",
" event_timestamp_millis INT64 OPTIONS(description=\"Unix Epoch timestamp on the customers device\"),\n",
" prior_latitude FLOAT64 OPTIONS(description=\"The prior latitude of the customer location.\"),\n",
" prior_longitude FLOAT64 OPTIONS(description=\"The prior longitude of the customer location.\"),\n",
" current_latitude FLOAT64 OPTIONS(description=\"The current latitude of the customer location.\"),\n",
" current_longitude FLOAT64 OPTIONS(description=\"The current longitude of the customer location.\"),\n",
"\n",
" debug_destination_latitude FLOAT64 OPTIONS(description=\"The destination latitude of the customer location. We would not know this, but this if for debugging/demo purposes.\"),\n",
" debug_destination_longitude FLOAT64 OPTIONS(description=\"The destination longitude of the customer location. We would not know this, but this if for debugging/demo purposes.\"),\n",
" debug_walking_speed_mps FLOAT64 OPTIONS(description=\"The speed at which the person is walking. We could compute this, but this if for debugging/demo purposes.\"),\n",
" debug_map_url STRING OPTIONS(description=\"Opens a Google Map link so we can see where the customer is on a map.\")\n",
")\n",
"CLUSTER BY customer_id, event_timestamp_millis;"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "RGJ0h9uqjtvr"
},
"outputs": [],
"source": [
"%%bigquery\n",
"\n",
"#DROP TABLE `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location_results`;\n",
"\n",
"CREATE TABLE IF NOT EXISTS `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location_results`\n",
"(\n",
" customer_geo_location_id STRING OPTIONS(description=\"Primary key.\"),\n",
" customer_id INT64 OPTIONS(description=\"Foreign Key: Customer\"),\n",
" genai_message STRING OPTIONS(description=\"Gemini sample marketing message.\"),\n",
" current_latitude FLOAT64 OPTIONS(description=\"The current latitude of the customer location.\"),\n",
" current_longitude FLOAT64 OPTIONS(description=\"The current longitude of the customer location.\"),\n",
" prior_distance_to_store_kilometers FLOAT64 OPTIONS(description=\"The prior distance the customers is to a store.\"),\n",
" current_distance_to_store_kilometers FLOAT64 OPTIONS(description=\"The current distance the customers is to a store.\"),\n",
" store_id INT64 OPTIONS(description=\"The store id we are measuring distance from.\"),\n",
" store_name STRING OPTIONS(description=\"The store name we are measuring distance from.\"),\n",
" debug_map_url STRING OPTIONS(description=\"Opens a Google Map link so we can see where the customer is on a map.\"),\n",
" event_timestamp_millis INT64 OPTIONS(description=\"Unix Epoch timestamp on the customers device.\"),\n",
" geo_boundry_entry_timestamp TIMESTAMP OPTIONS(description=\"The timestamp the data was procssed by the continuous query.\"),\n",
")\n",
"CLUSTER BY customer_id, event_timestamp_millis;"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "XiketZsXkdRe"
},
"source": [
"### <font color='#4285f4'>Create Avro Schema for Kafka Messages</font>\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "qejUL7fYLX8n"
},
"source": [
"Used to parse data in BigQuery to seperate fields. The schema must match your BigQuery Table and will be used by the DataFlow job to seperate the fields into columns within the table.\n",
"\n",
"NOTE: This is currently note used until schema registry is in Kafka."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "gotxGXfHOwTd"
},
"outputs": [],
"source": [
"avro_schema = {\n",
" 'namespace': 'com.databeans.customer_geo_location',\n",
" 'type': 'record',\n",
" 'name': 'customer_geo_location',\n",
" 'fields': [\n",
" {'name': 'customer_geo_location_id', 'type': 'string'},\n",
" {'name': 'customer_id', 'type': 'int'},\n",
" {'name': 'event_timestamp_millis', 'type': 'long'},\n",
" {'name': 'prior_latitude', 'type': 'double'},\n",
" {'name': 'prior_longitude', 'type': 'double'},\n",
" {'name': 'current_latitude', 'type': 'double'},\n",
" {'name': 'current_longitude', 'type': 'double'},\n",
" {'name': 'debug_destination_latitude', 'type': 'double'},\n",
" {'name': 'debug_destination_longitude', 'type': 'double'},\n",
" {'name': 'debug_walking_speed_mps', 'type': 'double'},\n",
" {'name': 'debug_map_url', 'type': 'string'}\n",
" ]\n",
"}\n",
"\n",
"with open('customer_geo_location.avsc', 'w') as out:\n",
" json.dump(avro_schema, out, indent=4)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "NT1bz7fF81J5"
},
"outputs": [],
"source": [
"# Save to storage so dataflow job can see it\n",
"!gsutil cp customer_geo_location.avsc gs://${chocolate_ai_bucket}/customer_geo_location.avsc"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "TRt5FC6XDtiw"
},
"source": [
"### <font color='#4285f4'>DataFlow - Stream Data from Apache Kafka for BigQuery to a BigQuery Table (streaming ingestion)</font>\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "29jh-SbjC783"
},
"source": [
"- <font color='red'>**WARNING:**</font> This will create a new job everytime this is run. The notebook will only stop the lastest job, so please check the DataFlow UI to Cancel any additional jobs."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "6TUUHdtlFxnQ"
},
"source": [
"#### createDataflowJobApacheKafkaToBigQuery\n",
"Creates the DataFlow Job (verifies the job name, must be a new name for each job)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "bOxdoc3Fwqbo"
},
"outputs": [],
"source": [
"def createDataflowJobApacheKafkaToBigQuery(jobName):\n",
" \"\"\"Creates a DataFlow job to copy data from Apache Kafka for BiqQuery to stream data into a BigQuery Table\"\"\"\n",
"\n",
" # First find the job if it exists\n",
" # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list\n",
"\n",
" url = f\"https://dataflow.googleapis.com/v1b3/projects/{project_id}/jobs?location={region}\"\n",
"\n",
" # Gather existing job\n",
" json_result = restAPIHelper(url, \"GET\", None)\n",
" print(f\"createDataflowJobApacheKafkaToBigQuery (GET) json_result: {json_result}\")\n",
"\n",
" # Test to see if job exists, if so return\n",
" if \"jobs\" in json_result:\n",
" for item in json_result[\"jobs\"]:\n",
" print(f\"DataFlow Job Name: {item['name']}\")\n",
" if item[\"name\"] == jobName:\n",
" print(f\"DataFlow job already exists with date of {item['currentState']}. Try a new name.\")\n",
" return None\n",
"\n",
" # Create DataFlow Job\n",
" # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch\n",
" print(\"Creating DataFlow Job from Flex Template\")\n",
"\n",
" url = f\"https://dataflow.googleapis.com/v1b3/projects/{project_id}/locations/{region}/flexTemplates:launch\"\n",
"\n",
" # Continuous Queries needs useStorageWriteApiAtLeastOnce = True\n",
" # https://cloud.google.com/dataflow/docs/guides/templates/provided/kafka-to-bigquery#optional-parameters\n",
" #numStorageWriteApiStreams : Specifies the number of write streams, this parameter must be set. Default is 0.\n",
" #storageWriteApiTriggeringFrequencySec : Specifies the triggering frequency in seconds, this parameter must be set. Default is 5 seconds.\n",
" #useStorageWriteApiAtLeastOnce : This parameter takes effect only if \"Use BigQuery Storage Write API\" is enabled. If enabled the at-least-once semantics will be used for Storage Write API, otherwise exactly-once semantics will be used. Defaults to: false.\n",
"\n",
" request_body = {\n",
" \"launch_parameter\": {\n",
" \"jobName\": jobName,\n",
" \"containerSpecGcsPath\": \"gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex\",\n",
" \"parameters\": {\n",
" \"readBootstrapServerAndTopic\": f\"projects/{project_id}/locations/{region}/clusters/{kafka_cluster_name}/topics/{kafka_topic_name}\",\n",
" \"persistKafkaKey\": \"false\",\n",
" \"writeMode\": \"SINGLE_TABLE_NAME\",\n",
" \"numStorageWriteApiStreams\": \"2\",\n",
" \"useStorageWriteApiAtLeastOnce\": \"true\",\n",
" \"storageWriteApiTriggeringFrequencySec\": \"5\",\n",
" \"enableCommitOffsets\": \"false\",\n",
" \"kafkaReadOffset\": \"latest\",\n",
" \"kafkaReadAuthenticationMode\": \"APPLICATION_DEFAULT_CREDENTIALS\",\n",
" \"messageFormat\": \"JSON\",\n",
" \"useBigQueryDLQ\": \"false\",\n",
" \"stagingLocation\": f\"gs://{dataflow_bucket}/staging\",\n",
" \"autoscalingAlgorithm\": \"NONE\",\n",
" \"serviceAccount\": dataflow_service_account,\n",
" \"usePublicIps\": \"false\",\n",
" \"labels\": \"{\\\"goog-dataflow-provided-template-type\\\":\\\"flex\\\",\\\"goog-dataflow-provided-template-name\\\":\\\"kafka_to_bigquery_flex\\\",\\\"goog-dataflow-provided-template-version\\\":\\\"2024-07-16-00_rc00\\\"}\",\n",
" \"outputTableSpec\": f\"{project_id}:{bigquery_dataset_name}.{bigquery_streaming_destination_table}\"\n",
" },\n",
" \"environment\": {\n",
" \"numWorkers\": 2,\n",
" \"tempLocation\": f\"gs://{dataflow_bucket}/tmp\",\n",
" \"subnetwork\": f\"regions/{region}/subnetworks/{dataflow_subnet}\",\n",
" \"enableStreamingEngine\": True,\n",
" \"additionalExperiments\": [\n",
" \"enable_streaming_engine\"\n",
" ],\n",
" \"additionalUserLabels\": {}\n",
" }\n",
" }\n",
"}\n",
"\n",
" json_result = restAPIHelper(url, \"POST\", request_body)\n",
"\n",
" job = json_result[\"job\"]\n",
" print(\"Apache Kafka for BigQuery created: \", job)\n",
" return job"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "k9p6kRH4F6vv"
},
"source": [
"#### stopDataflowJobApacheKafkaToBigQuery\n",
"Stops a DataFlow job. Looks up the ID based upon the name."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "UudLX-N19wpK"
},
"outputs": [],
"source": [
"def stopDataflowJobApacheKafkaToBigQuery(jobName):\n",
" \"\"\"Stops a DataFlow job to copy data from Apache Kafka for BiqQuery to stream data into a BigQuery Table\"\"\"\n",
"\n",
" # First find the job if it exists\n",
" # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list\n",
"\n",
" url = f\"https://dataflow.googleapis.com/v1b3/projects/{project_id}/jobs?location={region}\"\n",
"\n",
" # Gather existing jobs\n",
" json_result = restAPIHelper(url, \"GET\", None)\n",
" print(f\"stopDataflowJobApacheKafkaToBigQuery (GET) json_result: {json_result}\")\n",
" found = False\n",
"\n",
" # Test to see if job exists, if so return\n",
" if \"jobs\" in json_result:\n",
" for item in json_result[\"jobs\"]:\n",
" print(f\"DataFlow Job Name: {item['name']} - {item['currentState']}\")\n",
" if item[\"name\"] == jobName and item[\"currentState\"] == \"JOB_STATE_RUNNING\":\n",
" jobId = item[\"id\"]\n",
" found = True\n",
" break\n",
"\n",
" if not found:\n",
" print(\"DataFlow not found or is not running.\")\n",
" return\n",
"\n",
" # Stop DataFlow Job\n",
" # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/update\n",
" print(\"Stopping DataFlow Job \")\n",
"\n",
" url=f\"https://dataflow.googleapis.com/v1b3/projects/{project_id}/locations/{region}/jobs/{jobId}\"\n",
" print(url)\n",
"\n",
" request_body = { \"requestedState\" : \"JOB_STATE_CANCELLED\" }\n",
"\n",
" json_result = restAPIHelper(url, \"PUT\", request_body)\n",
"\n",
" #job = json_result[\"job\"]\n",
" print(\"DataFlow Job Stopped\")\n",
" return"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "IrARsGGPpbsc"
},
"source": [
"#### waitForDataFlowJobToStart\n",
"Waits for a job to start"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "kTOTajtzpbMR"
},
"outputs": [],
"source": [
"def waitForDataFlowJobToStart(jobName):\n",
" \"\"\"Waits for job to turn to running\"\"\"\n",
"\n",
" # First find the job if it exists\n",
" # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list\n",
"\n",
" url = f\"https://dataflow.googleapis.com/v1b3/projects/{project_id}/jobs?location={region}\"\n",
"\n",
" # Gather existing jobs\n",
" json_result = restAPIHelper(url, \"GET\", None)\n",
" print(f\"stopDataflowJobApacheKafkaToBigQuery (GET) json_result: {json_result}\")\n",
" found = False\n",
"\n",
" # Test to see if job exists, if so return\n",
" if \"jobs\" in json_result:\n",
" for item in json_result[\"jobs\"]:\n",
" print(f\"DataFlow Job Name: {item['name']} - {item['currentState']}\")\n",
" if item[\"name\"] == jobName:\n",
" jobId = item[\"id\"]\n",
" found = True\n",
" break\n",
"\n",
" if not found:\n",
" print(\"DataFlow not found or is not running.\")\n",
" return\n",
"\n",
" # Gets the job status\n",
" # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.jobs/get\n",
" print(\"Getting DataFlow Job \")\n",
" url=f\"https://dataflow.googleapis.com/v1b3/projects/{project_id}/locations/{region}/jobs/{jobId}\"\n",
" print(url)\n",
"\n",
" max_retries = 100\n",
" attempt = 0\n",
"\n",
" while True:\n",
" # Get Job\n",
" json_result = restAPIHelper(url, \"GET\", None)\n",
"\n",
" # Test to see if connection exists, if so return\n",
" if \"currentState\" in json_result:\n",
" print(f\"waitForDataFlowJobToStart (GET) currentState: {json_result['currentState']}\")\n",
" if json_result[\"currentState\"] == \"JOB_STATE_RUNNING\":\n",
" print(\"DataFlow Job is now running\")\n",
" return None\n",
" else:\n",
" print(f\"waitForDataFlowJobToStart (GET) json_result: {json_result}\")\n",
"\n",
"\n",
" # Wait for 10 seconds\n",
" attempt += 1\n",
" if attempt > max_retries:\n",
" raise RuntimeError(\"DataFlow Job not created\")\n",
" time.sleep(30)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "lacWbLDHGCL4"
},
"source": [
"#### Run the DataFlow Job"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "t5sqN46oxfxy"
},
"outputs": [],
"source": [
"# The job can take a few minutes to start. Click the link to see the progress:\n",
"# https://console.cloud.google.com/dataflow/jobs\n",
"\n",
"jobName= f\"kafka-stream-{datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}\"\n",
"createDataflowJobApacheKafkaToBigQuery(jobName)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "zh-F6ofXZWgM"
},
"outputs": [],
"source": [
"print(f\"https://console.cloud.google.com/dataflow/jobs?project={project_id}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "9biLEhVgq6BJ"
},
"outputs": [],
"source": [
"waitForDataFlowJobToStart(jobName)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ftO94aBR-t1O"
},
"source": [
"### <font color='#4285f4'>Token Provider / Confluent Provider</font>\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Dnv_oh6fPhDq"
},
"source": [
"Use logged in users credentials (versus a service account)\n",
"\n",
"* https://cloud.google.com/managed-kafka/docs/authentication-kafka#oauthbearer\n",
"* https://github.com/googleapis/managedkafka\n",
"* https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-configuration\n",
"\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "hr_xe7gMHM5V"
},
"outputs": [],
"source": [
"import google.auth\n",
"import google.auth.transport.urllib3\n",
"import urllib3\n",
"import json\n",
"import base64\n",
"import datetime\n",
"import time\n",
"# *** Add this import ***\n",
"from kafka.sasl.oauth import AbstractTokenProvider\n",
"\n",
"# Modify the class definition to inherit from AbstractTokenProvider\n",
"class TokenProvider(AbstractTokenProvider): # <--- CHANGE HERE\n",
"\n",
" def __init__(self, **config):\n",
" # Keep your existing init logic\n",
" self.credentials, _project = google.auth.default(scopes=['https://www.googleapis.com/auth/cloud-platform']) # Ensure appropriate scope\n",
" self.http_client = urllib3.PoolManager()\n",
" self.HEADER = json.dumps(dict(typ='JWT', alg='GOOG_OAUTH2_TOKEN'))\n",
"\n",
" def valid_credentials(self):\n",
" if not self.credentials.valid:\n",
" # Use the correct Request object for urllib3\n",
" self.credentials.refresh(google.auth.transport.urllib3.Request(self.http_client))\n",
" return self.credentials\n",
"\n",
" def get_jwt(self, creds):\n",
" # print(creds.expiry.timestamp())\n",
" # Ensure creds.service_account_email exists if using ADC from a service account\n",
" # If using user ADC, this might need adjustment or might not be required in the JWT 'sub'\n",
" subject = getattr(creds, 'service_account_email', 'user_adc') # Use a placeholder if email isn't available\n",
" return json.dumps(\n",
" dict(\n",
" exp=creds.expiry.timestamp(),\n",
" iss='Google', # Or specific issuer if required\n",
" iat=datetime.datetime.now(datetime.timezone.utc).timestamp(),\n",
" # 'scope' in JWT might be redundant if token itself has scope, verify if needed\n",
" # scope='kafka',\n",
" sub=subject,\n",
" )\n",
" )\n",
"\n",
" def b64_encode(self, source):\n",
" return (\n",
" base64.urlsafe_b64encode(source.encode('utf-8'))\n",
" .decode('utf-8')\n",
" .rstrip('=')\n",
" )\n",
"\n",
" def get_kafka_access_token(self, creds):\n",
" # This constructs the token string specific to Google Managed Kafka OAUTHBEARER\n",
" # It seems to combine metadata (header, jwt) with the actual Google token\n",
" return '.'.join([\n",
" self.b64_encode(self.HEADER),\n",
" self.b64_encode(self.get_jwt(creds)),\n",
" # Use the actual Google OAuth token obtained via ADC\n",
" # We don't need to base64 encode creds.token here, it's already a token string.\n",
" # However, the structure you have implies the THIRD part should be base64 encoded.\n",
" # Double check Google's documentation for the exact format. Assuming your original logic is correct:\n",
" self.b64_encode(creds.token)\n",
" # If the third part should just be the token:\n",
" # creds.token\n",
" ])\n",
"\n",
" # This method now fulfills the AbstractTokenProvider requirement for kafka-python\n",
" def token(self):\n",
" try:\n",
" # print(\"TokenProvider.token() called\")\n",
" creds = self.valid_credentials()\n",
" return self.get_kafka_access_token(creds)\n",
" except Exception as e:\n",
" print(f\"Error in TokenProvider.token: {e}\")\n",
" # Consider logging the traceback for detailed debugging\n",
" # import traceback\n",
" # traceback.print_exc()\n",
" raise # Re-raise the exception\n",
"\n",
" # This method provides the format needed for confluent-kafka-python's callback\n",
" def confluent_token(self):\n",
" try:\n",
" # print(\"TokenProvider.confluent_token() called\")\n",
" creds = self.valid_credentials()\n",
" token_str = self.get_kafka_access_token(creds)\n",
"\n",
" # Calculate expiry timestamp in milliseconds since epoch\n",
" # creds.expiry is already timezone-aware (usually UTC) from google-auth\n",
" expiry_timestamp_ms = int(creds.expiry.timestamp() * 1000)\n",
"\n",
" return token_str, expiry_timestamp_ms # <--- Return token and expiry in MS\n",
" except Exception as e:\n",
" print(f\"Error in TokenProvider.confluent_token: {e}\")\n",
" # import traceback\n",
" # traceback.print_exc()\n",
" raise # Re-raise the exception\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "FHxplUmlODR2"
},
"outputs": [],
"source": [
"# Confluent does not use a TokenProvider, it calls a method\n",
"def ConfluentTokenProvider(args, config):\n",
" \"\"\"Method to get the Confluent Token\"\"\"\n",
" t = TokenProvider()\n",
" return t.confluent_token()\n",
"\n",
"\n",
"# Print any Confluent errors (for debugging)\n",
"def ConfluentErrorProvider(e):\n",
" print(e)\n",
" raise Exception(e)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "S9hb5KkiB2pe"
},
"outputs": [],
"source": [
"# For Debugging - WARNING: Never save these in your output of the notebook!\n",
"# t = TokenProvider()\n",
"# t.token()\n",
"\n",
"# ConfluentTokenProvider(None, None)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "v_x4X8RSNMJt"
},
"source": [
"#### Helper Methods (Fake Data and Callback)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "z0jxPXHjKb94"
},
"outputs": [],
"source": [
"# This can be used to do a callback when you publish a message\n",
"# Since we might publish a lot of messages, this is commented out in the Producer code\n",
"\n",
"def delivery_callback(err, msg):\n",
" if err:\n",
" print('ERROR: Message failed delivery: {}'.format(err))\n",
" else:\n",
" print(\"Produced event to topic {topic}: value = {value:12}\".format(topic=msg.topic(), value=msg.value().decode('utf-8')))"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "TP8XKiCRfa65"
},
"source": [
"### <font color='#4285f4'>Latitude / Longitude Helper Methods</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ITnZPgO_b9m2"
},
"source": [
"##### haversine_distance"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "VItkUgxsiO5C"
},
"outputs": [],
"source": [
"import math\n",
"\n",
"def haversine_distance(lat1, lon1, lat2, lon2):\n",
" \"\"\"\n",
" Calculates the haversine distance between two points on a sphere.\n",
"\n",
" Args:\n",
" lat1: Latitude of the first point in radians.\n",
" lon1: Longitude of the first point in radians.\n",
" lat2: Latitude of the second point in radians.\n",
" lon2: Longitude of the second point in radians.\n",
"\n",
" Returns:\n",
" The distance between the two points in kilometers.\n",
" \"\"\"\n",
"\n",
" # Earth's radius in kilometers\n",
" R = 6371\n",
"\n",
" # Convert degrees to radians\n",
" lat1 = math.radians(lat1)\n",
" lon1 = math.radians(lon1)\n",
" lat2 = math.radians(lat2)\n",
" lon2 = math.radians(lon2)\n",
"\n",
" # Haversine formula\n",
" dlat = lat2 - lat1\n",
" dlon = lon2 - lon1\n",
" a = math.sin(dlat / 2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2)**2\n",
" c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))\n",
" distance = R * c\n",
"\n",
" return distance"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "y-QP67qghMrS"
},
"outputs": [],
"source": [
"# Create 10 people who are in london\n",
"# Have the walk towards the location location_1_lat_min, london_lat_max = 51.5174328, -0.1219854\n",
"# When they are within 1 km send them an alery\n",
"# Only send them the alert once\n",
"\n",
"# have people walk at different speeds\n",
"# The average walking speed for most adults is around 4.8 kilometers per hour"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "etIgADeob6FF"
},
"source": [
"##### bounding_box"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "TzHH2HWuE2d5"
},
"outputs": [],
"source": [
"import math\n",
"\n",
"def bounding_box(latitude, longitude, distance_km):\n",
" \"\"\"\n",
" Calculates the bounding box coordinates for a given latitude, longitude, and distance.\n",
"\n",
" Args:\n",
" latitude: Latitude of the center point in decimal degrees.\n",
" longitude: Longitude of the center point in decimal degrees.\n",
" distance_km: Distance in kilometers for the bounding box.\n",
"\n",
" Returns:\n",
" A tuple containing the minimum and maximum latitude and longitude values\n",
" (min_lat, max_lat, min_lon, max_lon).\n",
" \"\"\"\n",
"\n",
" # Earth's radius in kilometers\n",
" earth_radius_km = 6371\n",
"\n",
" # Convert latitude and longitude to radians\n",
" lat_rad = math.radians(latitude)\n",
" lon_rad = math.radians(longitude)\n",
"\n",
" # Calculate angular radius\n",
" angular_radius = distance_km / earth_radius_km\n",
"\n",
" # Calculate bounding box coordinates\n",
" min_lat = math.degrees(lat_rad - angular_radius)\n",
" max_lat = math.degrees(lat_rad + angular_radius)\n",
"\n",
" # Handle potential issues with longitude calculations near the poles\n",
" if abs(lat_rad) > math.pi / 2 - angular_radius:\n",
" # Adjust longitude range to cover the entire circle\n",
" min_lon = -180\n",
" max_lon = 180\n",
" else:\n",
" min_lon = math.degrees(lon_rad - angular_radius / math.cos(lat_rad))\n",
" max_lon = math.degrees(lon_rad + angular_radius / math.cos(lat_rad))\n",
"\n",
" return min_lat, max_lat, min_lon, max_lon"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "nm_hoP6Xb_AT"
},
"source": [
"### <font color='#4285f4'>Kafka (Open Source and Confluent) Producers</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "8dhBImkEb0q9"
},
"source": [
"##### simulate_walk_open_source_kafka_producer (Open Source Kafka Producer)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "9kV82HHSIP8I"
},
"outputs": [],
"source": [
"import time\n",
"from geopy.distance import geodesic\n",
"import urllib.parse\n",
"import uuid\n",
"\n",
"def simulate_walk_open_source_kafka_producer(customer_id, customer_name, starting_latitude, starting_longitude, \\\n",
" ending_latitude, ending_longitude, speed_meters_per_second=1.4, debug_messages = False):\n",
" \"\"\"\n",
" Simulates a walk from a starting point to an ending point.\n",
"\n",
" Args:\n",
" customer_name (str): Name of the person walking.\n",
" starting_latitude (float): Starting latitude in degrees.\n",
" starting_longitude (float): Starting longitude in degrees.\n",
" ending_latitude (float): Ending latitude in degrees.\n",
" ending_longitude (float): Ending longitude in degrees.\n",
" speed_meters_per_second (float, optional): Walking speed in meters per second. Defaults to 1.4.\n",
"\n",
" Prints information about the walk at regular intervals.\n",
" \"\"\"\n",
" from kafka import KafkaProducer\n",
" # Kafka Producer configuration with OAUTHBEARER authentication\n",
" config = {\n",
" 'bootstrap_servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',\n",
" 'security_protocol': 'SASL_SSL',\n",
" 'sasl_mechanism': 'OAUTHBEARER',\n",
" 'sasl_oauth_token_provider': TokenProvider(),\n",
" 'reconnect_backoff_ms': 500,\n",
" 'reconnect_backoff_max_ms': 10000 \n",
" }\n",
"\n",
"\n",
" producer = KafkaProducer(**config) # Use keyword unpacking for clear configuration\n",
"\n",
" # Calculate total distance and time\n",
" start = (starting_longitude, starting_latitude) # Swap order for consistency with haversine_distance\n",
" end = (ending_longitude, ending_latitude) # Swap order for consistency with haversine_distance\n",
" total_distance = geodesic(start, end).meters if geodesic else haversine_distance(*start, *end)\n",
" total_time = total_distance / speed_meters_per_second\n",
"\n",
" # Generate data points\n",
" generate_data_time = 1 # seconds\n",
" num_points = int(total_time / generate_data_time) + 1 # Include start and end points\n",
"\n",
" prior_latitude = starting_latitude\n",
" prior_longitude = starting_longitude\n",
"\n",
" for i in range(num_points):\n",
" fraction = i / (num_points - 1) # Normalize fraction for even distribution\n",
" lat = starting_latitude + fraction * (ending_latitude - starting_latitude)\n",
" lon = starting_longitude + fraction * (ending_longitude - starting_longitude)\n",
" distance_to_destination = haversine_distance(lat, lon, ending_latitude, ending_longitude)\n",
" map_url = f\"https://www.google.com/maps/place/{lat},{lon}/@{lat},{lon},17z\"\n",
"\n",
" # Kafka\n",
" # Log\n",
" # user, event_time, current_lat, current_long, starting_lat, starting_long, dest_lat, dest_long, walking_speed_meters_per_second, distance_to_destination, map_url\n",
" message_data = {\n",
" \"customer_geo_location_id\" : f\"{uuid.uuid4()}\",\n",
" \"customer_id\": customer_id,\n",
" \"event_timestamp_millis\": int(time.time() * 1000),\n",
" \"prior_latitude\": prior_latitude,\n",
" \"prior_longitude\": prior_longitude,\n",
" \"current_latitude\": lat,\n",
" \"current_longitude\": lon,\n",
" \"debug_destination_latitude\": ending_latitude,\n",
" \"debug_destination_longitude\": ending_longitude,\n",
" \"debug_walking_speed_mps\": speed_meters_per_second,\n",
" \"debug_map_url\" : f\"{map_url}\"\n",
" }\n",
"\n",
" # Save for next interation\n",
" prior_latitude = lat\n",
" prior_longitude = lon\n",
"\n",
" # Serialize data to bytes\n",
" serialized_data = json.dumps(message_data).encode('utf-8')\n",
"\n",
" # Define the key based on your needs (e.g., customer_id)\n",
" key = str(customer_id).encode('utf-8')\n",
"\n",
" # Produce the message with key\n",
" # producer.send(kafka_topic_name, key=key, value=serialized_data) # callback=delivery_callback\n",
" max_retries = 5\n",
" retry_delay = 1 # seconds\n",
"\n",
" for attempt in range(max_retries):\n",
" try:\n",
" producer.send(kafka_topic_name, key=key, value=serialized_data)\n",
" break # Exit the loop if successful\n",
" except Exception as e:\n",
" if attempt < max_retries - 1:\n",
" print(f\"Failed to send message to Kafka: {e}. Retrying in {retry_delay} seconds.\")\n",
" time.sleep(retry_delay)\n",
" else:\n",
" raise # Raise the exception if all retries fail\n",
"\n",
" if i == 1:\n",
" print(f\"{customer_name} distance to go ({distance_to_destination:.2f} km): {map_url}\")\n",
" print(f\"message_data: {message_data}\")\n",
"\n",
" if i % 100 == 0:\n",
" if debug_messages:\n",
" print(f\"{customer_name} distance to go ({distance_to_destination:.2f} km): {map_url}\")\n",
" print(f\"message_data: {message_data}\")\n",
" producer.flush()\n",
"\n",
" time.sleep(generate_data_time)\n",
"\n",
" producer.flush()\n",
"\n",
" print(f\"{customer_id} - {customer_name} walk complete.\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "pYQO1IKpXwyY"
},
"source": [
"##### simulate_walk_confluent_kafka_producer (Confluent Kafka Producer)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "fIL3cop8Xwyg"
},
"outputs": [],
"source": [
"import time\n",
"from geopy.distance import geodesic\n",
"import urllib.parse\n",
"import uuid\n",
"\n",
"def simulate_walk_confluent_kafka_producer(customer_id, customer_name, starting_latitude, starting_longitude, \\\n",
" ending_latitude, ending_longitude, speed_meters_per_second=1.4, debug_messages = False):\n",
" \"\"\"\n",
" Simulates a walk from a starting point to an ending point.\n",
"\n",
" Args:\n",
" customer_name (str): Name of the person walking.\n",
" starting_latitude (float): Starting latitude in degrees.\n",
" starting_longitude (float): Starting longitude in degrees.\n",
" ending_latitude (float): Ending latitude in degrees.\n",
" ending_longitude (float): Ending longitude in degrees.\n",
" speed_meters_per_second (float, optional): Walking speed in meters per second. Defaults to 1.4.\n",
"\n",
" Prints information about the walk at regular intervals.\n",
" \"\"\"\n",
" import confluent_kafka\n",
" import functools\n",
"\n",
" # Kafka Producer configuration with SASL_PLAIN authentication\n",
" # This requires a service principal key (json file) which must be base64 encoded\n",
" # !gsutil cp gs://bucket-name/your-key.json sa.key.json <- This assumes you exported the key to a bucket\n",
" # secret = !(cat sa.key.json | base64 -w 0)\n",
" # secret = secret[0]\n",
" #config = {\n",
" # 'bootstrap.servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',\n",
" # 'sasl.username': f'kafka-sp@{project_id}.iam.gserviceaccount.com',\n",
" # 'sasl.password': secret,\n",
" # 'security.protocol': 'SASL_SSL',\n",
" # 'sasl.mechanisms': 'PLAIN',\n",
" # 'acks': 'all'\n",
" #}\n",
"\n",
"\n",
" # Kafka Producer configuration with OAUTHBEARER authentication\n",
" # https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-configuration\n",
" # https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/oauth_producer.py\n",
" config = {\n",
" 'bootstrap.servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog', # No port needed usually\n",
" 'security.protocol': 'SASL_SSL',\n",
" 'sasl.mechanisms': 'OAUTHBEARER',\n",
" 'oauth_cb': functools.partial(ConfluentTokenProvider, None), # <-- Uses the callback function\n",
" 'error_cb' : functools.partial(ConfluentErrorProvider),\n",
" 'acks': 'all' \n",
" }\n",
"\n",
"\n",
" producer = confluent_kafka.Producer(config)\n",
"\n",
" # Calculate total distance and time\n",
" start = (starting_longitude, starting_latitude) # Swap order for consistency with haversine_distance\n",
" end = (ending_longitude, ending_latitude) # Swap order for consistency with haversine_distance\n",
" total_distance = geodesic(start, end).meters if geodesic else haversine_distance(*start, *end)\n",
" total_time = total_distance / speed_meters_per_second\n",
"\n",
" # Generate data points\n",
" generate_data_time = 1 # seconds\n",
" num_points = int(total_time / generate_data_time) + 1 # Include start and end points\n",
"\n",
" prior_latitude = starting_latitude\n",
" prior_longitude = starting_longitude\n",
"\n",
" for i in range(num_points):\n",
" fraction = i / (num_points - 1) # Normalize fraction for even distribution\n",
" lat = starting_latitude + fraction * (ending_latitude - starting_latitude)\n",
" lon = starting_longitude + fraction * (ending_longitude - starting_longitude)\n",
" distance_to_destination = haversine_distance(lat, lon, ending_latitude, ending_longitude)\n",
" map_url = f\"https://www.google.com/maps/place/{lat},{lon}/@{lat},{lon},17z\"\n",
"\n",
" # Kafka\n",
" # Log\n",
" # user, event_time, current_lat, current_long, starting_lat, starting_long, dest_lat, dest_long, walking_speed_meters_per_second, distance_to_destination, map_url\n",
" message_data = {\n",
" \"customer_geo_location_id\" : f\"{uuid.uuid4()}\",\n",
" \"customer_id\": customer_id,\n",
" \"event_timestamp_millis\": int(time.time() * 1000),\n",
" \"prior_latitude\": prior_latitude,\n",
" \"prior_longitude\": prior_longitude,\n",
" \"current_latitude\": lat,\n",
" \"current_longitude\": lon,\n",
" \"debug_destination_latitude\": ending_latitude,\n",
" \"debug_destination_longitude\": ending_longitude,\n",
" \"debug_walking_speed_mps\": speed_meters_per_second,\n",
" \"debug_map_url\" : f\"{map_url}\"\n",
" }\n",
"\n",
" # Save for next interation\n",
" prior_latitude = lat\n",
" prior_longitude = lon\n",
"\n",
"\n",
" # Serialize data to bytes\n",
" serialized_data = json.dumps(message_data).encode('utf-8')\n",
"\n",
" # Define the key based on your needs (e.g., customer_id)\n",
" key = str(customer_id).encode('utf-8')\n",
"\n",
" # Produce the message with key\n",
" producer.produce(kafka_topic_name, key=key, value=serialized_data) # callback=delivery_callback\n",
"\n",
" if i == 1:\n",
" print(f\"{customer_name} distance to go ({distance_to_destination:.2f} km): {map_url}\")\n",
" print(f\"message_data: {message_data}\")\n",
"\n",
" if i % 100 == 0:\n",
" if debug_messages:\n",
" print(f\"{customer_name} distance to go ({distance_to_destination:.2f} km): {map_url}\")\n",
" print(f\"message_data: {message_data}\")\n",
" producer.flush()\n",
"\n",
"\n",
" time.sleep(generate_data_time)\n",
"\n",
" producer.flush()\n",
"\n",
" print(f\"{customer_id} - {customer_name} walk complete.\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "M5QKc03pC4LP"
},
"source": [
"### <font color='#4285f4'>Create People Function</font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "diPJ-aqQMwhz"
},
"outputs": [],
"source": [
"import threading\n",
"\n",
"def create_people(starting_customer_id, number_of_people, producer_method_name = simulate_walk_open_source_kafka_producer) -> None:\n",
" \"\"\"\n",
" Calculates people who will walk towards various locations. This will generate a thread for each person\n",
" and simulate them walking.\n",
"\n",
" Args:\n",
" number_of_people: The number of people to generate.\n",
" producer_method_name: The method to use to simulate the walk.\n",
"\n",
" Returns:\n",
" None\n",
" \"\"\"\n",
" location_1_latitude, location_1_longitude = 48.852066767829761, 2.3464926959635504 # Rue Galande\n",
" location_2_latitude, location_2_longitude = 48.850829751346133, 2.3245967340236109 # Le Bon Marché\n",
" location_3_latitude, location_3_longitude = 48.867691580985458, 2.3376027993295176 # Square Louvois\n",
" location_4_latitude, location_4_longitude = 48.871015939679289, 2.302960997513936 # Av. des Champs-Élysées\n",
"\n",
" # Create a box where we will generate random people starting locations.\n",
" # This is 10 km from Hôtel des Invalides\n",
" # 48.85744164370035, 2.3128668119381186\n",
" bounding_box_min_lat, bounding_box_max_lat, bounding_box_min_lon, bounding_box_max_lon = bounding_box(48.85744164370035, 2.3128668119381186, 10)\n",
"\n",
" people = []\n",
" for i in range(number_of_people):\n",
" print(f\"Generating Person: {i+starting_customer_id}\")\n",
" destination_latitude = 0\n",
" destination_longitude = 0\n",
"\n",
" if random.random() < 0.10:\n",
" if i % 4 == 0:\n",
" destination_latitude = location_1_latitude\n",
" destination_longitude = location_1_longitude\n",
" elif i % 4 == 1:\n",
" destination_latitude = location_2_latitude\n",
" destination_longitude = location_2_longitude\n",
" elif i % 4 == 2:\n",
" destination_latitude = location_3_latitude\n",
" destination_longitude = location_3_longitude\n",
" else:\n",
" destination_latitude = location_4_latitude\n",
" destination_longitude = location_4_longitude\n",
" else:\n",
" destination_latitude = random.uniform(bounding_box_min_lat, bounding_box_max_lat)\n",
" destination_longitude = random.uniform(bounding_box_min_lon, bounding_box_max_lon)\n",
"\n",
" person_dict = {\n",
" \"customer_id\": i+starting_customer_id,\n",
" \"name\": f\"person {i+starting_customer_id}\",\n",
" \"starting_latitude\": random.uniform(bounding_box_min_lat, bounding_box_max_lat),\n",
" \"starting_longitude\": random.uniform(bounding_box_min_lon, bounding_box_max_lon),\n",
" \"destination_latitude\": destination_latitude,\n",
" \"destination_longitude\": destination_longitude,\n",
" # The average walking speed of a person is approximately 1.4 meters per second. We might have people on bikes or scooters (possibly cars in traffic)\n",
" \"walking_speed_meters_per_second\": round(random.uniform(1, 3),2)\n",
" }\n",
" distance_to_destination = haversine_distance(person_dict[\"starting_latitude\"], person_dict[\"starting_longitude\"],\\\n",
" person_dict[\"destination_latitude\"], person_dict[\"destination_longitude\"])\n",
" #print(distance_to_destination)\n",
" people.append(person_dict)\n",
"\n",
" threads = []\n",
" for item in people:\n",
" threads.append(threading.Thread(target=producer_method_name, args=(\n",
" item[\"customer_id\"], item[\"name\"], \\\n",
" item[\"starting_latitude\"], item[\"starting_longitude\"], \\\n",
" item[\"destination_latitude\"], item[\"destination_longitude\"], \\\n",
" item[\"walking_speed_meters_per_second\"], False)))\n",
"\n",
" current_thread_count = 0\n",
" throttling = 200\n",
" for thread in threads:\n",
" if current_thread_count <= throttling:\n",
" thread.start()\n",
" time.sleep(.5) # give kafka some time to catch up\n",
" else:\n",
" thread.start()\n",
" time.sleep(30) # start a new customer every 30 seconds (when running this for more than 200 customers)\n",
" current_thread_count += 1\n",
"\n",
" # Wait for all the threads\n",
" for thread in threads:\n",
" thread.join()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "7gtEd0M7rkfc"
},
"source": [
"### <font color='#4285f4'>Create Simulation and Test 4 People</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "s1NflkRlaebN"
},
"source": [
"##### Clear the table so we can see our results (easily)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "EiTxYrZ2aYv5"
},
"outputs": [],
"source": [
"%%bigquery\n",
"\n",
"TRUNCATE TABLE `chocolate_ai.customer_geo_location`;\n",
"TRUNCATE TABLE `chocolate_ai.customer_geo_location_results`;"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "V5LcAR7xZkK8"
},
"source": [
"##### Test 4 \"people\" records using the Open Source and Confluent Producers"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "FMwCnolZZ8n2"
},
"outputs": [],
"source": [
"# Create 4 people all starting a the same location.\n",
"# Have them each walk towards a store\n",
"# This takes about 2 minutes and 30 second to run\n",
"\n",
"print()\n",
"print(\"-----------------------------------------------------------------------------\")\n",
"print(\"Run the SQL in the next cell in a BigQuery query window while this is running\")\n",
"print(\"-----------------------------------------------------------------------------\")\n",
"print()\n",
"\n",
"# All 4 people will start here\n",
"starting_latitude, starting_longitude = 48.85744164370035, 2.3128668119381186 # Hôtel des Invalides\n",
"\n",
"# They will each walk towards one of our stores (from the stores table)\n",
"# The 4 people will be walking toward one of the stores (on purpose, for testing)\n",
"location_1_latitude, location_1_longitude = 48.852066767829761, 2.3464926959635504 # Rue Galande\n",
"location_2_latitude, location_2_longitude = 48.850829751346133, 2.3245967340236109 # Le Bon Marché\n",
"location_3_latitude, location_3_longitude = 48.867691580985458, 2.3376027993295176 # Square Louvois\n",
"location_4_latitude, location_4_longitude = 48.871015939679289, 2.302960997513936 # Av. des Champs-Élysées\n",
"\n",
"# Test 4 people (walking fast so we can see the results fairly quick)\n",
"threads = []\n",
"\n",
"print(\"Open Source Kafka Producer: Rue Galande\")\n",
"threads.append(threading.Thread(target=simulate_walk_open_source_kafka_producer, args=(10001, \"Person: Rue Galande\", \\\n",
" starting_latitude, starting_longitude, \\\n",
" location_1_latitude, location_1_longitude, \\\n",
" 25, True)))\n",
"\n",
"print(\"Confluent Kafka Producer: Le Bon Marché\")\n",
"threads.append(threading.Thread(target=simulate_walk_confluent_kafka_producer, args=(10002, \"Person: Le Bon Marché\", \\\n",
" starting_latitude, starting_longitude, \\\n",
" location_2_latitude, location_2_longitude, \\\n",
" 30, True)))\n",
"\n",
"print(\"Open Source Kafka Producer: Square Louvois\")\n",
"threads.append(threading.Thread(target=simulate_walk_open_source_kafka_producer, args=(10003, \"Person: Square Louvois\", \\\n",
" starting_latitude, starting_longitude, \\\n",
" location_3_latitude, location_3_longitude, \\\n",
" 35, True)))\n",
"\n",
"print(\"Confluent Kafka Producer: \tAv. des Champs-Élysées\")\n",
"threads.append(threading.Thread(target=simulate_walk_confluent_kafka_producer, args=(10004, \"Person: Av. des Champs-Élysées\", \\\n",
" starting_latitude, starting_longitude, \\\n",
" location_4_latitude, location_4_longitude, \\\n",
" 40, True)))\n",
"\n",
"for thread in threads:\n",
" thread.start()\n",
"\n",
"for thread in threads:\n",
" thread.join()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "i8FOmtB4Zoip"
},
"outputs": [],
"source": [
"%%bigquery\n",
"\n",
"-- It might take a second to see the data\n",
"\n",
"-- This will show each customer and their proximity to each store\n",
"WITH raw_data AS (\n",
" SELECT *,\n",
" ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY event_timestamp_millis DESC) AS ranking\n",
" FROM `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location`\n",
")\n",
", store_data AS (\n",
" -- This is hardcoded for now until we can join\n",
" SELECT 1 AS store_id,\n",
" 'Le Bon Marché' AS store_name,\n",
" '24 Rue de Sèvres, 75007 Paris' AS store_address,\n",
" 48.850829751346133 AS store_latitude,\n",
" 2.3245967340236109 AS store_longitude\n",
" UNION ALL\n",
" SELECT 2 AS store_id,\n",
" 'Av. des Champs-Élysées' AS store_name,\n",
" '75 Av. des Champs-Élysées, 75008 Paris' AS store_address,\n",
" 48.871015939679289 AS store_latitude,\n",
" 2.302960997513936 AS store_longitude\n",
" UNION ALL\n",
" SELECT 3 AS store_id,\n",
" 'Rue Galande' AS store_name,\n",
" '77 Rue Galande, 75005 Paris' AS store_address,\n",
" 48.852066767829761 AS store_latitude,\n",
" 2.3464926959635504 AS store_longitude\n",
" UNION ALL\n",
" SELECT 4 AS store_id,\n",
" 'Square Louvois' AS store_name,\n",
" '69 Rue de Richelieu, 75002 Paris' AS store_address,\n",
" 48.867691580985458 AS store_latitude,\n",
" 2.3376027993295176 AS store_longitude\n",
")\n",
", geo_data AS (\n",
" SELECT *,\n",
" -- Using the prior lat/long see if they were outside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(store_data.store_longitude, store_data.store_latitude)\n",
" ) AS prior_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(store_data.store_longitude, store_data.store_latitude)\n",
" ) / 1000 AS prior_distance_to_store_kilometers,\n",
"\n",
" -- Using the current lat/long see if they are inside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(store_data.store_longitude, store_data.store_latitude)\n",
" ) AS current_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(store_data.store_longitude, store_data.store_latitude)\n",
" ) / 1000 AS current_distance_to_store_kilometers\n",
"\n",
" FROM raw_data\n",
" CROSS JOIN store_data\n",
")\n",
", results AS (\n",
"-- This tests to see if they were outside the circle and then inside\n",
"-- We only want to send them one notification when they break the geofence\n",
"-- This uses the prior lat/long and compares with the current lat/long\n",
"-- Ideally we would say \"NOT EXISTS\" in our customer_geo_location_results table\n",
" SELECT *,\n",
" CASE WHEN prior_distance_to_store_meters > 1000 AND current_distance_to_store_meters <= 1000\n",
" THEN TRUE\n",
" ELSE FALSE\n",
" END AS entered_geofence\n",
" FROM geo_data\n",
")\n",
"SELECT customer_geo_location_id,\n",
" entered_geofence,\n",
" customer_id,\n",
" current_latitude,\n",
" current_longitude,\n",
" prior_distance_to_store_kilometers,\n",
" current_distance_to_store_meters,\n",
" store_id,\n",
" store_name,\n",
" debug_map_url,\n",
" TO_JSON(STRUCT(CAST(TIMESTAMP_MILLIS(event_timestamp_millis) AS STRING) AS event_timestamp)) AS _ATTRIBUTES\n",
" FROM results\n",
" WHERE (ranking = 1 OR entered_geofence = TRUE)\n",
" ORDER BY customer_id, store_id;"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "3DazZFYl_46B"
},
"source": [
"### <font color='#4285f4'>Start your BigQuery Continuous Queries (Manual Intervention Required)</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "6p9BOzi6hFFg"
},
"source": [
"#### Create a reservation for the continuous query\n",
"- NOTE: You will need to wait several minutes for this to take effect"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "faqJs1TJ_9op"
},
"outputs": [],
"source": [
"# Create a continuous query reservation and assignment\n",
"\n",
"user_input = input(\"Do you want to start BigQuery Reservations? This will START billing and will continue until you remove these in the Clean Up code (Y/n)?\")\n",
"if user_input == \"Y\":\n",
" sql = f\"\"\"CREATE RESERVATION `{project_id}.region-{bigquery_location}.continuous-query-reservation`\n",
" OPTIONS (edition = \"enterprise\",\n",
" slot_capacity = 50);\n",
" \"\"\"\n",
" RunQuery(sql)\n",
"\n",
" sql = f\"\"\"CREATE ASSIGNMENT `{project_id}.region-{bigquery_location}.continuous-query-reservation.continuous-query-reservation-assignment`\n",
" OPTIONS(assignee = \"projects/{project_id}\",\n",
" job_type = \"CONTINUOUS\");\n",
" \"\"\"\n",
" RunQuery(sql)\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "gHbPA-4GAHdC"
},
"source": [
"#### Run each of the below queries\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "CgqP8HCShZrK"
},
"source": [
"1. Copy the SQL to a BigQuery SQL Window\n",
"2. Under the More menu, select Continuous Query\n",
"3. Under the Query settings, under Continuous query, select kafka-continuous-query for the service account\n",
"4. Run the Query (it will take a minute to start)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "AgTNR60_A4Aj"
},
"source": [
"##### Query 1 (Continuous Query to Pub/Sub)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "_ywMvm6CAyJp"
},
"source": [
"```\n",
"----------------------------------------------------------------------------------------------------------------\n",
"-- Insert the customers who break the geo-boundry into Pub/Sub and then a process (e.g. Cloud Function) will send them an alert\n",
"----------------------------------------------------------------------------------------------------------------\n",
"\n",
"-- Algorithm:\n",
"-- Get the data from the customer_geo_location table (this just gets the current row)\n",
"-- The stores are hardcoded, but you could query the store table. This will be changed when continuous tables can join to tables.\n",
"-- Calculate the distence the customer is from the store\n",
"-- Calculate their prior distence using the prior lat/long (in the future we could use the LAG function)\n",
"-- Calculate their current distance using their current lat/long\n",
"-- Determine if the prior distance was greater 1 km and if thier current distance is less than (or equal) to 1 km. If true, then they entered the geo-boundry.\n",
"-- Select the final results\n",
"-- Select when cross geo-bountry is true\n",
"\n",
"EXPORT DATA OPTIONS(uri=\"https://pubsub.googleapis.com/projects/${project_id}/topics/bq-continuous-query\", format=\"cloud_pubsub\") AS\n",
"WITH raw_data AS (\n",
" SELECT *\n",
" FROM APPENDS(TABLE `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location`,\n",
" -- Configure the APPENDS TVF start_timestamp to specify when you want to\n",
" -- start processing data using your continuous query.\n",
" -- Here we process data as ten minutes before the current time.\n",
" CURRENT_TIMESTAMP() - INTERVAL 15 MINUTE)\n",
")\n",
"\n",
", geo_data AS (\n",
" -- The stores are hardcoded until we can perform joins within a continuous query\n",
" SELECT *,\n",
" 1 AS store_id,\n",
" 'Le Bon Marché' AS store_name,\n",
" '24 Rue de Sèvres, 75007 Paris' AS store_address,\n",
"\n",
" -- Using the prior lat/long see if they were outside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(2.3245967340236109, 48.850829751346133)\n",
" ) AS prior_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(2.3245967340236109, 48.850829751346133)\n",
" ) / 1000 AS prior_distance_to_store_kilometers,\n",
"\n",
" -- Using the current lat/long see if they are inside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(2.3245967340236109, 48.850829751346133)\n",
" ) AS current_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(2.3245967340236109, 48.850829751346133)\n",
" ) / 1000 AS current_distance_to_store_kilometers\n",
" FROM raw_data\n",
" /*\n",
" UNION ALL\n",
"\n",
" SELECT *,\n",
" 2 AS store_id,\n",
" 'Av. des Champs-Élysées' AS store_name,\n",
" '75 Av. des Champs-Élysées, 75008 Paris' AS store_address,\n",
"\n",
" -- Using the prior lat/long see if they were outside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(2.302960997513936, 48.871015939679289)\n",
" ) AS prior_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(2.302960997513936, 48.871015939679289)\n",
" ) / 1000 AS prior_distance_to_store_kilometers,\n",
"\n",
" -- Using the current lat/long see if they are inside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(2.302960997513936, 48.871015939679289)\n",
" ) AS current_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(2.302960997513936, 48.871015939679289)\n",
" ) / 1000 AS current_distance_to_store_kilometers\n",
"\n",
" FROM raw_data \n",
"\n",
" UNION ALL \n",
"\n",
" SELECT *,\n",
" 3 AS store_id,\n",
" 'Rue Galande' AS store_name,\n",
" '77 Rue Galande, 75005 Paris' AS store_address,\n",
"\n",
" -- Using the prior lat/long see if they were outside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(2.3464926959635504, 48.852066767829761)\n",
" ) AS prior_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(2.3464926959635504, 48.852066767829761)\n",
" ) / 1000 AS prior_distance_to_store_kilometers,\n",
"\n",
" -- Using the current lat/long see if they are inside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(2.3464926959635504, 48.852066767829761)\n",
" ) AS current_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(2.3464926959635504, 48.852066767829761)\n",
" ) / 1000 AS current_distance_to_store_kilometers\n",
" FROM raw_data \n",
"\n",
" UNION ALL \n",
"\n",
" SELECT *,\n",
" 4 AS store_id,\n",
" 'Square Louvois' AS store_name,\n",
" '69 Rue de Richelieu, 75002 Paris' AS store_address,\n",
"\n",
" -- Using the prior lat/long see if they were outside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(2.3376027993295176, 48.867691580985458)\n",
" ) AS prior_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(2.3376027993295176, 48.867691580985458)\n",
" ) / 1000 AS prior_distance_to_store_kilometers,\n",
"\n",
" -- Using the current lat/long see if they are inside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(2.3376027993295176, 48.867691580985458)\n",
" ) AS current_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(2.3376027993295176, 48.867691580985458)\n",
" ) / 1000 AS current_distance_to_store_kilometers\n",
" FROM raw_data\n",
" */\n",
")\n",
"\n",
", results AS (\n",
"-- This tests to see if they were ouside the circle and then inside\n",
"-- We only want to send them one notification when they break the geofence\n",
"-- This uses the prior lat/long and compares with the current lat/long\n",
"-- Ideally we would say \"NOT EXISTS\" in our customer_geo_location_results table\n",
" SELECT *,\n",
" CASE WHEN prior_distance_to_store_meters > 1000 AND current_distance_to_store_meters <= 1000\n",
" THEN TRUE\n",
" ELSE FALSE\n",
" END AS entered_geofence\n",
" FROM geo_data\n",
")\n",
"\n",
", marketing_message AS (\n",
" -- Call Gemini to generate a marketing message\n",
" SELECT *\n",
" FROM ML.GENERATE_TEXT(MODEL`${project_id}.${bigquery_chocolate_ai_dataset}.gemini_model`,\n",
" (SELECT customer_geo_location_id,\n",
" customer_id,\n",
" current_latitude,\n",
" current_longitude,\n",
" prior_distance_to_store_kilometers,\n",
" current_distance_to_store_kilometers,\n",
" store_id,\n",
" store_name,\n",
" debug_map_url,\n",
" entered_geofence,\n",
" event_timestamp_millis,\n",
" CONCAT(\"Create a marketing message for a user who has just entered a geofencing boundy for the company Chocolate AI.\\n\",\n",
" \"The customer is \", current_distance_to_store_kilometers, \" kilometers away from our \", store_name, \" store.\\n\",\n",
" \"Make the message catchy and convience them it is worth the walk.\\n\",\n",
" \"Tell them to use coupon code 'CLOSE-BY' to get 25% off and a free piece of chocolate.\"\n",
" ) AS prompt\n",
" FROM results),\n",
" STRUCT(.8 AS temperature, .8 AS top_p)\n",
" ) \n",
")\n",
"\n",
"SELECT TO_JSON_STRING(STRUCT(customer_geo_location_id,\n",
" customer_id,\n",
" `${project_id}.${bigquery_chocolate_ai_dataset}.gemini_model_result_as_string`(ml_generate_text_result) AS message,\n",
" current_latitude,\n",
" current_longitude,\n",
" prior_distance_to_store_kilometers,\n",
" current_distance_to_store_kilometers,\n",
" store_id,\n",
" store_name,\n",
" debug_map_url)) AS message,\n",
" TO_JSON(STRUCT(CAST(TIMESTAMP_MILLIS(event_timestamp_millis) AS STRING) AS event_timestamp)) AS _ATTRIBUTES\n",
" FROM marketing_message\n",
" WHERE entered_geofence = true;\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "y7WMbFOzA8hb"
},
"source": [
"##### Query 2 (Continuous Query to BigQuery table)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "_kFHj2x_BDHC"
},
"source": [
"```\n",
"----------------------------------------------------------------------------------------------------------------\n",
"-- Insert the customers who break the geo-boundry into another BigQuery table\n",
"----------------------------------------------------------------------------------------------------------------\n",
"\n",
"-- Algorithm:\n",
"-- Get the data from the customer_geo_location table (this just gets the current row)\n",
"-- The stores are hardcoded, but you could query the store table. This will be changed when continuous tables can join to tables.\n",
"-- Calculate the distence the customer is from the store\n",
"-- Calculate their prior distence using the prior lat/long (in the future we could use the LAG function)\n",
"-- Calculate their current distance using their current lat/long\n",
"-- Determine if the prior distance was greater 1 km and if thier current distance is less than (or equal) to 1 km. If true, then they entered the geo-boundry.\n",
"-- Insert the final results into the customer_geo_location_results table where cross geo-bountry is true\n",
"\n",
"INSERT INTO `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location_results`\n",
" (customer_geo_location_id,\n",
" customer_id,\n",
" genai_message,\n",
" current_latitude,\n",
" current_longitude,\n",
" prior_distance_to_store_kilometers,\n",
" current_distance_to_store_kilometers,\n",
" store_id,\n",
" store_name,\n",
" debug_map_url,\n",
" event_timestamp_millis,\n",
" geo_boundry_entry_timestamp)\n",
"WITH raw_data AS (\n",
" SELECT *\n",
" FROM APPENDS(TABLE `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location`,\n",
" -- Configure the APPENDS TVF start_timestamp to specify when you want to\n",
" -- start processing data using your continuous query.\n",
" -- Here we process data as ten minutes before the current time.\n",
" CURRENT_TIMESTAMP() - INTERVAL 15 MINUTE)\n",
")\n",
"\n",
", geo_data AS (\n",
" -- The stores are hardcoded until we can perform joins within a continuous query\n",
" SELECT *,\n",
" 1 AS store_id,\n",
" 'Rue Galande' AS store_name,\n",
" '77 Rue Galande, 75005 Paris' AS store_address,\n",
"\n",
" -- Using the prior lat/long see if they were outside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(2.3464926959635504, 48.852066767829761)\n",
" ) AS prior_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(2.3464926959635504, 48.852066767829761)\n",
" ) / 1000 AS prior_distance_to_store_kilometers,\n",
"\n",
" -- Using the current lat/long see if they are inside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(2.3464926959635504, 48.852066767829761)\n",
" ) AS current_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(2.3464926959635504, 48.852066767829761)\n",
" ) / 1000 AS current_distance_to_store_kilometers\n",
" FROM raw_data\n",
"\n",
" /*\n",
"\n",
" UNION ALL \n",
"\n",
" SELECT *,\n",
" 2 AS store_id,\n",
" 'Le Bon Marché' AS store_name,\n",
" '24 Rue de Sèvres, 75007 Paris' AS store_address,\n",
"\n",
" -- Using the prior lat/long see if they were outside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(2.3245967340236109, 48.850829751346133)\n",
" ) AS prior_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(2.3245967340236109, 48.850829751346133)\n",
" ) / 1000 AS prior_distance_to_store_kilometers,\n",
"\n",
" -- Using the current lat/long see if they are inside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(2.3245967340236109, 48.850829751346133)\n",
" ) AS current_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(2.3245967340236109, 48.850829751346133)\n",
" ) / 1000 AS current_distance_to_store_kilometers\n",
" FROM raw_data \n",
"\n",
" UNION ALL \n",
"\n",
" SELECT *,\n",
" 3 AS store_id,\n",
" 'Square Louvois' AS store_name,\n",
" '69 Rue de Richelieu, 75002 Paris' AS store_address,\n",
"\n",
" -- Using the prior lat/long see if they were outside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(2.3376027993295176, 48.867691580985458)\n",
" ) AS prior_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(2.3376027993295176, 48.867691580985458)\n",
" ) / 1000 AS prior_distance_to_store_kilometers,\n",
"\n",
" -- Using the current lat/long see if they are inside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(2.3376027993295176, 48.867691580985458)\n",
" ) AS current_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(2.3376027993295176, 48.867691580985458)\n",
" ) / 1000 AS current_distance_to_store_kilometers\n",
" FROM raw_data\n",
"\n",
" UNION ALL\n",
" \n",
" SELECT *,\n",
" 4 AS store_id,\n",
" 'Av. des Champs-Élysées' AS store_name,\n",
" '75 Av. des Champs-Élysées, 75008 Paris' AS store_address,\n",
"\n",
" -- Using the prior lat/long see if they were outside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(2.302960997513936, 48.871015939679289)\n",
" ) AS prior_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(2.302960997513936, 48.871015939679289)\n",
" ) / 1000 AS prior_distance_to_store_kilometers,\n",
"\n",
" -- Using the current lat/long see if they are inside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(2.302960997513936, 48.871015939679289)\n",
" ) AS current_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(2.302960997513936, 48.871015939679289)\n",
" ) / 1000 AS current_distance_to_store_kilometers\n",
"\n",
" FROM raw_data \n",
"\n",
" */\n",
")\n",
"\n",
", results AS (\n",
"-- This tests to see if they were ouside the circle and then inside\n",
"-- We only want to send them one notification when they break the geofence\n",
"-- This uses the prior lat/long and compares with the current lat/long\n",
"-- Ideally we would say \"NOT EXISTS\" in our customer_geo_location_results table\n",
" SELECT *,\n",
" CASE WHEN prior_distance_to_store_meters > 1000 AND current_distance_to_store_meters <= 1000\n",
" THEN TRUE\n",
" ELSE FALSE\n",
" END AS entered_geofence\n",
" FROM geo_data\n",
")\n",
"\n",
", marketing_message AS (\n",
" -- Call Gemini to generate a marketing message\n",
" SELECT *\n",
" FROM ML.GENERATE_TEXT(MODEL`${project_id}.${bigquery_chocolate_ai_dataset}.gemini_model`,\n",
" (SELECT customer_geo_location_id,\n",
" customer_id,\n",
" current_latitude,\n",
" current_longitude,\n",
" prior_distance_to_store_kilometers,\n",
" current_distance_to_store_kilometers,\n",
" store_id,\n",
" store_name,\n",
" debug_map_url,\n",
" entered_geofence,\n",
" event_timestamp_millis,\n",
" CONCAT(\"Create a marketing message for a user who has just entered a geofencing boundy for the company Chocolate AI.\\n\",\n",
" \"The customer is \", current_distance_to_store_kilometers, \" kilometers away from our \", store_name, \" store.\\n\",\n",
" \"Make the message catchy and convience them it is worth the walk.\\n\",\n",
" \"Tell them to use coupon code 'CLOSE-BY' to get 25% off and a free piece of chocolate.\"\n",
" ) AS prompt\n",
" FROM results),\n",
" STRUCT(.8 AS temperature, .8 AS top_p)\n",
" ) \n",
")\n",
"\n",
"SELECT customer_geo_location_id,\n",
" customer_id,\n",
" `${project_id}.${bigquery_chocolate_ai_dataset}.gemini_model_result_as_string`(ml_generate_text_result) AS genai_message,\n",
" current_latitude,\n",
" current_longitude,\n",
" prior_distance_to_store_kilometers,\n",
" current_distance_to_store_kilometers,\n",
" store_id,\n",
" store_name,\n",
" debug_map_url,\n",
" event_timestamp_millis,\n",
" CURRENT_TIMESTAMP() AS geo_boundry_entry_timestamp\n",
" FROM marketing_message\n",
" WHERE entered_geofence = true;\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "hk41BjwKZyIp"
},
"source": [
"### <font color='#4285f4'>Start a thread and create many people walking</font> (this will generate lots of data)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "e7cqpG9bONaL"
},
"outputs": [],
"source": [
"# This will start 100 threads in this notebook (which is not ideal to run threads in a notebook, but this is for demo purposes)\n",
"# The threads might overwhelm our Apache Kafka cluster based upon the number of people you simulate.\n",
"# You would need to increase the Apache Kafka cluster size to handle lots of messages. Also, create a large notebook runtime in Colab Enterprise to handle more threads.\n",
"# If you stop this cell, the threads will continue to run. The output will show in other cells if you print within the thread.\n",
"\n",
"# While this cell is running, execute the queries in the several cells to monitor the progress of the jobs\n",
"# This cell might output errors if it gets overwhelmed with all the threads running. In real life these would all be seperate clients.\n",
"\n",
"number_of_customers = 100\n",
"\n",
"user_input = input(f\"Do you want to simulate {number_of_customers} people waling using the Kafka Producer (Y/n)?\")\n",
"if user_input == \"Y\":\n",
" # Create some people and send using Open Source Producer\n",
" starting_customer_id = GetMaxNextValue(\"chocolate_ai.customer_geo_location\", \"customer_id\")\n",
" create_people(starting_customer_id, number_of_customers, simulate_walk_open_source_kafka_producer)\n",
"\n",
"# Commented out (so you do not run all the threads for both providers at the same time, you will get conflicts of customer ids)\n",
"# user_input = input(f\"Do you want to simulate {number_of_customers} people waling using the Confluent Producer (Y/n)?\")\n",
"# if user_input == \"Y\":\n",
"# # Create some people and send using Confluent Kafka Producer\n",
"# starting_customer_id = GetMaxNextValue(\"chocolate_ai.customer_geo_location\", \"customer_id\")\n",
"# create_people(starting_customer_id, number_of_customers, simulate_walk_confluent_kafka_producer)\n",
"\n",
"# NOTES:\n",
"# 1. If you are running this for a long time, you should right click this cell and clear the output.\n",
"# 2. The people are walking straght towards the destination, they will walk over water and through buildings.\n",
"# Google Maps routing could be called for a realistic walking route."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "POMkTfL1-8pY"
},
"source": [
"#### Copy these to BigQuery and run outside the notebook while the \"people\" threads are running"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "_wS1pzegZhuF"
},
"outputs": [],
"source": [
"%%bigquery\n",
"\n",
"-- Show the most recent records inserted into our BigQuery table from Apache Kafka\n",
"SELECT * FROM `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location` ORDER BY event_timestamp_millis LIMIT 100;"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "ZzUojEJfZj3C"
},
"outputs": [],
"source": [
"%%bigquery\n",
"\n",
"-- The number of events per second (add day if you are running accross days)\n",
"SELECT CAST(TIMESTAMP_MILLIS(event_timestamp_millis) AS DATE) AS Date,\n",
" EXTRACT(HOUR FROM TIMESTAMP_MILLIS(event_timestamp_millis)) AS Hour,\n",
" EXTRACT(MINUTE FROM TIMESTAMP_MILLIS(event_timestamp_millis)) AS Minute,\n",
" EXTRACT(SECOND FROM TIMESTAMP_MILLIS(event_timestamp_millis)) AS Second,\n",
" COUNT(*) AS Cnt\n",
" FROM `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location`\n",
"GROUP BY ALL\n",
"ORDER BY 1 DESC, 2 DESC, 3 DESC, 4 DESC\n",
"LIMIT 100;"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "kvKsSGLlZd8t"
},
"outputs": [],
"source": [
"%%bigquery\n",
"\n",
"-- This will show each customer and their proximity to each store\n",
"\n",
"-- Algorithm:\n",
"-- Get the data from the customer_geo_location table for the current date\n",
"-- Gather our stores so we can compare each customer to the distance to each\n",
"-- Calculate the distance the customer is from the store\n",
"-- Calculate their prior distance using the prior lat/long (in the future we could use the LAG function)\n",
"-- Calculate their current distance using their current lat/long\n",
"-- Determine if the prior distance was greater 1 km and if their current distance is less than (or equal) to 1 km. If true, then they entered the geo-boundry.\n",
"-- Select the final results\n",
"-- Select when cross geo-bountry is true\n",
"-- Select the current distance from each store (we will get 4 records for each customer and 1 for when the cross the boundry)\n",
"\n",
"WITH raw_data AS (\n",
" SELECT *,\n",
" ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY event_timestamp_millis DESC) AS ranking\n",
" FROM `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location`\n",
" WHERE CAST(TIMESTAMP_MILLIS(event_timestamp_millis) AS DATE) = CURRENT_DATE()\n",
")\n",
", store_data AS (\n",
" SELECT * FROM `chocolate_ai.store`\n",
")\n",
", geo_data AS (\n",
" SELECT *,\n",
" -- Using the prior lat/long see if they were outside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(store_data.store_longitude, store_data.store_latitude)\n",
" ) AS prior_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(prior_longitude, prior_latitude),\n",
" ST_GEOGPOINT(store_data.store_longitude, store_data.store_latitude)\n",
" ) / 1000 AS prior_distance_to_store_kilometers,\n",
"\n",
" -- Using the current lat/long see if they are inside the geofence\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(store_data.store_longitude, store_data.store_latitude)\n",
" ) AS current_distance_to_store_meters,\n",
"\n",
" ST_DISTANCE(ST_GEOGPOINT(current_longitude, current_latitude),\n",
" ST_GEOGPOINT(store_data.store_longitude, store_data.store_latitude)\n",
" ) / 1000 AS current_distance_to_store_kilometers\n",
"\n",
" FROM raw_data\n",
" CROSS JOIN store_data\n",
")\n",
", results AS (\n",
"-- This tests to see if they were outside the circle and then inside\n",
"-- We only want to send them one notification when they break the geofence\n",
"-- This uses the prior lat/long and compares with the current lat/long\n",
"-- Ideally we would say \"NOT EXISTS\" in our customer_geo_location_results table\n",
" SELECT *,\n",
" CASE WHEN prior_distance_to_store_meters > 1000 AND current_distance_to_store_meters <= 1000\n",
" THEN TRUE\n",
" ELSE FALSE\n",
" END AS entered_geofence\n",
" FROM geo_data\n",
")\n",
"SELECT customer_geo_location_id,\n",
" entered_geofence,\n",
" customer_id,\n",
" current_latitude,\n",
" current_longitude,\n",
" prior_distance_to_store_kilometers,\n",
" current_distance_to_store_kilometers,\n",
" store_id,\n",
" store_name,\n",
" debug_map_url,\n",
" TO_JSON(STRUCT(CAST(TIMESTAMP_MILLIS(event_timestamp_millis) AS STRING) AS event_timestamp)) AS _ATTRIBUTES\n",
" FROM results\n",
" WHERE (ranking = 1 OR entered_geofence = TRUE)\n",
" ORDER BY customer_id, store_id;"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "zNszyLSY943q"
},
"outputs": [],
"source": [
"%%bigquery\n",
"\n",
"-- See the processed records by the continuous query (we insert into a BigQuery table: customer_geo_location_results)\n",
"SELECT customer_geo_location_results.*,\n",
" FROM `${project_id}.${bigquery_chocolate_ai_dataset}.customer_geo_location_results` AS customer_geo_location_results\n",
" ORDER BY geo_boundry_entry_timestamp DESC;\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "v66lmg-H-BxI"
},
"outputs": [],
"source": [
"# See the records in Pub/Sub\n",
"# Create a new subscription to see the records\n",
"print(f\"https://console.cloud.google.com/cloudpubsub/subscription/list?project={project_id}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "UmrudJXifsbq"
},
"outputs": [],
"source": [
"%%bigquery\n",
"\n",
"-- Want to know how many slots the continuous query is using?\n",
"\n",
"-- Get the number or slots the continuous query is using (good for estimating your workload)\n",
"-- Review the field: job_avg_slots which will be about 1 to 3\n",
"\n",
"-- USER ACTION TO DO: Change the below job_id\n",
"-- 1. Go to your Continuous query window\n",
"-- 2. Click on \"Job Information\" tab in the bottom panel\n",
"-- 3. Copy the last part of the \"Job ID\"\n",
"SELECT job.creation_time,\n",
" job.project_id,\n",
" job.project_number,\n",
" job.user_email,\n",
" job.job_id,\n",
" job.job_type,\n",
" job.statement_type,\n",
" job.priority,\n",
" job.start_time,\n",
" job.end_time,\n",
" job.query,\n",
" job.state,\n",
" job.reservation_id,\n",
" job.total_bytes_processed,\n",
" job.total_slot_ms,\n",
" job.error_result.reason AS error_result_reason,\n",
" job.error_result.location AS error_result_location,\n",
" job.error_result.debug_info AS error_result_debug_info,\n",
" job.error_result.message AS error_result_message,\n",
"\n",
" -- Average slot utilization per job is calculated by dividing\n",
" -- total_slot_ms by the millisecond duration of the job\n",
" CAST(SAFE_DIVIDE(job.total_slot_ms,(TIMESTAMP_DIFF(IFNULL(job.end_time,CURRENT_TIMESTAMP()), job.start_time, MILLISECOND))) AS FLOAT64) AS job_avg_slots\n",
"\n",
" FROM `${project_id}`.`region-us`.INFORMATION_SCHEMA.JOBS AS job\n",
" CROSS JOIN UNNEST(job.job_stages) as unnest_job_stages\n",
" CROSS JOIN UNNEST(job.timeline) AS unnest_timeline\n",
" WHERE job.job_id = 'bquxjob_544ac149_191ec5b33ed'\n",
"GROUP BY ALL;"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "iOdfoK4abe5G"
},
"source": [
"### <font color='#4285f4'>Kafka (Open Source and Confluent) Consumers</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "qL9rJHMx_lGj"
},
"source": [
"This sample code on how to consume the messaging using the open source and Confluent Python libraries. It shows you auth and how to read the messages."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "39tMb3BRbDB9"
},
"source": [
"#### <font color='#4285f4'>Open Source Kafka Consumer</font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "vfrUFAPSbJZt"
},
"outputs": [],
"source": [
"def openSourceKafkaConsumer():\n",
" from kafka import KafkaConsumer\n",
"\n",
" # Kafka Consumer configuration with SASL_PLAIN authentication\n",
" # This requires a service principal key (json file) which must be base64 encoded\n",
" # !gsutil cp gs://bucket-name/your-key.json sa.key.json <- This assumes you exported the key to a bucket\n",
" # secret = !(cat sa.key.json | base64 -w 0)\n",
" # secret = secret[0]\n",
" #config = {\n",
" # 'bootstrap_servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',\n",
" # 'security_protocol': 'SASL_SSL', # Use SASL_PLAINTEXT for username/password\n",
" # 'sasl_mechanism': 'PLAIN',\n",
" # 'sasl_plain_username': f'kafka-sp@{project_id}.iam.gserviceaccount.com',\n",
" # 'sasl_plain_password': secret,\n",
" # 'group_id': 'kafka-group-id',\n",
" # 'auto_offset_reset': 'earliest'\n",
" #}\n",
"\n",
" # Kafka Consumer configuration with OAUTHBEARER authentication\n",
" config = {\n",
" 'bootstrap_servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',\n",
" 'security_protocol': 'SASL_SSL',\n",
" 'sasl_mechanism': 'OAUTHBEARER',\n",
" 'sasl_oauth_token_provider': TokenProvider(),\n",
" 'group_id': 'kafka-group-id',\n",
" 'auto_offset_reset': 'earliest'\n",
" }\n",
"\n",
" # Create Consumer instance\n",
" consumer = KafkaConsumer(**config) # Use keyword unpacking for clear configuration\n",
"\n",
" # Subscribe to topic\n",
" consumer.subscribe([kafka_topic_name])\n",
"\n",
" i = 0\n",
" max_items = 50\n",
"\n",
" # Poll for new messages from Kafka and print them.\n",
" try:\n",
" while True:\n",
" messages = consumer.poll(1.0)\n",
" for partition, messages in messages.items():\n",
" for message in messages:\n",
" i += 1\n",
" if i >= max_items:\n",
" print(f\"Reached max items ({max_items})\")\n",
" break\n",
" try:\n",
" print(f\"Consumed record with key {message.key} and value {message.value}\")\n",
" # Process the message here (e.g., parse JSON, store data)\n",
" message_data = json.loads(message.value)\n",
" print(message_data)\n",
" except Exception as e:\n",
" print(f\"Error processing message: {e}\")\n",
" if i >= max_items:\n",
" break\n",
" if i >= max_items:\n",
" break\n",
"\n",
" except KeyboardInterrupt:\n",
" pass\n",
" finally:\n",
" # Leave group and commit final offsets\n",
" consumer.close()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "cnq1zz97cyg3"
},
"outputs": [],
"source": [
"openSourceKafkaConsumer()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "aRxIHtJNZLoZ"
},
"source": [
"#### <font color='#4285f4'>Confluent Source Kafka Consumer</font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "oWrn9XYWZNwK"
},
"outputs": [],
"source": [
"def confluentKafkaConsumer():\n",
" from confluent_kafka import Consumer\n",
" import functools\n",
"\n",
" # Kafka Consumer configuration with SASL_PLAIN authentication\n",
" # This requires a service principal key (json file) which must be base64 encoded\n",
" # !gsutil cp gs://bucket-name/your-key.json sa.key.json <- This assumes you exported the key to a bucket\n",
" # secret = !(cat sa.key.json | base64 -w 0)\n",
" # secret = secret[0]\n",
" #config = {\n",
" # # User-specific properties that you must set\n",
" # 'bootstrap.servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',\n",
" # 'sasl.username': f'kafka-sp@{project_id}.iam.gserviceaccount.com',\n",
" # 'sasl.password': secret,#\n",
" # 'security.protocol': 'SASL_SSL',\n",
" # 'sasl.mechanisms': 'PLAIN',\n",
" # 'group.id': 'kafka-group-id',\n",
" # 'auto.offset.reset': 'earliest'\n",
" #}\n",
"\n",
" # Kafka Consumer configuration with OAUTHBEARER authentication\n",
" config = {\n",
" 'bootstrap.servers': f'bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog',\n",
" 'security.protocol': 'SASL_SSL',\n",
" 'sasl.mechanisms': 'OAUTHBEARER',\n",
" 'oauth_cb': functools.partial(ConfluentTokenProvider, None),\n",
" 'error_cb' : functools.partial(ConfluentErrorProvider),\n",
" 'group.id': 'kafka-group-id',\n",
" 'auto.offset.reset': 'earliest'\n",
" }\n",
"\n",
" # Create Consumer instance\n",
" consumer = Consumer(config)\n",
"\n",
" # Subscribe to topic\n",
" consumer.subscribe([kafka_topic_name])\n",
"\n",
" i = 0\n",
" max_items = 50\n",
"\n",
" # Poll for new messages from Kafka and print them.\n",
" try:\n",
" while True:\n",
" msg = consumer.poll(1.0)\n",
" if msg is None:\n",
" # Initial message consumption may take up to\n",
" # `session.timeout.ms` for the consumer group to\n",
" # rebalance and start consuming\n",
" print(\"Waiting...\")\n",
" elif msg.error():\n",
" print(\"ERROR: %s\".format(msg.error()))\n",
" else:\n",
" # Extract the (optional) key and value, and print.\n",
" i += 1\n",
" print(msg.value().decode('utf-8'))\n",
" if i >= max_items:\n",
" print(f\"Reached max items ({max_items})\")\n",
" break\n",
" except KeyboardInterrupt:\n",
" pass\n",
" finally:\n",
" # Leave group and commit final offsets\n",
" consumer.close()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "jYlnqb3hZQRX"
},
"outputs": [],
"source": [
"confluentKafkaConsumer()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "42IxhtRRrvR-"
},
"source": [
"### <font color='#4285f4'>Clean Up</font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "6lF2Z7skFbvf"
},
"outputs": [],
"source": [
"# Note if you do not know your job id, or overwrote the value, click here to open and manually Cancel the job\n",
"# https://console.cloud.google.com/dataflow/jobs\n",
"\n",
"user_input = input(f\"Do you want to delete your DataFlow Job {jobName} (Y/n)?\")\n",
"if user_input == \"Y\":\n",
" stopDataflowJobApacheKafkaToBigQuery(jobName)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "Qj52V67PrxiX"
},
"outputs": [],
"source": [
"user_input = input(\"Do you want to delete your Apache Kafka for BigQuery (Y/n)?\")\n",
"if user_input == \"Y\":\n",
" deleteApacheKafkaForBigQueryCluster()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "nejUFP7lBOp0"
},
"outputs": [],
"source": [
"user_input = input(\"Do you want to delete your BigQuery Reservations. This will STOP billing! (Y/n)?\")\n",
"if user_input == \"Y\":\n",
" sql = f\"DROP ASSIGNMENT `{project_id}.region-{bigquery_location}.continuous-query-reservation.continuous-query-reservation-assignment`;\"\n",
" RunQuery(sql)\n",
" sql = f\"DROP RESERVATION `{project_id}.region-{bigquery_location}.continuous-query-reservation`;\"\n",
" RunQuery(sql)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ASQ2BPisXDA0"
},
"source": [
"### <font color='#4285f4'>Reference Links</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "4uuW4Aw27Xnt"
},
"source": [
"- [Python Kafka Libary](https://kafka-python.readthedocs.io/en/master/index.html)\n",
"- [Confluent Python Library](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html)\n",
"- [Google Apache Kafka for BigQuery - Authentication](https://cloud.google.com/managed-kafka/docs/authentication-kafka#oauthbearer)\n",
"- [Google Apache Kafka for BigQuery - OAuth Sample Code](https://cloud.google.com/managed-kafka/docs/authentication-kafka#oauthbearer)\n",
"* [Google Apache Kafka for BigQuery - Sample Code](https://github.com/googleapis/managedkafka)\n",
"- [Confluent - Sample OAuth Code](https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/oauth_producer.py)\n",
"* [Confluent - Kafka Config for Python](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-configuration)"
]
}
],
"metadata": {
"colab": {
"collapsed_sections": [
"HMsUvoF4BP7Y",
"m65vp54BUFRi",
"UmyL-Rg4Dr_f",
"sZ6m_wGrK0YG",
"JbOjdSP1kN9T",
"lkj0u39akoZu",
"fo38SP8YuFdb",
"WUwyHLMhuYws",
"YCX1j-VKuecW",
"i9fQa--IBhKb",
"A84k91wbujTD",
"GNs0zbGokXNQ",
"XiketZsXkdRe",
"TRt5FC6XDtiw",
"6TUUHdtlFxnQ",
"k9p6kRH4F6vv",
"IrARsGGPpbsc",
"lacWbLDHGCL4",
"ftO94aBR-t1O",
"v_x4X8RSNMJt",
"TP8XKiCRfa65",
"ITnZPgO_b9m2",
"etIgADeob6FF",
"nm_hoP6Xb_AT",
"8dhBImkEb0q9",
"pYQO1IKpXwyY",
"M5QKc03pC4LP",
"QassjFmybtao",
"7gtEd0M7rkfc",
"s1NflkRlaebN",
"V5LcAR7xZkK8",
"6p9BOzi6hFFg",
"AgTNR60_A4Aj",
"iOdfoK4abe5G",
"39tMb3BRbDB9",
"aRxIHtJNZLoZ",
"42IxhtRRrvR-",
"ASQ2BPisXDA0"
],
"name": "Campaign-Performance-Geofencing-Simulation",
"private_outputs": true,
"provenance": []
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}