colab-enterprise/BigLake-Demo.ipynb

{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "cxGjNPO6fdi9" }, "source": [ "# <img src=\"https://lh3.googleusercontent.com/mUTbNK32c_DTSNrhqETT5aQJYFKok2HB1G2nk2MZHvG5bSs0v_lmDm_ArW7rgd6SDGHXo0Ak2uFFU96X6Xd0GQ=w160-h128\" width=\"45\" valign=\"top\" alt=\"BigQuery\"> BigLake Demo" ] }, { "cell_type": "markdown", "metadata": { "id": "dRw4v7zx1AtJ" }, "source": [ "## <font color='blue'>License</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "QKD7uKJb1EVD" }, "source": [ "```\n", "# Copyright 2024 Google LLC\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License.\n", "```\n", "\n", "Author: Adam Paternostro" ] }, { "cell_type": "markdown", "metadata": { "id": "LfKtshOJvrxH" }, "source": [ "## <font color='blue'>Overview -</font> Readme\n" ] }, { "cell_type": "markdown", "metadata": { "id": "ldOKbXOZiOhD" }, "source": [ "\n", "\n", "* Self Link: https://github.com/GoogleCloudPlatform/data-analytics-golden-demo/blob/main/colab-enterprise/biglake/BigLake-Demo.ipynb\n", "* This notebook will:\n", " - Create a storage account name: biglake-{your project id}\n", " - Create external BigQuery connections for BigLake, Vertex AI\n", " - Create BigLake Managed Tables (BLMT)\n", " - Loads the tables\n", " - Show streaming ingestion via BigQuery Subscription\n", " - Iceberg Metadata export\n", " - Create BigLake Self-Managed (external Tables) in lots of formats\n", " - Shows Row, Column and Data Masking on BigLake tables\n", " - Create a BigLake Metastore using a serverless Spark Stored procedure. 
 " - BigLake Materialized Views\n", " - BigLake Object Tables\n", " - Image Table\n", " - Vertex AI image processing\n", " - Gemini Pro to generate natural language\n", " - Vector Embeddings\n", " - Semantic Search\n", "\n" ] }, { "cell_type": "markdown", "metadata": { "id": "esyact5NoYA9" }, "source": [ "## <font color='gray'>Set Notebook Parameters</font>\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "hQ8mO8H1hFxX" }, "outputs": [], "source": [ "# Set these (run this cell to verify the output)\n", "bigquery_location = \"us\" # or \"eu\"\n", "\n", "# Get some values using gcloud\n", "project_id = !(gcloud config get-value project)\n", "user = !(gcloud auth list --filter=status:ACTIVE --format=\"value(account)\")\n", "\n", "if len(project_id) != 1:\n", " raise RuntimeError(f\"project_id is not set: {project_id}\")\n", "project_id = project_id[0]\n", "\n", "if len(user) != 1:\n", " raise RuntimeError(f\"user is not set: {user}\")\n", "user = user[0]\n", "\n", "print(f\"project_id = {project_id}\")\n", "print(f\"user = {user}\")\n", "\n", "# Derived parameters\n", "biglake_bucket_name = \"biglake-\" + project_id\n", "biglake_connection_name = \"biglake-notebook-connection\"\n", "spark_connection_name = \"spark-notebook-connection\"\n", "taxonomy_name = project_id.lower()\n", "vertex_ai_connection_name = \"vertex-ai-notebook-connection\"\n", "\n", "params = { \"project_id\" : project_id,\n", " \"bigquery_location\" : bigquery_location,\n", " \"biglake_connection_name\": biglake_connection_name,\n", " \"biglake_bucket_name\" : biglake_bucket_name,\n", " \"user\" : user,\n", " \"taxonomy_name\" : taxonomy_name,\n", " \"spark_connection_name\" : spark_connection_name,\n", " \"vertex_ai_connection_name\" : vertex_ai_connection_name\n", " }" ] }, { "cell_type": "markdown", "metadata": { "id": "w_Mty4lcHqWY" }, "source": [ "## <font color='gray'>Helper Methods</font>\n", "Creates the BigLake connection and GCS bucket, and sets IAM permissions" ] }, { "cell_type": "markdown", "metadata": { "id": "krmZ78BCsHoK" }, "source": [ "#### restAPIHelper\n", "Calls the Google Cloud REST API using the current user's credentials."
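, "\n", "For example, once the helper below is defined, a quick smoke test looks like this (a minimal sketch; it uses the public BigQuery datasets REST endpoint and assumes `project_id` was set by the parameters cell above):\n", "\n",
 "```python\n",
 "# List the BigQuery datasets in the project via the REST helper\n",
 "url = f\"https://bigquery.googleapis.com/bigquery/v2/projects/{project_id}/datasets\"\n",
 "json_result = restAPIHelper(url, \"GET\", None)  # GET calls pass no request body\n",
 "for dataset in json_result.get(\"datasets\", []):\n",
 "    print(dataset[\"id\"])\n",
 "```"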
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "NMrWe4NbhdyJ" }, "outputs": [], "source": [ "def restAPIHelper(url: str, http_verb: str, request_body: str) -> str:\n", " \"\"\"Calls the Google Cloud REST API passing in the current users credentials\"\"\"\n", "\n", " import requests\n", " import google.auth\n", " import json\n", "\n", " # Get an access token based upon the current user\n", " creds, project = google.auth.default()\n", " auth_req = google.auth.transport.requests.Request()\n", " creds.refresh(auth_req)\n", " access_token=creds.token\n", "\n", " headers = {\n", " \"Content-Type\" : \"application/json\",\n", " \"Authorization\" : \"Bearer \" + access_token\n", " }\n", "\n", " if http_verb == \"GET\":\n", " response = requests.get(url, headers=headers)\n", " elif http_verb == \"POST\":\n", " response = requests.post(url, json=request_body, headers=headers)\n", " elif http_verb == \"PUT\":\n", " response = requests.put(url, json=request_body, headers=headers)\n", " elif http_verb == \"PATCH\":\n", " response = requests.patch(url, json=request_body, headers=headers)\n", " elif http_verb == \"DELETE\":\n", " response = requests.delete(url, headers=headers)\n", " else:\n", " raise RuntimeError(f\"Unknown HTTP verb: {http_verb}\")\n", "\n", " if response.status_code == 200:\n", " return json.loads(response.content)\n", " #image_data = json.loads(response.content)[\"predictions\"][0][\"bytesBase64Encoded\"]\n", " else:\n", " error = f\"Error restAPIHelper -> ' Status: '{response.status_code}' Text: '{response.text}'\"\n", " raise RuntimeError(error)" ] }, { "cell_type": "markdown", "metadata": { "id": "N0LGUyIvsTdc" }, "source": [ "#### createBigLakeConnection\n", "Creates the BigQuery external connection and returns the generated service principal. The service principal then needs to be granted IAM access to resourses it requires." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "PtFtN9z5fdDU" }, "outputs": [], "source": [ "def createBigLakeConnection(params):\n", " \"\"\"Creates a BigLake connection.\"\"\"\n", "\n", " # First find the connection\n", " # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/list\n", " project_id = params[\"project_id\"]\n", " bigquery_location = params[\"bigquery_location\"]\n", " biglake_connection_name = params[\"biglake_connection_name\"]\n", " url = f\"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/connections\"\n", "\n", " # Gather existing connections\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"createBigLakeConnection (GET) json_result: {json_result}\")\n", "\n", " # Test to see if connection exists, if so return\n", " if \"connections\" in json_result:\n", " for item in json_result[\"connections\"]:\n", " print(f\"BigLake Connection: {item['name']}\")\n", " # \"projects/756740881369/locations/us/connections/biglake-notebook-connection\"\n", " # NOTE: We cannot test the complete name since it contains the project number and not id\n", " if item[\"name\"].endswith(f\"/locations/{bigquery_location}/connections/{biglake_connection_name}\"):\n", " print(\"Connection already exists\")\n", " serviceAccountId = item[\"cloudResource\"][\"serviceAccountId\"]\n", " return serviceAccountId\n", "\n", " # Create the connection\n", " # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/create\n", " print(\"Creating BigLake Connection\")\n", "\n", " url = f\"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/connections?connectionId={biglake_connection_name}\"\n", "\n", " request_body = {\n", " \"friendlyName\": biglake_connection_name,\n", " \"description\": \"BigLake Colab Notebooks Connection for Data Analytics Golden Demo\",\n", " \"cloudResource\": {}\n", " }\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", "\n", " serviceAccountId = json_result[\"cloudResource\"][\"serviceAccountId\"]\n", " print(\"BigLake Connection created: \", serviceAccountId)\n", " return serviceAccountId\n" ] }, { "cell_type": "markdown", "metadata": { "id": "1RVMxCRvsWp2" }, "source": [ "#### createGoogleCloudStorageBucket\n", "Create the Google Cloud Storage bucket that will be used for holding the BigLake files (avro, csv, delta, hudi, json, parquet)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "OyX5xR-GthM-" }, "outputs": [], "source": [ "def createGoogleCloudStorageBucket(params):\n", " \"\"\"Creates a Google Cloud Storage bucket.\"\"\"\n", "\n", " # First find the bucket\n", " # https://cloud.google.com/storage/docs/json_api/v1/buckets/list\n", " project_id = params[\"project_id\"]\n", " biglake_bucket_name = params[\"biglake_bucket_name\"]\n", " url = f\"https://storage.googleapis.com/storage/v1/b?project={project_id}\"\n", "\n", " # Gather existing buckets\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"createGoogleCloudStorageBucket (GET) json_result: {json_result}\")\n", "\n", " # Test to see if connection exists, if so return\n", " if \"items\" in json_result:\n", " for item in json_result[\"items\"]:\n", " print(f\"Bucket Id / Name: ({item['id']} / {item['name']}\")\n", " if item[\"id\"] == biglake_bucket_name:\n", " print(\"Bucket already exists\")\n", " return\n", "\n", " # 
Create the bucket\n", " # https://cloud.google.com/storage/docs/json_api/v1/buckets/insert\n", " print(\"Creating Google Cloud Bucket\")\n", "\n", " url = f\"https://storage.googleapis.com/storage/v1/b?project={project_id}&predefinedAcl=private&predefinedDefaultObjectAcl=private&projection=noAcl\"\n", "\n", " request_body = {\n", " \"name\": biglake_bucket_name\n", " }\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", " print()\n", " print(f\"json_result: {json_result}\")\n", " print()\n", " print(\"BigLake Bucket created: \", biglake_bucket_name)" ] }, { "cell_type": "markdown", "metadata": { "id": "Mrz1_eXFsa6t" }, "source": [ "#### setBucketIamPolicy\n", "Added the BigLake External Connection Service Principal to the IAM permission of the GCS bucket." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Ffuf02I28KWC" }, "outputs": [], "source": [ "def setBucketIamPolicy(params, accountWithPrefix, role):\n", " \"\"\"Sets the bucket IAM policy.\"\"\"\n", "\n", " biglake_bucket_name = params[\"biglake_bucket_name\"]\n", "\n", " # Get the current bindings (if the account has access then skip)\n", " # https://cloud.google.com/storage/docs/json_api/v1/buckets/getIamPolicy\n", "\n", " url = f\"https://storage.googleapis.com/storage/v1/b/{biglake_bucket_name}/iam\"\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"setBucketIamPolicy (GET) json_result: {json_result}\")\n", "\n", " # Test to see if permissions exist\n", " if \"bindings\" in json_result:\n", " for item in json_result[\"bindings\"]:\n", " members = item[\"members\"]\n", " for member in members:\n", " if member == accountWithPrefix:\n", " existing_role = item[\"role\"]\n", " if existing_role == role:\n", " print(f\"Permissions exist: {existing_role}\")\n", " return\n", "\n", " # Take the existing bindings and we need to append the new permission\n", " # Otherwise we loose the existing permissions\n", "\n", " bindings = json_result[\"bindings\"]\n", " new_permission = {\n", " \"role\": role,\n", " \"members\": [ accountWithPrefix ]\n", " }\n", "\n", " bindings.append(new_permission)\n", "\n", " # https://cloud.google.com/storage/docs/json_api/v1/buckets/setIamPolicy\n", " url = f\"https://storage.googleapis.com/storage/v1/b/{biglake_bucket_name}/iam\"\n", "\n", " request_body = { \"bindings\" : bindings }\n", "\n", " print(f\"Permission bindings: {bindings}\")\n", "\n", "\n", " json_result = restAPIHelper(url, \"PUT\", request_body)\n", " print()\n", " print(f\"json_result: {json_result}\")\n", " print()\n", " print(f\"Bucket IAM Permissions set for {accountWithPrefix} {role}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "TzC5lSWHsfJe" }, "source": [ "#### bucketFileExists\n", "Method that tests for a file in the bucket and if the file does not exist then a copy from the public storage account is performed." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "3cMv5zJSLE4R" }, "outputs": [], "source": [ "def bucketFileExists(params):\n", " \"\"\"Test to see if data has been copied\"\"\"\n", " import urllib.parse\n", "\n", " # First find the connection\n", " # https://cloud.google.com/storage/docs/json_api/v1/objects/get\n", " project_id = params[\"project_id\"]\n", " biglake_bucket_name = params[\"biglake_bucket_name\"]\n", " file_to_test = \"biglake-tables/driver_parquet/driver.snappy.parquet\"\n", " file_to_test_encoded = urllib.parse.quote(file_to_test, safe='')\n", " url = f\"https://storage.googleapis.com/storage/v1/b/{biglake_bucket_name}/o/{file_to_test_encoded}\"\n", "\n", " # Gather existing connections\n", " try:\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"bucketFileExists (GET) json_result: {json_result}\")\n", " return True\n", " except:\n", " return False" ] }, { "cell_type": "markdown", "metadata": { "id": "ETek1S3-hjxq" }, "source": [ "#### updateHudiManifest\n", "We need to replace the REPLACE_ME path in the Hudi Manifest file. Open the copied manifest, replace each line and then save back to storage." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "CxknvRWAen3d" }, "outputs": [], "source": [ "def updateHudiManifest(params):\n", " import io\n", " import os\n", "\n", " from google.cloud import storage\n", "\n", " # Create a storage client\n", " storage_client = storage.Client()\n", "\n", " # Get a reference to the bucket\n", " bucket = storage_client.bucket(biglake_bucket_name)\n", " blob_name = \"biglake-tables/location_hudi/.hoodie/absolute-path-manifest/latest-snapshot.csv\"\n", "\n", " blob = bucket.blob(blob_name)\n", " blob.download_to_filename(\"latest-snapshot.csv\")\n", "\n", " with open('latest-snapshot.csv', 'r') as f:\n", " manifest_lines = f.readlines()\n", "\n", " new_lines = []\n", " for line in manifest_lines:\n", " new_lines.append(line.replace(\"REPLACE-ME\",biglake_bucket_name))\n", "\n", " with open('latest-snapshot.csv', 'w') as f:\n", " f.writelines(new_lines)\n", "\n", " # Upload the file from the local filesystem\n", " content_type = \"text/csv\"\n", " blob.upload_from_filename('latest-snapshot.csv', content_type = content_type)\n" ] }, { "cell_type": "markdown", "metadata": { "id": "yfW_BQWx7MDJ" }, "source": [ "#### createServiceAccount\n", "Creates a service account if it does not exist" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "KNCs6Zcc7Mzm" }, "outputs": [], "source": [ "def createServiceAccount(params, serviceAccountName, description, displayName):\n", " \"\"\"Creates a Service Account.\"\"\"\n", "\n", " # First find the service account\n", " # https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/list\n", " project_id = params[\"project_id\"]\n", " url = f\"https://iam.googleapis.com/v1/projects/{project_id}/serviceAccounts\"\n", "\n", " serviceAccountEmail = f\"{serviceAccountName}@{project_id}.iam.gserviceaccount.com\"\n", "\n", " # Gather existing service accounts\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"createServiceAccount (GET) json_result: {json_result}\")\n", "\n", " # Test to see if connection exists, if so return\n", " if \"accounts\" in json_result:\n", " for item in json_result[\"accounts\"]:\n", " print(f\"email: {item['email']}\")\n", " if item[\"email\"] == serviceAccountEmail:\n", " print(\"Service Account already exists\")\n", " return serviceAccountEmail\n", "\n", " # Create the 
service account\n", " # https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/create\n", " print(\"Creating Service Account\")\n", "\n", " url = f\"https://iam.googleapis.com/v1/projects/{project_id}/serviceAccounts\"\n", "\n", " request_body = {\n", " \"accountId\" : serviceAccountName,\n", " \"serviceAccount\":{\n", " \"description\": description,\n", " \"displayName\": displayName\n", " }\n", " }\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", "\n", " email = json_result[\"email\"]\n", " print(\"Service Account created: \", email)\n", " return email" ] }, { "cell_type": "markdown", "metadata": { "id": "HEiOCWFfYnzO" }, "source": [ "#### getProjectNumber\n", "Gets the project number from a project id" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "gbK_-Ni_YvjE" }, "outputs": [], "source": [ "def getProjectNumber(params):\n", " \"\"\"Batch activates service apis\"\"\"\n", "\n", " if \"project_number\" not in params:\n", " # https://cloud.google.com/resource-manager/reference/rest/v1/projects/get?\n", " project_id = params[\"project_id\"]\n", "\n", " url = f\"https://cloudresourcemanager.googleapis.com/v1/projects/{project_id}\"\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"setBucketIamPolicy (GET) json_result: {json_result}\")\n", "\n", " project_number = json_result[\"projectNumber\"]\n", " params[\"project_number\"] = project_number\n", " print(f\"getProjectNumber: {project_number}\")\n", "\n", " else:\n", " project_number = params[\"project_number\"]\n", " print(f\"getProjectNumber: {project_number}\")\n", " return project_number" ] }, { "cell_type": "markdown", "metadata": { "id": "7GMwE_UW6A9w" }, "source": [ "#### activateServiceAPIs\n", "Enables Google Cloud APIs" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "o8YiYxZp6CH5" }, "outputs": [], "source": [ "def activateServiceAPIs(params):\n", " \"\"\"Batch activates service apis\"\"\"\n", "\n", " project_number = params[\"project_number\"]\n", "\n", " request_body = {\n", " \"serviceIds\" : [ \"pubsub.googleapis.com\", \"vision.googleapis.com\", \"biglake.googleapis.com\", \"dataproc.googleapis.com\", \"aiplatform.googleapis.com\"]\n", " }\n", "\n", " url = f\"https://serviceusage.googleapis.com/v1/projects/{project_number}/services:batchEnable\"\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", " print(f\"activateServiceAPIs (POST) json_result: {json_result}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "IJLvvOiMxggM" }, "source": [ "#### downloadGCSFile\n", "Downloads a file from GCS to the notebook compute instance" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "gqdW84VPxgu8" }, "outputs": [], "source": [ "def downloadGCSFile(uri):\n", " from google.cloud import storage\n", "\n", " bucket_name = uri[5:uri.replace(\"gs://\",\"\").index(\"/\")+5]\n", " file_path = uri.replace(\"gs://\" + bucket_name,\"\")[1::]\n", " filename = file_path[len(file_path) - file_path[::-1].index(\"/\"):]\n", "\n", " # print(f\"bucket_name: {bucket_name}\")\n", " # print(f\"file_path: {file_path}\")\n", " # print(f\"filename: {filename}\")\n", "\n", " storage_client = storage.Client()\n", " bucket = storage_client.bucket(bucket_name)\n", " blob = bucket.blob(file_path)\n", " blob.download_to_filename(filename)\n", "\n", " return filename" ] }, { "cell_type": "markdown", "metadata": { "id": "gkY0KBSXJ7eo" }, "source": [ "#### getTableSchema\n", "Retrieve a BigQuery table schema 
as JSON" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "tzyuyff9J9PI" }, "outputs": [], "source": [ "def getTableSchema(project_id, dataset_name, table_name):\n", " import io\n", " import google.cloud.bigquery as bigquery\n", "\n", " client = bigquery.Client()\n", "\n", " dataset_ref = client.dataset(dataset_name, project=project_id)\n", " table_ref = dataset_ref.table(table_name)\n", " table = client.get_table(table_ref)\n", "\n", " f = io.StringIO(\"\")\n", " client.schema_to_json(table.schema, f)\n", " return f.getvalue()" ] }, { "cell_type": "markdown", "metadata": { "id": "kr59LuJVKJlG" }, "source": [ "#### updateTableSchema\n", "Sets the schema for a BigQuery table (CLS , Data Masking)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "AHJfTuxxKIOV" }, "outputs": [], "source": [ "def updateTableSchema(project_id, dataset_name, table_name, new_schema):\n", " import io\n", " import google.cloud.bigquery as bigquery\n", "\n", " client = bigquery.Client()\n", "\n", " dataset_ref = client.dataset(dataset_name, project=project_id)\n", " table_ref = dataset_ref.table(table_name)\n", " table = client.get_table(table_ref)\n", "\n", " table.schema = new_schema\n", " table = client.update_table(table, [\"schema\"])\n", "\n", " print(f\"Table {table_name} schema updated!\")" ] }, { "cell_type": "markdown", "metadata": { "id": "xlDVWL00BlpT" }, "source": [ "#### createPubSubSubscription\n", "Creates a Pub/Sub subscription for BQ Subscription" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "JNZl9xeyBnDq" }, "outputs": [], "source": [ "def createPubSubSubscription(params, name, table_name, topic_name):\n", " \"\"\"Creates a Pub/Sub Subscription for BQ Subscription.\"\"\"\n", "\n", " # First find the pub/sub\n", " # https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/list\n", " project_id = params[\"project_id\"]\n", " url = f\"https://pubsub.googleapis.com/v1/projects/{project_id}/subscriptions\"\n", "\n", " # Gather existing pub/sub\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"createPubSubSubscription (GET) json_result: {json_result}\")\n", "\n", " full_name = f\"projects/{project_id}/subscriptions/{name}\"\n", "\n", " # Test to see if connection exists, if so return\n", " if \"subscriptions\" in json_result:\n", " for item in json_result[\"subscriptions\"]:\n", " print(f\"name: {item['name']}\")\n", " if item[\"name\"] == full_name:\n", " print(\"Pub/Sub already exists\")\n", " return full_name\n", "\n", " # Create the service account\n", " # https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/create\n", " print(\"Creating Pub/Sub Subscription\")\n", "\n", " url = f\"https://pubsub.googleapis.com/v1/projects/{project_id}/subscriptions/{name}\"\n", "\n", " request_body = {\n", " \"bigqueryConfig\": {\n", " \"table\": table_name,\n", " \"writeMetadata\": True\n", " },\n", " \"topic\": topic_name\n", " }\n", "\n", " json_result = restAPIHelper(url, \"PUT\", request_body)\n", "\n", " full_name = json_result[\"name\"]\n", " print(\"Pub/Sub Subscription created: \", full_name)\n", " return full_name" ] }, { "cell_type": "markdown", "metadata": { "id": "v35ytEL5BpIb" }, "source": [ "#### deletePubSubSubscription\n", "Deletes a Pub/Sub subscription" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "rH1j8P1WBq20" }, "outputs": [], "source": [ "def deletePubSubSubscription(params, name):\n", " \"\"\"Deletes a Pub/Sub Subscription 
for BQ Subscription.\"\"\"\n", "\n", " # First find the pub/sub\n", " # https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/list\n", " project_id = params[\"project_id\"]\n", " url = f\"https://pubsub.googleapis.com/v1/projects/{project_id}/subscriptions\"\n", "\n", " # Gather existing pub/sub\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"deletePubSubSubscription (GET) json_result: {json_result}\")\n", "\n", " full_name = f\"projects/{project_id}/subscriptions/{name}\"\n", "\n", " # Test to see if connection exists, if so return\n", " if \"subscriptions\" in json_result:\n", " for item in json_result[\"subscriptions\"]:\n", " print(f\"name: {item['name']}\")\n", " if item[\"name\"] == full_name:\n", " # Delete the Pub/Sub\n", " # https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/delete\n", " print(\"Deleteing Pub/Sub Subscription\")\n", " url = f\"https://pubsub.googleapis.com/v1/projects/{project_id}/subscriptions/{name}\"\n", " json_result = restAPIHelper(url, \"DELETE\", None)\n", " print(\"Pub/Sub Subscription delete: \", full_name)\n", " return\n", "\n", " print(\"Pub/Sub Subscription does not exists\")\n", " return" ] }, { "cell_type": "markdown", "metadata": { "id": "qm9PgMEntbAW" }, "source": [ "#### setBigQueryDatasetPolicy\n", "Sets the IAM Permissions on a BigQuery Dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "huj0GJLztbAg" }, "outputs": [], "source": [ "def setBigQueryDatasetPolicy(params, dataset_id, account, role):\n", " \"\"\"Sets the BigQuery Dataset IAM policy.\"\"\"\n", "\n", " # Get the current bindings (if the account has access then skip)\n", " # https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/get\n", " project_id = params[\"project_id\"]\n", "\n", " url = f\"https://bigquery.googleapis.com/bigquery/v2/projects/{project_id}/datasets/{dataset_id}\"\n", "\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"setBigQueryDatasetPolicy (GET) json_result: {json_result}\")\n", "\n", " # Test to see if permissions exist\n", " if \"access\" in json_result:\n", " for item in json_result[\"access\"]:\n", " if \"userByEmail\" in item:\n", " if item[\"userByEmail\"] == account and item[\"role\"] == role:\n", " print(\"Permissions exist\")\n", " return\n", "\n", "\n", " # Take the existing bindings and we need to append the new permission\n", " # Otherwise we loose the existing permissions\n", " if \"access\" in json_result:\n", " access = json_result[\"access\"]\n", " else:\n", " access = []\n", "\n", " new_permission = {\n", " \"role\": role,\n", " \"userByEmail\": account\n", " }\n", "\n", " access.append(new_permission)\n", "\n", " # https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/patch\n", " url = f\"https://bigquery.googleapis.com/bigquery/v2/projects/{project_id}/datasets/{dataset_id}\"\n", "\n", " request_body = {\n", " \"access\" : access\n", " }\n", "\n", " print(f\"Permission bindings: {access}\")\n", "\n", " json_result = restAPIHelper(url, \"PATCH\", request_body)\n", " print()\n", " print(f\"json_result: {json_result}\")\n", " print()\n", " print(f\"BigQuery Dataset IAM Permissions set for {account} {role}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "_C29InxBshzt" }, "source": [ "#### runQuery\n", "Executes a BigQuery SQL statement and returns the results for SELECT statements or waits for the job to complete for non-query results." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Zltot1OwBSzB" }, "outputs": [], "source": [ "def runQuery(sql):\n", " import time\n", " import google.cloud.bigquery as bigquery\n", "\n", " client = bigquery.Client()\n", "\n", " if (sql.startswith(\"SELECT\") or sql.startswith(\"WITH\")):\n", " df_result = client.query(sql).to_dataframe()\n", " return df_result\n", " else:\n", " job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)\n", " query_job = client.query(sql, job_config=job_config)\n", "\n", " # Check on the progress by getting the job's updated state.\n", " query_job = client.get_job(\n", " query_job.job_id, location=query_job.location\n", " )\n", " print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n", "\n", " while query_job.state != \"DONE\":\n", " if sql.upper().startswith(\"CALL\"):\n", " time.sleep(10)\n", " else:\n", " time.sleep(1)\n", "\n", " query_job = client.get_job(\n", " query_job.job_id, location=query_job.location\n", " )\n", " print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n", "\n", " if query_job.error_result == None:\n", " return True\n", " else:\n", " return False" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### create_custom_storage_viewer_role\n", "Creates a custom Google Cloud IAM role" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def create_custom_storage_viewer_role(project_id, role_id, role_title, role_description, permissions_to_grant):\n", " \"\"\"Creates a custom Google Cloud IAM role\"\"\"\n", " import json\n", " \n", " #project_id = params[\"project_id\"]\n", " #role_id = params.get(\"role_id\", \"customStorageViewer\") # Use provided or default\n", " #role_title = params.get(\"role_title\", \"Custom Storage Bucket Viewer\")\n", " #role_description = params.get(\"role_description\", \"Grants permission to view storage bucket metadata.\")\n", " #permissions_to_grant = params.get(\"permissions\", [\"storage.buckets.get\"])\n", "\n", " full_role_name = f\"projects/{project_id}/roles/{role_id}\"\n", " print(f\"Checking for custom IAM role: {full_role_name}\")\n", "\n", " # 1. Check if the role already exists\n", " # API Doc: https://cloud.google.com/iam/docs/reference/rest/v1/projects.roles/get\n", " get_url = f\"https://iam.googleapis.com/v1/{full_role_name}\"\n", "\n", " try:\n", " # Attempt to get the role\n", " existing_role = restAPIHelper(get_url, \"GET\", None)\n", "\n", " # Check if the helper returned a valid role (not None or an error structure)\n", " # A real implementation would check the HTTP status code from the helper\n", " if existing_role and 'name' in existing_role:\n", " print(f\"Custom IAM role '{role_id}' already exists in project '{project_id}'.\")\n", " print(f\"Existing role details: {existing_role}\")\n", " return existing_role # Return the existing role details\n", " else:\n", " # If restAPIHelper returned None or an error indicator for 'Not Found',\n", " # we proceed to creation. 
If it returned something else unexpectedly,\n", " # that indicates a different problem.\n", " print(f\"Role '{role_id}' not found or GET failed, proceeding to create.\")\n", "\n", " except Exception as e:\n", " # Assuming restAPIHelper raises an exception for HTTP errors like 404\n", " # A real implementation should specifically check for a 404 status if possible.\n", " print(f\"Caught exception during GET (assuming role not found): {e}\")\n", " print(f\"Proceeding to create role '{role_id}'.\")\n", " # Pass through to the creation step\n", "\n", " # 2. Create the role if it doesn't exist\n", " # API Doc: https://cloud.google.com/iam/docs/reference/rest/v1/projects.roles/create\n", " print(f\"Creating custom IAM role '{role_id}' in project '{project_id}'...\")\n", "\n", " create_url = f\"https://iam.googleapis.com/v1/projects/{project_id}/roles\"\n", "\n", " request_body = {\n", " \"roleId\": role_id,\n", " \"role\": {\n", " \"title\": role_title,\n", " \"description\": role_description,\n", " \"includedPermissions\": permissions_to_grant,\n", " \"stage\": \"GA\" # Or \"BETA\", \"ALPHA\", \"DEPRECATED\"\n", " }\n", " }\n", "\n", " try:\n", " json_result = restAPIHelper(create_url, \"POST\", request_body)\n", " print(\"\\nCustom IAM Role Creation API call result:\")\n", " print(json.dumps(json_result, indent=2))\n", "\n", " if json_result and 'name' in json_result:\n", " print(f\"\\nSuccessfully created custom IAM role: {json_result.get('name')}\")\n", " return json_result\n", " else:\n", " print(\"\\nRole creation might have failed. API response did not contain expected 'name' field.\")\n", " # Handle potential errors reported in json_result if your helper provides them\n", " return None\n", "\n", " except Exception as e:\n", " print(f\"\\nError creating custom IAM role: {e}\")\n", " # Handle exceptions raised by restAPIHelper during creation\n", " return None" ] }, { "cell_type": "markdown", "metadata": { "id": "euYNMf6wsmYx" }, "source": [ "#### initialize\n", "Calls the methods to create the external connection, create the GCS bucket, apply IAM permissions and copy the public data. This method can be re-run as required and does not cause duplication issues. Each method tests for the existence of items before creating."
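, "\n", "The idempotency comes from the same check-then-create shape in every helper above; an illustrative sketch only (not a helper defined in this notebook):\n", "\n",
 "```python\n",
 "# Generic pattern: list the resource collection first, create only when absent\n",
 "def ensure_exists(list_existing, create, name):\n",
 "    if name in list_existing():  # e.g. a GET against the collection\n",
 "        print(f\"{name} already exists\")\n",
 "    else:\n",
 "        create(name)             # e.g. a POST that creates the resource\n",
 "```"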
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "zzfuxuZALOub" }, "outputs": [], "source": [ "def initialize(params):\n", " \"\"\"Create the BigLake connection, GCS bucket, set IAM permissions and copies data\"\"\"\n", "\n", " # Create the BigLake connection (if not exists)\n", " bigLakeServiceAccountId = createBigLakeConnection(params)\n", " print(f\"createBigLakeConnection: {bigLakeServiceAccountId}\")\n", " params[\"bigLakeServiceAccountId\"] = bigLakeServiceAccountId\n", "\n", " # Create storage account (if not exists)\n", " createGoogleCloudStorageBucket(params)\n", "\n", " # Grant access to GCS Bucket for BigLake Connection (if not exists)\n", " setBucketIamPolicy(params, f\"serviceAccount:{bigLakeServiceAccountId}\", \"roles/storage.objectAdmin\")\n", " setBucketIamPolicy(params, f\"user:{user}\", \"roles/storage.admin\")\n", "\n", " # Copy the sample data (if not exists)\n", " if bucketFileExists(params) is True:\n", " print(\"Data has already been copied\")\n", " else:\n", " print(\"Data has not been copied, copying now\")\n", "\n", " # Copy the data\n", " # https://cloud.google.com/storage/docs/gsutil/commands/cp\n", "\n", " # See: https://console.cloud.google.com/storage/browser/data-analytics-golden-demo/biglake/v1/biglake-tables\n", " source_path = \"gs://data-analytics-golden-demo/biglake/v1/*\"\n", " dest_path = f\"gs://{params['biglake_bucket_name']}/\"\n", " print(f\"Copying data from {source_path} to {dest_path}\")\n", " print(\"This may take a few minutes...\")\n", " !gsutil -m -q cp -r {source_path} {dest_path}\n", " print(\"Copy is complete\")\n", "\n", " updateHudiManifest(params)\n", " print(\"Hudi manifest updated\")\n", "\n", " getProjectNumber(params)\n", " activateServiceAPIs(params)" ] }, { "cell_type": "markdown", "metadata": { "id": "axvxBIEwoQgC" }, "source": [ "## <font color='gray'>Initialize BigLake Demo</font>\n", "Creates resources and copies data. This is re-runable and does not cause duplication of resources." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "EPijoKP1oDva" }, "outputs": [], "source": [ "initialize(params)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "H242XELPCSDQ" }, "outputs": [], "source": [ "%%bigquery --params $params\n", "\n", "CREATE SCHEMA IF NOT EXISTS biglake_mt_dataset OPTIONS(location = @bigquery_location);\n", "\n", "CREATE SCHEMA IF NOT EXISTS biglake_dataset OPTIONS(location = @bigquery_location);" ] }, { "cell_type": "markdown", "metadata": { "id": "eIIE1CXLxOT0" }, "source": [ "## <font color='blue'>BigLake Overview -</font> BigLake Managed / Self Managed Tables vs BigQuery Tables" ] }, { "cell_type": "markdown", "metadata": { "id": "lwLrZcGgzroP" }, "source": [ "<table>\n", " <tr>\n", " <th rowspan=\"2\">Item</th>\n", " <th colspan=\"3\">BigLake</th>\n", " <th colspan=\"2\">BigQuery</th>\n", " </tr>\n", " <tr>\n", " <th>Managed Table</th>\n", " <th>Self Managed Table</th>\n", " <th>Iceberg Tables via BigLake Metastore</th>\n", " <th>Managed Table (Internal / Native)</th>\n", " <th>External Table</th>\n", " </tr>\n", " <tr>\n", " <td>Storage Format</td>\n", " <td>Iceberg</td>\n", " <td>CSV, Delta, Hudi, Iceberg, Parquet, etc.</td>\n", " <td>Iceberg</td>\n", " <td>Capacitor</td>\n", " <td>CSV,ORC, Parquet, etc.</td>\n", " </tr>\n", " <tr>\n", " <td>Storage Location</td>\n", " <td>Customer GCS</td>\n", " <td>Customer GCS</td>\n", " <td>Customer GCS</td>\n", " <td>Google Internal</td>\n", " <td>Customer GCS</td>\n", " </tr>\n", " <tr>\n", " <td>Read/Write</td>\n", " <td>CRUD</td>\n", " <td>Read only from BQ / Updates via Spark</td>\n", " <td>Read only from BQ / Updates via Spark</td>\n", " <td>CRUD</td>\n", " <td>Read only</td>\n", " </tr>\n", " <tr>\n", " <td>RLS / CLS / Data Masking</td>\n", " <td>Yes</td>\n", " <td>Yes</td>\n", " <td>Yes</td>\n", " <td>Yes</td>\n", " <td>No</td>\n", " </tr>\n", " <tr>\n", " <td>Fully Managed</td>\n", " <td>Yes (recluster, optimize, etc.)</td>\n", " <td>No</td>\n", " <td>No</td>\n", " <td>Yes (recluster, optimize, etc.)</td>\n", " <td>No</td>\n", " </tr>\n", " <tr>\n", " <td>Partitioning</td>\n", " <td>Clustering</td>\n", " <td>Partition</td>\n", " <td>Partition</td>\n", " <td>Partition/Clustering</td>\n", " <td>Partition</td>\n", " </tr>\n", " <tr>\n", " <td>Streaming (native)</td>\n", " <td>Yes</td>\n", " <td>No</td>\n", " <td>No</td>\n", " <td>Yes</td>\n", " <td>No</td>\n", " </tr>\n", " <tr>\n", " <td>Time Travel</td>\n", " <td>Yes</td>\n", " <td>Manual</td>\n", " <td>No</td>\n", " <td>Yes</td>\n", " <td>No</td>\n", " </tr> \n", "\n", "</table>" ] }, { "cell_type": "markdown", "metadata": { "id": "C6d-5dIN4ofU" }, "source": [ "## <font color='blue'>BigLake Managed Tables -</font> Fully managed open source formats (Apache Iceberg)\n", "\n", "BigLake managed tables offer the fully managed experience of BigQuery tables while storing data in customer-owned Cloud Storage buckets using open file formats. BigLake managed tables support DML, streaming, and background storage optimizations such as clustering and adaptive file-sizing. BigLake managed tables are compatible with open-source engines like Spark through Apache Iceberg metadata snapshots." 
] }, { "cell_type": "markdown", "metadata": { "id": "xBgEhz9A4aim" }, "source": [ "#### <font color=\"#4285f4\">BigLake - Create Managed Tables</font>\n", "- Tables are created in a specified storage account\n", "- Tables are clustered" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "vZXbaZiz4dfd" }, "outputs": [], "source": [ "sql = f\"\"\"\n", "\n", "CREATE OR REPLACE TABLE `{project_id}.biglake_mt_dataset.driver`\n", "(\n", " driver_id INT64,\n", " driver_name STRING,\n", " driver_mobile_number STRING,\n", " driver_license_number STRING,\n", " driver_email_address STRING,\n", " driver_dob DATE,\n", " driver_ach_routing_number STRING,\n", " driver_ach_account_number STRING\n", ")\n", "CLUSTER BY driver_id\n", "WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`\n", "OPTIONS (\n", " file_format = 'PARQUET',\n", " table_format = 'ICEBERG',\n", " storage_uri = 'gs://{biglake_bucket_name}/biglake-managed-tables/driver'\n", ");\n", "\"\"\"\n", "\n", "if runQuery(sql) == True:\n", " print()\n", " print(f\"** Created table {project_id}.biglake_mt_dataset.driver **\")\n", " print()\n", "else:\n", " print()\n", " print(\"Table creation failed\")\n", " print()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Ci_7O3ZuG7Dk" }, "outputs": [], "source": [ "sql = f\"\"\"\n", "\n", "CREATE OR REPLACE TABLE `{project_id}.biglake_mt_dataset.location`\n", "(\n", " location_id INT64,\n", " zone STRING,\n", " service_zone STRING,\n", " latitude FLOAT64,\n", " longitude FLOAT64,\n", " borough STRING\n", ")\n", "CLUSTER BY location_id\n", "WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`\n", "OPTIONS (\n", " file_format = 'PARQUET',\n", " table_format = 'ICEBERG',\n", " storage_uri = 'gs://{biglake_bucket_name}/biglake-managed-tables/location'\n", ");\n", "\"\"\"\n", "\n", "if runQuery(sql) == True:\n", " print()\n", " print(f\"** Created table {project_id}.biglake_mt_dataset.location **\")\n", " print()\n", "else:\n", " print()\n", " print(\"Table creation failed\")\n", " print()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "whroTzR-HciB" }, "outputs": [], "source": [ "sql = f\"\"\"\n", "\n", "CREATE OR REPLACE TABLE `{project_id}.biglake_mt_dataset.taxi_trips`\n", "(\n", " trip_id INT64,\n", " driver_id INT64,\n", " pickup_location_id INT64,\n", " pickup_datetime TIMESTAMP,\n", " dropoff_location_id INT64,\n", " dropoff_datetime TIMESTAMP,\n", " passenger_count INT64,\n", " trip_distance FLOAT64,\n", " fare_amount FLOAT64,\n", " surcharge FLOAT64,\n", " mta_tax FLOAT64,\n", " tip_amount FLOAT64,\n", " tolls_amount FLOAT64,\n", " ehail_fee FLOAT64,\n", " improvement_surcharge FLOAT64,\n", " total_amount FLOAT64\n", ")\n", "CLUSTER BY pickup_datetime, pickup_location_id, driver_id\n", "WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`\n", "OPTIONS (\n", " file_format = 'PARQUET',\n", " table_format = 'ICEBERG',\n", " storage_uri = 'gs://{biglake_bucket_name}/biglake-managed-tables/taxi_trips'\n", ");\n", "\"\"\"\n", "\n", "if runQuery(sql) == True:\n", " print()\n", " print(f\"** Created table {project_id}.biglake_mt_dataset.taxi_trips **\")\n", " print()\n", "else:\n", " print()\n", " print(\"Table creation failed\")\n", " print()" ] }, { "cell_type": "markdown", "metadata": { "id": "xI20OyYSIi-W" }, "source": [ "#### <font color=\"#4285f4\">BigLake - Load Managed Tables</font>\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": 
{ "id": "V-GAZu_LIl-D" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "LOAD DATA INTO `biglake_mt_dataset.driver`\n", "FROM FILES (\n", " format = 'parquet',\n", " uris = ['gs://data-analytics-golden-demo/biglake/v1-source/managed-table-source/driver/*.parquet']);" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "RWnxCU4XJryG" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "LOAD DATA INTO `biglake_mt_dataset.location`\n", "FROM FILES (\n", " format = 'parquet',\n", " uris = ['gs://data-analytics-golden-demo/biglake/v1-source/managed-table-source/location/*.parquet']);" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "O-gcKV-6JrLs" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "LOAD DATA INTO `biglake_mt_dataset.taxi_trips`\n", "FROM FILES (\n", " format = 'parquet',\n", " uris = ['gs://data-analytics-golden-demo/biglake/v1-source/managed-table-source/taxi_trips/*.parquet']);" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "OjjMIbnIf0o0" }, "outputs": [], "source": [ "print(f\"View the files on the storage account:\")\n", "print(f\"https://console.cloud.google.com/storage/browser/{biglake_bucket_name}/biglake-managed-tables/taxi_trips\")" ] }, { "cell_type": "markdown", "metadata": { "id": "xoUo1wbGJ-tM" }, "source": [ "#### <font color=\"#4285f4\">BigLake - Query Managed Tables</font>\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "l5BR3eVqKAww" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- See amount of data loaded (we can load 45 million + records quickly into Iceberg)\n", "\n", "SELECT 'driver' AS table_name, FORMAT(\"%'d\",COUNT(*)) AS record_count FROM `biglake_mt_dataset.driver`\n", "UNION ALL\n", "SELECT 'location' AS table_name, FORMAT(\"%'d\",COUNT(*)) AS record_count FROM `biglake_mt_dataset.location`\n", "UNION ALL\n", "SELECT 'taxi_trips' AS table_name, FORMAT(\"%'d\",COUNT(*)) AS record_count FROM `biglake_mt_dataset.taxi_trips`;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "QHrjNimzc0tg" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- Total and Average tip amount per Driver\n", "\n", "SELECT driver.driver_name,\n", " SUM(trips.tip_amount) AS total_tip_amount,\n", " FORMAT(\"%.*f\",2,AVG(trips.tip_amount)) AS avg_tip_amount,\n", " FROM `biglake_mt_dataset.taxi_trips` AS trips\n", " INNER JOIN `biglake_mt_dataset.driver` AS driver\n", " ON trips.driver_id = driver.driver_id\n", " AND driver.driver_id BETWEEN 1 AND 10\n", " GROUP BY ALL\n", " ORDER BY 1\n", " LIMIT 25;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "7nZAumZ5eahf" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- Total and Average tip amount per Zone\n", "\n", "SELECT pickup_location.borough AS pickup_location_borough,\n", " pickup_location.zone AS pickup_location_zone,\n", "\n", " dropoff_location.borough AS dropoff_location_borough,\n", " dropoff_location.zone AS dropoff_location_zone,\n", "\n", " FORMAT(\"%.*f\",2,AVG(trips.passenger_count)) AS avg_passenger_count,\n", " FORMAT(\"%.*f\",2,AVG(trips.fare_amount)) AS avg_fare_amount\n", "\n", " FROM `biglake_mt_dataset.taxi_trips` AS trips\n", " INNER JOIN `biglake_mt_dataset.driver` AS driver\n", " ON trips.driver_id = driver.driver_id\n", " INNER JOIN `biglake_mt_dataset.location` AS pickup_location\n", " ON trips.pickup_location_id = pickup_location.location_id\n", " INNER JOIN `biglake_mt_dataset.location` AS dropoff_location\n", " ON trips.dropoff_location_id 
= dropoff_location.location_id\n", " GROUP BY ALL\n", " ORDER BY 1,2,3,4\n", " LIMIT 25;" ] }, { "cell_type": "markdown", "metadata": { "id": "5rEpRHqQffHz" }, "source": [ "#### <font color=\"#4285f4\">BigLake - CRUD Managed Tables</font>\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "SfzbHcKWgThk" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- Create a new driver with some random data\n", "\n", "INSERT INTO `biglake_mt_dataset.driver`\n", " (driver_id, driver_name, driver_mobile_number, driver_license_number, driver_email_address,\n", " driver_dob, driver_ach_routing_number, driver_ach_account_number)\n", "VALUES (999999, 'BigLake Managed Driver',\n", " CAST(CONCAT(CAST(CAST(ROUND(100 + RAND() * (999 - 100)) AS INT) AS STRING),'-',\n", " CAST(CAST(ROUND(100 + RAND() * (999 - 100)) AS INT) AS STRING),'-',\n", " CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING)) AS STRING),\n", " CAST(CONCAT(CAST(CAST(ROUND(10 + RAND() * (99 - 10)) AS INT) AS STRING),'-',\n", " CAST(CAST(ROUND(100 + RAND() * (999 - 100)) AS INT) AS STRING),'-',\n", " CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-',\n", " CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING)) AS STRING),\n", " CAST(CONCAT(LOWER(REPLACE('BigLake Managed Driver',' ','.')),'@gmail.com') AS STRING),\n", " CAST(DATE_SUB(CURRENT_DATE(), INTERVAL CAST(ROUND(6570 + RAND() * (24820 - 6570)) AS INT) DAY) AS DATE),\n", " CAST(CAST(ROUND(100000000 + RAND() * (999999999 - 100000000)) AS INT) AS STRING),\n", " CAST(CAST(ROUND(100000000 + RAND() * (999999999 - 100000000)) AS INT) AS STRING));" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "cEKAmKP2hVHS" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- Add a new column and populate it with data\n", "\n", "ALTER TABLE `biglake_mt_dataset.driver`\n", " ADD COLUMN IF NOT EXISTS license_plate STRING;\n", "\n", "UPDATE `biglake_mt_dataset.driver`\n", " SET license_plate = CAST(CONCAT(CAST(CAST(ROUND(100 + RAND() * (999 - 100)) AS INT) AS STRING),'-',\n", " CAST(CAST(ROUND(100 + RAND() * (999 - 1000)) AS INT) AS STRING)) AS STRING)\n", " WHERE TRUE;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "X4wRJadThJed" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- See the new driver and the new column\n", "\n", "SELECT driver_id, driver_name, license_plate\n", " FROM `biglake_mt_dataset.driver`\n", " WHERE driver_id > 9990\n", " ORDER BY driver_id DESC;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "vdUx0hM2igCT" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- Delete the new driver\n", "\n", "DELETE FROM `biglake_mt_dataset.driver`\n", " WHERE driver_id = 999999;" ] }, { "cell_type": "markdown", "metadata": { "id": "S2WI114sisVM" }, "source": [ "#### <font color=\"#4285f4\">BigLake - Managed Tables</font> - Streaming Ingestion\n", "BigLake Managed Tables support streaming ingestion of data. In this example Pub/Sub will be used to stream data directly into a managed table." 
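, "\n", "The demo attaches a BigQuery subscription to a public taxi topic, so no publisher code is needed here. For reference, publishing your own messages into such a table would look roughly like this (a sketch; `my-taxi-topic` is a hypothetical topic that would need its own BigQuery subscription):\n", "\n",
 "```python\n",
 "from google.cloud import pubsub_v1\n",
 "import json\n",
 "\n",
 "publisher = pubsub_v1.PublisherClient()\n",
 "topic_path = publisher.topic_path(project_id, \"my-taxi-topic\")  # hypothetical topic\n",
 "payload = json.dumps({\"ride_id\": \"test-123\", \"meter_reading\": 12.5}).encode(\"utf-8\")\n",
 "publisher.publish(topic_path, payload).result()  # lands in the table's data column via the subscription\n",
 "```"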
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "OugKpVW9iwyp" }, "outputs": [], "source": [ "# Create a new table for streaming ingestion\n", "\n", "sql = f\"\"\"\n", "\n", "CREATE TABLE IF NOT EXISTS `{project_id}.biglake_mt_dataset.taxi_trips_streaming`\n", "(\n", " subscription_name STRING,\n", " message_id STRING,\n", " publish_time TIMESTAMP,\n", " data STRING,\n", " attributes STRING\n", ")\n", "CLUSTER BY publish_time\n", "WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`\n", "OPTIONS (\n", " file_format = 'PARQUET',\n", " table_format = 'ICEBERG',\n", " storage_uri = 'gs://{biglake_bucket_name}/biglake-managed-tables/taxi_trips_streaming'\n", ");\n", "\"\"\"\n", "\n", "if runQuery(sql) == True:\n", " print()\n", " print(f\"** Created table {project_id}.biglake_mt_dataset.taxi_trips_streaming **\")\n", " print()\n", "else:\n", " print()\n", " print(\"Table creation failed\")\n", " print()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# We need bucket get for streaming to work\n", "\n", "role_id = \"biglake_bucket_get_role\"\n", "role_title = \"BigLake-Bucket-Get-Role\"\n", "role_description = \"Required for streaming ingestion to Iceberg\"\n", "permissions_to_grant = [\"storage.buckets.get\"] \n", "create_custom_storage_viewer_role(project_id, role_id, role_title, role_description, permissions_to_grant)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Grant our external connection service principal bucket.get role\n", "full_role_name = f\"projects/{project_id}/roles/{role_id}\"\n", "setBucketIamPolicy(params, f\"serviceAccount:{ params['bigLakeServiceAccountId']}\", full_role_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "siu0vrZZ57st" }, "outputs": [], "source": [ "# Grants the Pub/Sub default service account permissions to the BigLake MT Dataset\n", "# This will allow it to stream the data into the table\n", "\n", "project_number = getProjectNumber(params)\n", "print(f\"project_number: {project_number}\")\n", "pubSubServiceAccountEmail = f\"service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com\"\n", "\n", "print(f\"pubSubServiceAccountEmail: {pubSubServiceAccountEmail}\")\n", "setBigQueryDatasetPolicy(params, \"biglake_mt_dataset\", f\"{pubSubServiceAccountEmail}\", \"OWNER\")" ] }, { "cell_type": "markdown", "metadata": { "id": "dqb8wDcDLgqN" }, "source": [ "<font color=\"red\">WARNING: If you create this Pub/Sub subscription, please make sure you DELETE it (2 cells down).</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "_ilZVnX5j6Il" }, "outputs": [], "source": [ "# Create a Pub/Sub subscription that will steam data into the table\n", "\n", "project_id = params[\"project_id\"]\n", "table_name = f\"{project_id}.biglake_mt_dataset.taxi_trips_streaming\"\n", "topic_name = f\"projects/pubsub-public-data/topics/taxirides-realtime\"\n", "\n", "createPubSubSubscription(params, \"biglake-mt-streaming\", table_name, topic_name)\n", "\n", "print()\n", "print(f\"To view Pub/Sub: https://console.cloud.google.com/cloudpubsub/subscription/detail/biglake-mt-streaming\")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "a1RPG3yqX3vY" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "SELECT message_id, publish_time, data\n", " FROM `biglake_mt_dataset.taxi_trips_streaming`\n", " --WHERE publish_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), 
INTERVAL 1 HOUR)\n", " LIMIT 10" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "wwTQ3r6Hj56G" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- Query the streaming data\n", "WITH streaming_data AS (\n", "SELECT message_id, publish_time, data\n", " FROM `biglake_mt_dataset.taxi_trips_streaming`\n", " WHERE publish_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)\n", " LIMIT 10\n", ")\n", ", streaming_json AS (\n", "SELECT message_id, publish_time, PARSE_JSON(data) as trip_json\n", " FROM streaming_data\n", ")\n", "SELECT message_id, publish_time, trip_json.ride_id,\n", " trip_json.latitude, trip_json.longitude,\n", " trip_json.meter_reading,\n", " trip_json.ride_status,\n", " trip_json.passenger_count\n", " FROM streaming_json;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "nX-DEwBUL6_l" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- See the counts increasing (run several times in a row)\n", "SELECT FORMAT(\"%'d\",COUNT(*)) AS record_count\n", " FROM `biglake_mt_dataset.taxi_trips_streaming`\n", " WHERE publish_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR);" ] }, { "cell_type": "markdown", "metadata": { "id": "nYijZi1VLxfV" }, "source": [ "<font color=\"red\">Removes the Pub/Sub Subcriptions - Stops Billing!</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "9Hms9R8SArlM" }, "outputs": [], "source": [ "deletePubSubSubscription(params, \"biglake-mt-streaming\")\n", "\n", "print()\n", "print(f\"Please VERIFY that Pub/Sub has been removed: https://console.cloud.google.com/cloudpubsub/subscription\")" ] }, { "cell_type": "markdown", "metadata": { "id": "0V3_WYyZ2QP-" }, "source": [ "#### <font color=\"#4285f4\">BigLake - Managed Tables</font> - Metadata Export\n", "Manually generate an Iceberg snapshot" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "RxeCitgI3I0y" }, "outputs": [], "source": [ "print(f\"View the metadata BEFORE the EXPORT:\")\n", "print(f\"https://console.cloud.google.com/storage/browser/{biglake_bucket_name}/biglake-managed-tables/driver/metadata\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "iR7qgduU23RH" }, "outputs": [], "source": [ "%%bigquery\n", "EXPORT TABLE METADATA FROM biglake_mt_dataset.driver" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "njNrfIpo3KUZ" }, "outputs": [], "source": [ "print(f\"View the metadata AFTER the EXPORT:\")\n", "print(f\"https://console.cloud.google.com/storage/browser/{biglake_bucket_name}/biglake-managed-tables/driver/metadata\")" ] }, { "cell_type": "markdown", "metadata": { "id": "bLSn2m2LU-A0" }, "source": [ "## <font color='blue'>BigLake Self Managed Tables -</font> Support for open source formats\n", "BigLake support a variety of formats. Here we will show Avro, Csv, Delta.io, Hudi, Json and Parquet. There are additional supported formats as well. Apache Iceberg will be shown in its own area of this notebook." 
] }, { "cell_type": "markdown", "metadata": { "id": "7kj03aZ4lfvQ" }, "source": [ "#### <font color=\"#4285f4\">BigLake - Avro</font>\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "YTiarih9UHIW" }, "outputs": [], "source": [ "sql = f\"\"\"\n", "\n", "CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.payment_type_avro`\n", "WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`\n", "OPTIONS (\n", " format = \"AVRO\",\n", " enable_logical_types = true,\n", " uris = ['gs://{biglake_bucket_name}/biglake-tables/payment_type_table_avro/*.avro']\n", ");\n", "\n", "\"\"\"\n", "\n", "if runQuery(sql) == True:\n", " print()\n", " print(f\"** Created table {project_id}.biglake_dataset.payment_type_avro **\")\n", " print()\n", "else:\n", " print()\n", " print(\"Table creation failed\")\n", " print()" ] }, { "cell_type": "markdown", "metadata": { "id": "Z4YJik5ymS_u" }, "source": [ "#### <font color=\"#4285f4\">BigLake - CSV</font>\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ziohVZBpVo9W" }, "outputs": [], "source": [ "sql = f\"\"\"\n", "\n", "CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.rate_code_csv`\n", "(\n", " Rate_Code_Id\tINTEGER,\n", " Rate_Code_Description\tSTRING\n", ")\n", "WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`\n", "OPTIONS (\n", " format = \"CSV\",\n", " uris = ['gs://{biglake_bucket_name}/biglake-tables/rate_code_table_csv/*.csv']\n", ");\n", "\n", "\"\"\"\n", "\n", "if runQuery(sql) == True:\n", " print()\n", " print(f\"** Created table {project_id}.biglake_dataset.rate_code_csv **\")\n", " print()\n", "else:\n", " print()\n", " print(\"Table creation failed\")\n", " print()" ] }, { "cell_type": "markdown", "metadata": { "id": "mWRp2Wv9nSim" }, "source": [ "#### <font color=\"#4285f4\">BigLake - Delta.io (Delta Lake)</font>\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "M4dQb6PSkAVk" }, "outputs": [], "source": [ "sql = f\"\"\"\n", "\n", "CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.vendor_delta_io`\n", "WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`\n", "OPTIONS (\n", " format = \"DELTA_LAKE\",\n", " uris = ['gs://{biglake_bucket_name}/biglake-tables/vendor_delta_io']\n", ");\n", "\n", "\"\"\"\n", "\n", "if runQuery(sql) == True:\n", " print()\n", " print(f\"** Created table {project_id}.biglake_dataset.vendor_delta_io **\")\n", " print()\n", "else:\n", " print()\n", " print(\"Table creation failed\")\n", " print()" ] }, { "cell_type": "markdown", "metadata": { "id": "h4YF2lOuhz4K" }, "source": [ "#### <font color=\"#4285f4\">BigLake - Hudi</font>\n", "- PySpark source code that created the Hudi table: [GitHub](https://github.com/GoogleCloudPlatform/data-analytics-golden-demo/blob/main/dataproc/pyspark_apache_hudi.py)\n", "- This uses the [Hudi-BigQuery connector](https://github.com/apache/hudi/blob/master/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "xjCw5W6wh3am" }, "outputs": [], "source": [ "sql = f\"\"\"\n", "\n", "CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.location_hudi`\n", "WITH PARTITION COLUMNS\n", "WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`\n", "OPTIONS (\n", " format=\"PARQUET\",\n", " 
uris=[\"gs://{biglake_bucket_name}/biglake-tables/location_hudi/.hoodie/absolute-path-manifest/*\"],\n", " file_set_spec_type = 'NEW_LINE_DELIMITED_MANIFEST',\n", " hive_partition_uri_prefix = \"gs://{biglake_bucket_name}/biglake-tables/location_hudi/\",\n", " max_staleness = INTERVAL 30 MINUTE,\n", " metadata_cache_mode = 'MANUAL'\n", ");\n", "\n", "CALL BQ.REFRESH_EXTERNAL_METADATA_CACHE('biglake_dataset.location_hudi_TEST');\n", "\"\"\"\n", "\n", "if runQuery(sql) == True:\n", " print()\n", " print(f\"** Created table {project_id}.biglake_dataset.location_hudi **\")\n", " print()\n", "else:\n", " print()\n", " print(\"Table creation failed\")\n", " print()" ] }, { "cell_type": "markdown", "metadata": { "id": "AX_kVFZ-mViO" }, "source": [ "#### <font color=\"#4285f4\">BigLake - Json</font>\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "dqXcQqJJW38d" }, "outputs": [], "source": [ "sql = f\"\"\"\n", "\n", "CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.trip_type_json`\n", "(\n", " Trip_Type_Id\tINTEGER,\n", " Trip_Type_Description\tSTRING\n", ")\n", "WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`\n", "OPTIONS (\n", " format = \"JSON\",\n", " uris = ['gs://{biglake_bucket_name}/biglake-tables/trip_type_json/*.json']\n", ");\n", "\n", "\"\"\"\n", "\n", "if runQuery(sql) == True:\n", " print()\n", " print(f\"** Created table {project_id}.biglake_dataset.trip_type_json **\")\n", " print()\n", "else:\n", " print()\n", " print(\"Table creation failed\")\n", " print()" ] }, { "cell_type": "markdown", "metadata": { "id": "TzX7x_HKZEkp" }, "source": [ "#### <font color=\"#4285f4\">BigLake - Parquet w/Hive Partitioning and Metadata Caching</font>\n", "- BigLake supports Hive partitioned files\n", "- BigLake supports Metadata caching to boost performance" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Xtw3gz3XYeAA" }, "outputs": [], "source": [ "sql = f\"\"\"\n", "\n", "CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.taxi_trips_parquet`\n", "WITH PARTITION COLUMNS (\n", " year INTEGER, -- column order must match the external path\n", " month INTEGER\n", ")\n", "WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`\n", "OPTIONS (\n", " format = \"PARQUET\",\n", " hive_partition_uri_prefix = \"gs://{biglake_bucket_name}/biglake-tables/taxi_trips_parquet/\",\n", " uris = ['gs://{biglake_bucket_name}/biglake-tables/taxi_trips_parquet/*.parquet'],\n", " max_staleness=INTERVAL 30 MINUTE,\n", " metadata_cache_mode=\"MANUAL\" -- This can be setup to be 30 minutes or more\n", ");\n", "\n", "\"\"\"\n", "\n", "if runQuery(sql) == True:\n", " print()\n", " print(f\"** Created table {project_id}.biglake_dataset.taxi_trips_parquet **\")\n", " print()\n", "else:\n", " print()\n", " print(\"Table creation failed\")\n", " print()\n", "\n", "# Refresh can only be done for \"manual\" cache mode. 
This is done since this is a demo.\n", "sql = f\"CALL BQ.REFRESH_EXTERNAL_METADATA_CACHE('{project_id}.biglake_dataset.taxi_trips_parquet')\"\n", "\n", "if runQuery(sql) == True:\n", " print()\n", " print(f\"** Refreshed Metadata on {project_id}.biglake_dataset.taxi_trips_parquet **\")\n", " print()\n", "else:\n", " print()\n", " print(\"Metadata refresh failed\")\n", " print()\n" ] }, { "cell_type": "markdown", "metadata": { "id": "cJUEgVzUyXXu" }, "source": [ "## <font color='blue'>BigLake SQL -</font> Query tables in all formats\n" ] }, { "cell_type": "markdown", "metadata": { "id": "AtzY7yT8Qe9Q" }, "source": [ "### <font color=\"#4285f4\">SQL - Query each format</font>\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "i46v3agJycPO" }, "outputs": [], "source": [ "%%bigquery\n", "SELECT *\n", " FROM biglake_dataset.payment_type_avro;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "aNqpGBb1jZRg" }, "outputs": [], "source": [ "%%bigquery\n", "SELECT *\n", " FROM biglake_dataset.location_hudi\n", "LIMIT 25;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Ckaw2-UiyliI" }, "outputs": [], "source": [ "%%bigquery\n", "SELECT *\n", " FROM biglake_dataset.vendor_delta_io;" ] }, { "cell_type": "markdown", "metadata": { "id": "LRvgYFTqy3ZK" }, "source": [ "### <font color=\"#4285f4\">SQL - Join all the different formats</font>\n", "We can join all the data from all the different formats (parquet, hudi, delta, csv, json, avro)." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "CiJ7_3s_y6sd" }, "outputs": [], "source": [ "%%bigquery\n", "SELECT location.borough AS pickup_borough,\n", " location.zone AS pickup_zone,\n", " payment_type.payment_type_description,\n", " rate_code.rate_code_description,\n", " trip_type.trip_type_description,\n", " vendor.vendor_description,\n", " SUM(taxi_trips.fare_amount) AS fare_amount,\n", " SUM(taxi_trips.total_amount) AS total_amount,\n", " COUNT(*) AS number_of_trips\n", " FROM biglake_dataset.taxi_trips_parquet AS taxi_trips\n", " INNER JOIN biglake_dataset.location_hudi AS location\n", " ON taxi_trips.PULocationID = location.location_id\n", " INNER JOIN biglake_dataset.payment_type_avro AS payment_type\n", " ON taxi_trips.payment_type_id = payment_type.payment_type_id\n", " INNER JOIN biglake_dataset.rate_code_csv AS rate_code\n", " ON taxi_trips.rate_code_id = rate_code.rate_code_id\n", " INNER JOIN biglake_dataset.trip_type_json AS trip_type\n", " ON taxi_trips.trip_type = trip_type.trip_type_id\n", " INNER JOIN biglake_dataset.vendor_delta_io AS vendor\n", " ON taxi_trips.vendor_id = vendor.vendor_id\n", "GROUP BY ALL\n", "ORDER BY 1, 2, 3\n", "LIMIT 100;" ] }, { "cell_type": "markdown", "metadata": { "id": "ub2YEuDGsSzm" }, "source": [ "## <font color='blue'>BigLake Security / Governance -</font> Row, Column, Data Masking\n", "BigLake supports IAM, Row Level, Column Level and Data Masking security" ] }, { "cell_type": "markdown", "metadata": { "id": "xGpknP56wHwM" }, "source": [ "### <font color='gray'>Helper Functions - CLS / Data Masking</font>\n", "Calls the REST API to create taxonomy, policies, data policies and set IAM on policies and data policies.\n", "\n" ] }, { "cell_type": "markdown", "metadata": { "id": "fdv7nU51WgKh" }, "source": [ "#### createTaxonomy\n", "Creates the top level Taxonomy" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "wc-XHboHtNWV" }, "outputs": [], "source": [ "def createTaxonomy(params):\n", " 
\"\"\"Creates a Taxonomy.\"\"\"\n", "\n", " # First find the connection\n", " # https://cloud.google.com/data-catalog/docs/reference/rest/v1/projects.locations.taxonomies/list\n", " project_id = params[\"project_id\"]\n", " bigquery_location = params[\"bigquery_location\"]\n", " taxonomy_name = params[\"taxonomy_name\"]\n", " url = f\"https://datacatalog.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/taxonomies\"\n", "\n", "\n", " # Gather existing connections\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"createTaxonomy (GET) json_result: {json_result}\")\n", "\n", " # Test to see if connection exists, if so return\n", " if \"taxonomies\" in json_result:\n", " for item in json_result[\"taxonomies\"]:\n", " print(f\"displayName: {item['displayName']}\")\n", " # \"projects/test/locations/us/taxonomies/2620666826070342226\"\n", " # NOTE: We cannot test the complete name since it contains the an unknown number\n", " if item[\"displayName\"] == taxonomy_name:\n", " print(\"Taxonomy already exists\")\n", " name = item[\"name\"]\n", " return name\n", "\n", " # Create the taxonomy\n", " # https://cloud.google.com/data-catalog/docs/reference/rest/v1/projects.locations.taxonomies/create\n", " print(\"Creating Taxonomy\")\n", "\n", " url = f\"https://datacatalog.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/taxonomies\"\n", "\n", " request_body = {\n", " \"displayName\": taxonomy_name,\n", " \"description\": \"BigLake Demo - Colab Notebook\",\n", " }\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", "\n", " name = json_result[\"name\"]\n", " print(\"Taxonomy created: \", name)\n", " return name" ] }, { "cell_type": "markdown", "metadata": { "id": "Juvg5uULWmkK" }, "source": [ "#### createPolicyTag\n", "Creates a Taxonomy Policy Tag or Child Policy Tag" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "DcKwdvn5x-Yo" }, "outputs": [], "source": [ "def createPolicyTag(params, taxonomy_name, policy_parent, policy_name):\n", " \"\"\"Creates Taxonomy Policy Tag or Sub-Policy Tag\"\"\"\n", "\n", " # First find the connection\n", " # https://cloud.google.com/data-catalog/docs/reference/rest/v1/projects.locations.taxonomies/list\n", " project_id = params[\"project_id\"]\n", "\n", "\n", " url = f\"https://datacatalog.googleapis.com/v1/{taxonomy_name}/policyTags\"\n", "\n", " # Gather existing connections\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"createTaxonomyPolicyTags (GET) json_result: {json_result}\")\n", "\n", " # Test to see if connection exists, if so returns\n", " if \"policyTags\" in json_result:\n", " for item in json_result[\"policyTags\"]:\n", " # print(f\"displayName: {item['displayName']}\")\n", " # \"projects/test/locations/us/taxonomies/2620666826070342226\"\n", " # NOTE: We cannot test the complete name since it contains the an unknown number\n", " if item[\"displayName\"] == policy_name:\n", " print(f\"{policy_name} already exists\")\n", " return item[\"name\"]\n", "\n", "\n", " # Create the taxonomy (High)\n", " # https://cloud.google.com/data-catalog/docs/reference/rest/v1/projects.locations.taxonomies.policyTags/create\n", " print(f\"Creating Policy {policy_name}\")\n", "\n", " url = f\"https://datacatalog.googleapis.com/v1/{taxonomy_name}/policyTags\"\n", "\n", " if policy_parent is None:\n", " request_body = {\n", " \"displayName\": policy_name,\n", " \"description\": \"BigLake Demo - Colab Notebook - \" + policy_name,\n", " }\n", " else:\n", " 
request_body = {\n", " \"parentPolicyTag\" : policy_parent,\n", " \"displayName\": policy_name,\n", " \"description\": \"BigLake Demo - Colab Notebook - \" + policy_name,\n", " }\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", "\n", " policy_full_name = json_result[\"name\"]\n", " print(\"Policy created: \", policy_full_name)\n", "\n", " return policy_full_name" ] }, { "cell_type": "markdown", "metadata": { "id": "k6P93_h0clkh" }, "source": [ "#### securePolicyTag\n", "Secures a policy (column level security)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "0XhKli1pc7P3" }, "outputs": [], "source": [ "def securePolicyTag(params, policy_name):\n", " \"\"\"Secure a Policy.\"\"\"\n", "\n", " # First find the IAM Permission\n", " # https://cloud.google.com/data-catalog/docs/reference/rest/v1/projects.locations.taxonomies.policyTags/getIamPolicy\n", " project_id = params[\"project_id\"]\n", " bigquery_location = params[\"bigquery_location\"]\n", "\n", " url = f\"https://datacatalog.googleapis.com/v1/{policy_name}:getIamPolicy\"\n", "\n", " # Gather existing data policies\n", " request_body = { }\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", " print(f\"getIamPolicy (POST) json_result: {json_result}\")\n", "\n", " # Test for existance\n", " if \"bindings\" in json_result:\n", " for item in json_result[\"bindings\"]:\n", " print(f\"role: {item['role']}\")\n", " for member in item[\"members\"]:\n", " print(f\"member: {member}\")\n", " if member == \"user:\" + params[\"user\"]:\n", " print(\"securePolicyTag: Permissions exist\")\n", " return\n", "\n", " # Set IAM\n", " # https://cloud.google.com/data-catalog/docs/reference/rest/v1/projects.locations.taxonomies.policyTags/setIamPolicy\n", " url = f\"https://datacatalog.googleapis.com/v1/{policy_name}:setIamPolicy\"\n", "\n", " request_body = {\n", " \"policy\": {\n", " \"bindings\":[\n", " {\n", " \"members\": [ \"user:\" + params[\"user\"] ],\n", " \"role\":\"roles/datacatalog.categoryFineGrainedReader\"\n", " }\n", " ]\n", " }\n", " }\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", " print(\"IAM Security Set: \", policy_name)" ] }, { "cell_type": "markdown", "metadata": { "id": "YwXWrHGUWwaX" }, "source": [ "#### createDataPolicy\n", "Create a data masking policy" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Vfmx84jf4VrK" }, "outputs": [], "source": [ "def createDataPolicy(params, policyTag, policy_name, dataPolicyType, predefinedExpression):\n", " \"\"\"Creates a Data Policy.\"\"\"\n", "\n", " # First find the connection\n", " # https://cloud.google.com/bigquery/docs/reference/bigquerydatapolicy/rest/v1/projects.locations.dataPolicies/list?\n", " project_id = params[\"project_id\"]\n", " bigquery_location = params[\"bigquery_location\"]\n", "\n", " url = f\"https://bigquerydatapolicy.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/dataPolicies\"\n", "\n", " # Gather existing data policies\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"createDataPolicies (GET) json_result: {json_result}\")\n", "\n", " # Test for policy_name\n", " if \"dataPolicies\" in json_result:\n", " for item in json_result[\"dataPolicies\"]:\n", " # print(f\"name: {item['name']}\")\n", " if item[\"name\"] == f\"projects/{project_id}/locations/{bigquery_location}/dataPolicies/{policy_name}\":\n", " print(f\"createDataPolicy policy exists: {policy_name}\")\n", " return item[\"name\"]\n", "\n", " # Create Data 
Policy\n", " # https://cloud.google.com/bigquery/docs/reference/bigquerydatapolicy/rest/v1/projects.locations.dataPolicies/create\n", " url = f\"https://bigquerydatapolicy.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/dataPolicies\"\n", "\n", " # Create\n", " print(f\"Creating Data Policy {policy_name}\")\n", "\n", " request_body = {\n", " \"dataPolicyId\": policy_name,\n", " \"dataPolicyType\": dataPolicyType,\n", " \"policyTag\" : policyTag,\n", " \"dataMaskingPolicy\": {\n", " \"predefinedExpression\": predefinedExpression\n", " }\n", " }\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", "\n", " policy_name = json_result[\"name\"]\n", " print(\"Data Policy created: \", policy_name)\n", "\n", " return policy_name" ] }, { "cell_type": "markdown", "metadata": { "id": "DGrTRDhFW4i2" }, "source": [ "#### secureDataPolicy\n", "Secures a data policy (masking) tag" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "bv1zGDFTbk7O" }, "outputs": [], "source": [ "def secureDataPolicy(params, data_policy_name):\n", " \"\"\"Secure a Data Policy.\"\"\"\n", "\n", " # First find the IAM Permission\n", " # https://cloud.google.com/bigquery/docs/reference/bigquerydatapolicy/rest/v1/projects.locations.dataPolicies/getIamPolicy\n", " project_id = params[\"project_id\"]\n", " bigquery_location = params[\"bigquery_location\"]\n", "\n", " url = f\"https://bigquerydatapolicy.googleapis.com/v1/{data_policy_name}:getIamPolicy\"\n", "\n", " # Gather existing data policies\n", " request_body = { }\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", " print(f\"getIamPolicy (POST) json_result: {json_result}\")\n", "\n", " # Test for existance\n", " if \"bindings\" in json_result:\n", " for item in json_result[\"bindings\"]:\n", " print(f\"role: {item['role']}\") # I should check the role here too \"roles/bigquerydatapolicy.maskedReader\"\n", " for member in item[\"members\"]:\n", " print(f\"member: {member}\")\n", " if member == \"user:\" + params[\"user\"]:\n", " print(\"secureDataPolicy: Permissions exist\")\n", " return\n", "\n", " # Set IAM\n", " # https://cloud.google.com/bigquery/docs/reference/bigquerydatapolicy/rest/v1/projects.locations.dataPolicies/setIamPolicy\n", " url = f\"https://bigquerydatapolicy.googleapis.com/v1/{data_policy_name}:setIamPolicy\"\n", "\n", " request_body = {\n", " \"policy\": {\n", " \"bindings\":[\n", " {\n", " \"members\": [ \"user:\" + params[\"user\"] ],\n", " \"role\": \"roles/bigquerydatapolicy.maskedReader\"\n", " }\n", " ]\n", " }\n", " }\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", " print(\"IAM Security Set: \", data_policy_name)" ] }, { "cell_type": "markdown", "metadata": { "id": "mveILgDIXgb6" }, "source": [ "### <font color='gray'>Initailize Data Security - CLS / Data Masking</font>\n", "Creates the Taxonomy and Data Masking Rules. This creates two top level Taxonomies (High and Low Security Clearance). Then for each type of data a Policy is created. Security is then granted to the policy which would enforce columns level permissions. 
For other fields, a data masking rule is created under the policy and then security is granted.\n", "- Best Practices: https://cloud.google.com/bigquery/docs/best-practices-policy-tags\n" ] }, { "cell_type": "markdown", "metadata": { "id": "fK1mHSWTvto4" }, "source": [ "<img src=\"https://storage.googleapis.com/data-analytics-golden-demo/biglake/v1/artifacts/BigLake-CLS-Data-Mask.png\" width=\"800\" height=\"232\" valign=\"top\" alt=\"BigLake Table Column / Data Masking\">\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "GsRFD0D7wcrO" }, "outputs": [], "source": [ "################################################################################\n", "# Create Hierarchical Data Policies\n", "################################################################################\n", "# Overview\n", "# 1 - Create the overall Taxonomy (createTaxonomy), then the top level Policy Tag (createPolicyTag)\n", "# - high_security_clearance\n", "\n", "# 2 - Create each Policy\n", "# - phone_number\n", "# - government_identification\n", "# - email_address\n", "# - date_of_birth\n", "# - bank_account_routing\n", "# - bank_account_number\n", "\n", "# 3 - Secure Policies without Data Masking (securePolicyTag)\n", "# roles/datacatalog.categoryFineGrainedReader\n", "# - phone_number\n", "# - email_address\n", "\n", "# 4 - Create Data Masking (createDataPolicy)\n", "# - government_identification,LAST_FOUR_CHARACTERS\n", "# - date_of_birth,DATE_YEAR_MASK\n", "# - bank_account_routing,FIRST_FOUR_CHARACTERS\n", "# - bank_account_number,LAST_FOUR_CHARACTERS\n", "\n", "# 5 - Secure Data Policy (secureDataPolicy)\n", "# roles/bigquerydatapolicy.maskedReader\n", "# - government_identification\n", "# - date_of_birth\n", "# - bank_account_routing\n", "# - bank_account_number\n", "\n", "\n", "################################################################################\n", "# To see the Taxonomy open this link in a new tab: https://console.cloud.google.com/bigquery/policy-tags\n", "################################################################################\n", "taxonomy_name = createTaxonomy(params)\n", "print(f\"taxonomy_name: {taxonomy_name}\")\n", "\n", "\n", "################################################################################\n", "# High Security Clearance\n", "################################################################################\n", "policy_high_security_clearance = createPolicyTag(params, taxonomy_name, None, \"high_security_clearance\")\n", "print(f\"policy_high_security_clearance: {policy_high_security_clearance}\")\n", "\n", "## High Security Clearance -> Phone Number\n", "policy_high_security_clearance_phone_number = createPolicyTag(params, taxonomy_name, policy_high_security_clearance, \"hsc_pt_phone_number\")\n", "print(f\"policy_high_security_clearance_phone_number: {policy_high_security_clearance_phone_number}\")\n", "## No need to create data mask for phone_number, instead we are granting access to the column (CLS)\n", "securePolicyTag(params, policy_high_security_clearance_phone_number)\n", "\n", "## High Security Clearance -> Government Identification\n", "policy_high_security_clearance_government_identification = createPolicyTag(params, taxonomy_name, policy_high_security_clearance, \"hsc_pt_government_identification\")\n", "print(f\"policy_high_security_clearance_government_identification: {policy_high_security_clearance_government_identification}\")\n", "datamask_policy_high_security_clearance_government_identification = createDataPolicy(params, 
policy_high_security_clearance_government_identification, \"hsc_dm_government_identification\", \"DATA_MASKING_POLICY\", \"LAST_FOUR_CHARACTERS\")\n", "secureDataPolicy(params, datamask_policy_high_security_clearance_government_identification)\n", "\n", "## High Security Clearance -> Email Address\n", "policy_high_security_clearance_email_address = createPolicyTag(params, taxonomy_name, policy_high_security_clearance, \"hsc_pt_email_address\")\n", "print(f\"policy_high_security_clearance_email_address: {policy_high_security_clearance_email_address}\")\n", "## No need to create data mask for email_address, instead we are granting access to the column (CLS)\n", "securePolicyTag(params, policy_high_security_clearance_email_address)\n", "\n", "## High Security Clearance -> Date of Birth\n", "policy_high_security_clearance_date_of_birth = createPolicyTag(params, taxonomy_name, policy_high_security_clearance, \"hsc_pt_date_of_birth\")\n", "print(f\"policy_high_security_clearance_date_of_birth: {policy_high_security_clearance_date_of_birth}\")\n", "datamask_policy_high_security_clearance_date_of_birth = createDataPolicy(params, policy_high_security_clearance_date_of_birth, \"hsc_dm_date_of_birth\", \"DATA_MASKING_POLICY\", \"DATE_YEAR_MASK\")\n", "secureDataPolicy(params, datamask_policy_high_security_clearance_date_of_birth)\n", "\n", "## High Security Clearance -> Bank Account Routing\n", "policy_high_security_clearance_bank_account_routing = createPolicyTag(params, taxonomy_name, policy_high_security_clearance, \"hsc_pt_bank_account_routing\")\n", "print(f\"policy_high_security_clearance_bank_account_routing: {policy_high_security_clearance_bank_account_routing}\")\n", "datamask_policy_high_security_clearance_bank_account_routing = createDataPolicy(params, policy_high_security_clearance_bank_account_routing, \"hsc_dm_bank_account_routing\", \"DATA_MASKING_POLICY\", \"FIRST_FOUR_CHARACTERS\")\n", "secureDataPolicy(params, datamask_policy_high_security_clearance_bank_account_routing)\n", "\n", "## High Security Clearance -> Bank Account Number\n", "policy_high_security_clearance_bank_account_number = createPolicyTag(params, taxonomy_name, policy_high_security_clearance, \"hsc_pt_bank_account_number\")\n", "print(f\"policy_high_security_clearance_bank_account_number: {policy_high_security_clearance_bank_account_number}\")\n", "datamask_policy_high_security_clearance_bank_account_number = createDataPolicy(params, policy_high_security_clearance_bank_account_number, \"hsc_dm_bank_account_number\", \"DATA_MASKING_POLICY\", \"LAST_FOUR_CHARACTERS\")\n", "secureDataPolicy(params, datamask_policy_high_security_clearance_bank_account_number)\n", "\n", "\n", "\n", "################################################################################\n", "# Low Security Clearance\n", "################################################################################\n", "policy_low_security_clearance = createPolicyTag(params, taxonomy_name, None, \"low_security_clearance\")\n", "print(f\"policy_low_security_clearance: {policy_low_security_clearance}\")\n", "\n", "## Low Security Clearance -> Phone Number\n", "policy_low_security_clearance_phone_number = createPolicyTag(params, taxonomy_name, policy_low_security_clearance, \"lsc_pt_phone_number\")\n", "print(f\"policy_low_security_clearance_phone_number: {policy_low_security_clearance_phone_number}\")\n", "datamask_policy_low_security_clearance_phone_number = createDataPolicy(params, policy_low_security_clearance_phone_number, \"lsc_dm_phone_number\", 
\"DATA_MASKING_POLICY\", \"LAST_FOUR_CHARACTERS\")\n", "secureDataPolicy(params, datamask_policy_low_security_clearance_phone_number)\n", "\n", "## Low Security Clearance -> Government Identification\n", "policy_low_security_clearance_government_identification = createPolicyTag(params, taxonomy_name, policy_low_security_clearance, \"lsc_pt_government_identification\")\n", "print(f\"policy_low_security_clearance_government_identification: {policy_low_security_clearance_government_identification}\")\n", "datamask_policy_low_security_clearance_government_identification = createDataPolicy(params, policy_low_security_clearance_government_identification, \"lsc_dm_government_identification\", \"DATA_MASKING_POLICY\", \"SHA256\")\n", "secureDataPolicy(params, datamask_policy_low_security_clearance_government_identification)\n", "\n", "## Low Security Clearance -> Email Address\n", "policy_low_security_clearance_email_address = createPolicyTag(params, taxonomy_name, policy_low_security_clearance, \"lsc_pt_email_address\")\n", "print(f\"policy_low_security_clearance_email_address: {policy_low_security_clearance_email_address}\")\n", "datamask_policy_low_security_clearance_email_address = createDataPolicy(params, policy_low_security_clearance_email_address, \"lsc_dm_email_address\", \"DATA_MASKING_POLICY\", \"EMAIL_MASK\")\n", "secureDataPolicy(params, datamask_policy_low_security_clearance_email_address)\n", "\n", "## Low Security Clearance -> Date of Birth\n", "policy_low_security_clearance_date_of_birth = createPolicyTag(params, taxonomy_name, policy_low_security_clearance, \"lsc_pt_date_of_birth\")\n", "print(f\"policy_low_security_clearance_date_of_birth: {policy_low_security_clearance_date_of_birth}\")\n", "datamask_policy_low_security_clearance_date_of_birth = createDataPolicy(params, policy_low_security_clearance_date_of_birth, \"lsc_dm_date_of_birth\", \"DATA_MASKING_POLICY\", \"DATE_YEAR_MASK\")\n", "secureDataPolicy(params, datamask_policy_low_security_clearance_date_of_birth)\n", "\n", "## Low Security Clearance -> Bank Account Routing\n", "policy_low_security_clearance_bank_account_routing = createPolicyTag(params, taxonomy_name, policy_low_security_clearance, \"lsc_pt_bank_account_routing\")\n", "print(f\"policy_low_security_clearance_bank_account_routing: {policy_low_security_clearance_bank_account_routing}\")\n", "datamask_policy_low_security_clearance_bank_account_routing = createDataPolicy(params, policy_low_security_clearance_bank_account_routing, \"lsc_dm_bank_account_routing\", \"DATA_MASKING_POLICY\", \"DEFAULT_MASKING_VALUE\")\n", "secureDataPolicy(params, datamask_policy_low_security_clearance_bank_account_routing)\n", "\n", "## Low Security Clearance -> Bank Account Number\n", "policy_low_security_clearance_bank_account_number = createPolicyTag(params, taxonomy_name, policy_low_security_clearance, \"lsc_pt_bank_account_number\")\n", "print(f\"policy_low_security_clearance_bank_account_number: {policy_low_security_clearance_bank_account_number}\")\n", "datamask_policy_low_security_clearance_bank_account_number = createDataPolicy(params, policy_low_security_clearance_bank_account_number, \"lsc_dm_bank_account_number\", \"DATA_MASKING_POLICY\", \"DEFAULT_MASKING_VALUE\")\n", "secureDataPolicy(params, datamask_policy_low_security_clearance_bank_account_number)\n" ] }, { "cell_type": "markdown", "metadata": { "id": "N-qtO41jKxWi" }, "source": [ "### <font color=\"#4285f4\">Create tables to apply security -</font> CLS / Data Masking\n", "- driver_parquet_rls_cls_dm_high - High 
Security Clearance (show more data)\n", "- driver_parquet_rls_cls_dm_low - Low Security Clearance (Hides more data)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "9-ZtWJw_Kin-" }, "outputs": [], "source": [ "# Create a Highly Privileged table\n", "\n", "sql = f\"\"\"\n", "\n", "CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.driver_parquet_rls_cls_dm_high`\n", "WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`\n", "OPTIONS (\n", " format = \"PARQUET\",\n", " uris = ['gs://{biglake_bucket_name}/biglake-tables/driver_parquet/*.parquet'],\n", " max_staleness=INTERVAL 30 MINUTE,\n", " metadata_cache_mode=\"MANUAL\" -- This can be setup to be 30 minutes or more\n", ");\n", "\n", "\"\"\"\n", "\n", "if runQuery(sql) == True:\n", " print()\n", " print(f\"** Created table {project_id}.biglake_dataset.driver_parquet_rls_cls_dm_high **\")\n", " print()\n", "else:\n", " print()\n", " print(\"Table creation failed\")\n", " print()\n", "\n", "# Refresh can only be done for \"manual\" cache mode. This is done since this is a demo.\n", "sql = f\"CALL BQ.REFRESH_EXTERNAL_METADATA_CACHE('{project_id}.biglake_dataset.driver_parquet_rls_cls_dm_high')\"\n", "\n", "if runQuery(sql) == True:\n", " print()\n", " print(f\"** Refreshed Metadata on {project_id}.biglake_dataset.driver_parquet_rls_cls_dm_high **\")\n", " print()\n", "else:\n", " print()\n", " print(\"Metadata refresh failed\")\n", " print()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "EiVaSzwzKo7r" }, "outputs": [], "source": [ "# Create a Low Privileged table\n", "sql = f\"\"\"\n", "\n", "CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.driver_parquet_rls_cls_dm_low`\n", "WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`\n", "OPTIONS (\n", " format = \"PARQUET\",\n", " uris = ['gs://{biglake_bucket_name}/biglake-tables/driver_parquet/*.parquet'],\n", " max_staleness=INTERVAL 30 MINUTE,\n", " metadata_cache_mode=\"MANUAL\" -- This can be setup to be 30 minutes or more\n", ");\n", "\n", "\"\"\"\n", "\n", "if runQuery(sql) == True:\n", " print()\n", " print(f\"** Created table {project_id}.biglake_dataset.driver_parquet_rls_cls_dm_low **\")\n", " print()\n", "else:\n", " print()\n", " print(\"Table creation failed\")\n", " print()\n", "\n", "# Refresh can only be done for \"manual\" cache mode. 
This is done since this is a demo.\n", "sql = f\"CALL BQ.REFRESH_EXTERNAL_METADATA_CACHE('{project_id}.biglake_dataset.driver_parquet_rls_cls_dm_low')\"\n", "\n", "if runQuery(sql) == True:\n", " print()\n", " print(f\"** Refreshed Metadata on {project_id}.biglake_dataset.driver_parquet_rls_cls_dm_low **\")\n", " print()\n", "else:\n", " print()\n", " print(\"Metadata refresh failed\")\n", " print()\n" ] }, { "cell_type": "markdown", "metadata": { "id": "F3YuzQWSLH9G" }, "source": [ "### <font color=\"#4285f4\">Alter the tables schemas to add the policy tags -</font> CLS / Data Masking\n", "- Apply the above created Policy/Data Masking tags to each field\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "bnvi6CS5LS7U" }, "outputs": [], "source": [ "driver_parquet_rls_cls_dm_high_schema_updated = [\n", " {\n", " \"mode\": \"NULLABLE\",\n", " \"name\": \"driver_id\",\n", " \"type\": \"INTEGER\"\n", " },\n", " {\n", " \"mode\": \"NULLABLE\",\n", " \"name\": \"driver_name\",\n", " \"type\": \"STRING\"\n", " },\n", " {\n", " \"mode\": \"NULLABLE\",\n", " \"name\": \"driver_mobile_number\",\n", " \"type\": \"STRING\"\n", " },\n", " {\n", " \"mode\": \"NULLABLE\",\n", " \"name\": \"driver_license_number\",\n", " \"policyTags\": {\n", " \"names\": [\n", " policy_high_security_clearance_government_identification\n", " ]\n", " },\n", " \"type\": \"STRING\"\n", " },\n", " {\n", " \"mode\": \"NULLABLE\",\n", " \"name\": \"driver_email_address\",\n", " \"type\": \"STRING\"\n", " },\n", " {\n", " \"mode\": \"NULLABLE\",\n", " \"name\": \"driver_dob\",\n", " \"policyTags\": {\n", " \"names\": [\n", " policy_high_security_clearance_date_of_birth\n", " ]\n", " },\n", " \"type\": \"DATE\"\n", " },\n", " {\n", " \"mode\": \"NULLABLE\",\n", " \"name\": \"driver_ach_routing_number\",\n", " \"policyTags\": {\n", " \"names\": [\n", " policy_high_security_clearance_bank_account_routing\n", " ]\n", " },\n", " \"type\": \"STRING\"\n", " },\n", " {\n", " \"mode\": \"NULLABLE\",\n", " \"name\": \"driver_ach_account_number\",\n", " \"policyTags\": {\n", " \"names\": [\n", " policy_high_security_clearance_bank_account_number\n", " ]\n", " },\n", " \"type\": \"STRING\"\n", " }\n", "]\n", "\n", "updateTableSchema(params[\"project_id\"], \"biglake_dataset\", \"driver_parquet_rls_cls_dm_high\",driver_parquet_rls_cls_dm_high_schema_updated)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "fdzU0bPiLizQ" }, "outputs": [], "source": [ "driver_parquet_rls_cls_dm_low_schema_updated = [\n", " {\n", " \"mode\": \"NULLABLE\",\n", " \"name\": \"driver_id\",\n", " \"type\": \"INTEGER\"\n", " },\n", " {\n", " \"mode\": \"NULLABLE\",\n", " \"name\": \"driver_name\",\n", " \"type\": \"STRING\"\n", " },\n", " {\n", " \"mode\": \"NULLABLE\",\n", " \"name\": \"driver_mobile_number\",\n", " \"policyTags\": {\n", " \"names\": [\n", " policy_low_security_clearance_phone_number\n", " ]\n", " },\n", " \"type\": \"STRING\"\n", " },\n", " {\n", " \"mode\": \"NULLABLE\",\n", " \"name\": \"driver_license_number\",\n", " \"policyTags\": {\n", " \"names\": [\n", " policy_low_security_clearance_government_identification\n", " ]\n", " },\n", " \"type\": \"STRING\"\n", " },\n", " {\n", " \"mode\": \"NULLABLE\",\n", " \"name\": \"driver_email_address\",\n", " \"policyTags\": {\n", " \"names\": [\n", " policy_low_security_clearance_email_address\n", " ]\n", " },\n", " \"type\": \"STRING\"\n", " },\n", " {\n", " \"mode\": \"NULLABLE\",\n", " \"name\": \"driver_dob\",\n", " \"policyTags\": {\n", " 
\"names\": [\n", " policy_low_security_clearance_date_of_birth\n", " ]\n", " },\n", " \"type\": \"DATE\"\n", " },\n", " {\n", " \"mode\": \"NULLABLE\",\n", " \"name\": \"driver_ach_routing_number\",\n", " \"policyTags\": {\n", " \"names\": [\n", " policy_low_security_clearance_bank_account_routing\n", " ]\n", " },\n", " \"type\": \"STRING\"\n", " },\n", " {\n", " \"mode\": \"NULLABLE\",\n", " \"name\": \"driver_ach_account_number\",\n", " \"policyTags\": {\n", " \"names\": [\n", " policy_low_security_clearance_bank_account_number\n", " ]\n", " },\n", " \"type\": \"STRING\"\n", " }\n", "]\n", "\n", "updateTableSchema(params[\"project_id\"], \"biglake_dataset\", \"driver_parquet_rls_cls_dm_low\",driver_parquet_rls_cls_dm_low_schema_updated)" ] }, { "cell_type": "markdown", "metadata": { "id": "Ak-9aiLTL4mN" }, "source": [ "### <font color=\"#4285f4\">Query the tables - </font> CLS / Data Masking\n", "- First open the tables in the BigQuery UI and see the policies applied\n", "- The columns will be hidden or masked.\n", "- NOTE: If you remove permissions to a Policy Tag, then you will get an error when you attempt to query the field.\n" ] }, { "cell_type": "markdown", "metadata": { "id": "WDql1bcKMjMx" }, "source": [ "<img src=\"https://storage.googleapis.com/data-analytics-golden-demo/biglake/v1/artifacts/BigLake-CLS-Data-Mask.png\" width=\"800\" height=\"232\" valign=\"top\" alt=\"BigLake Table Column / Data Masking\">\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "VBar9jz9L_Fp" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- The High Security Clearance let's us see the fields based upon the above chart\n", "SELECT *\n", " FROM `biglake_dataset.driver_parquet_rls_cls_dm_high`\n", " LIMIT 100;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "aIAnj0LJMVPy" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- The Low Security Clearance hides additional fields (driver_license_number, email, phone number)\n", "SELECT *\n", " FROM `biglake_dataset.driver_parquet_rls_cls_dm_low`\n", " LIMIT 100;" ] }, { "cell_type": "markdown", "metadata": { "id": "KAGEnV0qSmWv" }, "source": [ "### <font color=\"#4285f4\">Row Level Secuity</font>\n", "- We can filter tables with a predicate (\"WHERE clause\") on our data.\n", "- RLS works on any BigLake table of any underlying data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "v3WEW1Y4S9HF" }, "outputs": [], "source": [ "# Filter the Taxi Trips table for just pickup location = 1 and trip distance is > 10\n", "sql = f\"\"\"\n", "CREATE OR REPLACE ROW ACCESS POLICY taxi_trips_parquet_rap\n", " ON `biglake_dataset.taxi_trips_parquet`\n", " GRANT TO (\"user:{params['user']}\")\n", "FILTER USING (PULocationID = 1 AND Trip_Distance > 10);\n", "\"\"\"\n", "\n", "runQuery(sql)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "eqWXzoldS8-b" }, "outputs": [], "source": [ "%%bigquery\n", "SELECT PULocationID, DOLocationID, Passenger_Count, Trip_Distance, Total_Amount\n", " FROM `biglake_dataset.taxi_trips_parquet`\n", " LIMIT 10;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "dFod0D3qS8yD" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- Remove all policies\n", "DROP ALL ROW ACCESS POLICIES ON `biglake_dataset.taxi_trips_parquet`;" ] }, { "cell_type": "markdown", "metadata": { "id": "KKZg4QOgrszb" }, "source": [ "## <font color='blue'>BigLake Apache Iceberg, Spark Stored Procedures and BigQuery Metastore</font>\n", 
"BigLake has several ways to support Apache Iceberg\n", "- BigLake Metadata file - read-only support of an Iceberg table. Requires manual updates of metadata.\n", "- BigQuery Metastore - read-only support of an Iceberg table while Spark provides read/write support. Metadata is kept up to date. https://cloud.google.com/bigquery/docs/about-bqms\n", "- BigLake Managed Tables - fully managed experience on Apache Iceberg with support for DML and high throughput streaming" ] }, { "cell_type": "markdown", "metadata": { "id": "hDIYk3O3s4_M" }, "source": [ "### <font color='gray'>Helper Functions - Create Spark Connection</font>\n", "Creates the BigQuery to Dataproc Serverless connection and sets permissions.\n", "\n" ] }, { "cell_type": "markdown", "metadata": { "id": "euOWHFh7tPOl" }, "source": [ "#### createSparkConnection\n", "Creates the Spark connection for BigQuery for Serverless Spark (Dataproc)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "BhDssg0itSuv" }, "outputs": [], "source": [ "def createSparkConnection(params):\n", " \"\"\"Creates a Spark connection in BigQuery.\"\"\"\n", "\n", " # First find the connection\n", " # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/list\n", " project_id = params[\"project_id\"]\n", " bigquery_location = params[\"bigquery_location\"]\n", " spark_connection_name = params[\"spark_connection_name\"]\n", " url = f\"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/connections\"\n", "\n", " # Gather existing connections\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"createSparkConnection (GET) json_result: {json_result}\")\n", "\n", " # Test to see if connection exists, if so return\n", " if \"connections\" in json_result:\n", " for item in json_result[\"connections\"]:\n", " print(f\"Spark Connection: {item['name']}\")\n", " # \"projects/756740881369/locations/us/connections/spark-notebook-connection\"\n", " # NOTE: We cannot test the complete name since it contains the project number and not id\n", " if item[\"name\"].endswith(f\"/locations/{bigquery_location}/connections/{spark_connection_name}\"):\n", " print(\"Connection already exists\")\n", " serviceAccountId = item[\"spark\"][\"serviceAccountId\"]\n", " return serviceAccountId\n", "\n", " # Create the connection\n", " # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/create\n", " print(\"Creating Spark Connection\")\n", "\n", " url = f\"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/connections?connectionId={spark_connection_name}\"\n", "\n", " request_body = {\n", " \"friendlyName\": spark_connection_name,\n", " \"description\": \"Spark Colab Notebooks Connection for Data Analytics Golden Demo\",\n", " \"spark\": {}\n", " }\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", "\n", " serviceAccountId = json_result[\"spark\"][\"serviceAccountId\"]\n", " print(\"Spark Connection created: \", serviceAccountId)\n", " return serviceAccountId\n" ] }, { "cell_type": "markdown", "metadata": { "id": "SInv_a_TtcyR" }, "source": [ "#### setBigQueryConnectionIamPolicy\n", "Sets the IAM Permissions on the BigQuery Connection" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "yP5eB7Yftcyb" }, "outputs": [], "source": [ "def setBigQueryConnectionIamPolicy(params, accountWithPrefix, role):\n", " \"\"\"Sets the 
BigQuery connection IAM policy.\"\"\"\n", "\n", " # Get the current bindings (if the account has access then skip)\n", " # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/getIamPolicy\n", " project_id = params[\"project_id\"]\n", " bigquery_location = params[\"bigquery_location\"]\n", " biglake_connection_name = params[\"biglake_connection_name\"]\n", "\n", " url = f\"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/connections/{biglake_connection_name}:getIamPolicy\"\n", "\n", " request_body = { }\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", " print(f\"setBigQueryConnectionIamPolicy (GET) json_result: {json_result}\")\n", "\n", " # Test to see if permissions exist\n", " if \"bindings\" in json_result:\n", " for item in json_result[\"bindings\"]:\n", " members = item[\"members\"]\n", " for member in members:\n", " if member == accountWithPrefix:\n", " print(\"Permissions exist\")\n", " return\n", "\n", " # Take the existing bindings and we need to append the new permission\n", " # Otherwise we lose the existing permissions\n", " if \"bindings\" in json_result:\n", " bindings = json_result[\"bindings\"]\n", " else:\n", " bindings = []\n", "\n", " new_permission = {\n", " \"role\": role,\n", " \"members\": [ accountWithPrefix ]\n", " }\n", "\n", " bindings.append(new_permission)\n", "\n", " # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/setIamPolicy\n", " url = f\"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/connections/{biglake_connection_name}:setIamPolicy\"\n", "\n", " request_body = { \"policy\" : {\n", " \"bindings\" : bindings\n", " }\n", " }\n", "\n", " print(f\"Permission bindings: {bindings}\")\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", " print()\n", " print(f\"json_result: {json_result}\")\n", " print()\n", " print(f\"BigQuery Connection IAM Permissions set for {accountWithPrefix} {role}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "026dLBjStdIS" }, "source": [ "#### setProjectLevelIamPolicy\n", "Sets the IAM Permissions at the Project Level" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "V5ojWsOItdIT" }, "outputs": [], "source": [ "def setProjectLevelIamPolicy(params, accountWithPrefix, role):\n", " \"\"\"Sets the Project Level IAM policy.\"\"\"\n", "\n", " # Get the current bindings (if the account has access then skip)\n", " # https://cloud.google.com/resource-manager/reference/rest/v1/projects/getIamPolicy\n", " project_id = params[\"project_id\"]\n", "\n", " url = f\"https://cloudresourcemanager.googleapis.com/v1/projects/{project_id}:getIamPolicy\"\n", "\n", " request_body = { }\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", " print(f\"setProjectLevelIamPolicy (GET) json_result: {json_result}\")\n", "\n", " # Test to see if permissions exist\n", " if \"bindings\" in json_result:\n", " for item in json_result[\"bindings\"]:\n", " if item[\"role\"] == role:\n", " members = item[\"members\"]\n", " for member in members:\n", " if member == accountWithPrefix:\n", " print(\"Permissions exist\")\n", " return\n", "\n", " # Take the existing bindings and we need to append the new permission\n", " # Otherwise we lose the existing permissions\n", " if \"bindings\" in json_result:\n", " bindings = json_result[\"bindings\"]\n", " else:\n", " bindings = []\n", 
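"\n", " # NOTE: getIamPolicy/setIamPolicy is a read-modify-write; for production use, send\n", " # the 'etag' returned by getIamPolicy back in the setIamPolicy request body so\n", " # concurrent IAM edits are not silently overwritten.\n", 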
"\n", " new_permission = {\n", " \"role\": role,\n", " \"members\": [ accountWithPrefix ]\n", " }\n", "\n", " bindings.append(new_permission)\n", "\n", " # https://cloud.google.com/resource-manager/reference/rest/v1/projects/setIamPolicy\n", " url = f\"https://cloudresourcemanager.googleapis.com/v1/projects/{project_id}:setIamPolicy\"\n", "\n", " request_body = { \"policy\" : {\n", " \"bindings\" : bindings\n", " }\n", " }\n", "\n", " print(f\"Permission bindings: {bindings}\")\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", " print()\n", " print(f\"json_result: {json_result}\")\n", " print()\n", " print(f\"Project Level IAM Permissions set for {accountWithPrefix} {role}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "uKW89180txEl" }, "source": [ "### <font color='gray'>Initailize Permissions -</font> Spark Stored Procedure using BigLake and BigQuery Metastore\n", "Creates following:\n", "- Create the external connection ```spark-notebook-connection```\n", "- Grants the IAM permissions on biglake and spark connections\n", "- See this video: [YouTube](https://youtu.be/IQR9gJuLXbQ)" ] }, { "cell_type": "markdown", "metadata": { "id": "668N3-Fbhoru" }, "source": [ "#### Security Setup Images" ] }, { "cell_type": "markdown", "metadata": { "id": "sI3SPQLBhoLu" }, "source": [ "<img src=\"https://storage.googleapis.com/data-analytics-golden-demo/biglake/v1/artifacts/bigquery-metastore-spark-connection.png\" width=\"800\" valign=\"top\" alt=\"BigQuery Metastore Spark\">\n", "\n", "<img src=\"https://storage.googleapis.com/data-analytics-golden-demo/biglake/v1/artifacts/bigquery-metastore-biglake-connecton.png\" width=\"800\" valign=\"top\" alt=\"BigQuery Metastore Biglake\">" ] }, { "cell_type": "markdown", "metadata": { "id": "nsUtWP4AiX3d" }, "source": [ "#### Security Setup Code" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "KQh9JZO4xXaQ" }, "outputs": [], "source": [ "# Create the Spark connection (if not exists)\n", "sparkServiceAccountId = createSparkConnection(params)\n", "print(f\"createSparkConnection: {sparkServiceAccountId}\")\n", "params[\"sparkServiceAccountId\"] = sparkServiceAccountId\n", "\n", "bigLakeServiceAccountId = params[\"bigLakeServiceAccountId\"]\n", "\n", "# Grant the Spark connection service principal access to the Cloud Storage account\n", "# This account needs to read/write data from this account as part of the Spark processing (when it creates/read Iceberg tables)\n", "setBucketIamPolicy(params, f\"serviceAccount:{sparkServiceAccountId}\", \"roles/storage.objectAdmin\")\n", "\n", "# We need to grant bigquery.connections.delegate permission (or (roles/bigquery.connectionAdmin)) to the Spark Service Account ON THE BigLake connection\n", "# You should create a custom role with bigquery.connections.delegate permission for this (this is the only permission required)\n", "setBigQueryConnectionIamPolicy(params, f\"serviceAccount:{sparkServiceAccountId}\", \"roles/bigquery.connectionAdmin\")\n", "\n", "# In IAM add roles/biglake.admin to the us.biglake-notebook-connection service account\n", "# To create the tables in BigQuery linked to BigQuery Metastore\n", "setProjectLevelIamPolicy(params, f\"serviceAccount:{bigLakeServiceAccountId}\", \"roles/biglake.admin\")\n", "\n", "# In IAM add roles/bigquery.user to the us.spark-notebook-connection service account\n", "# To create BigQuery jobs\n", "setProjectLevelIamPolicy(params, f\"serviceAccount:{sparkServiceAccountId}\", \"roles/bigquery.user\")\n", "\n", "# Set 
roles/bigquery.dataOwner (OWNER) to both service principals (biglake and spark connections)\n", "setBigQueryDatasetPolicy(params, \"biglake_dataset\", f\"{bigLakeServiceAccountId}\", \"OWNER\")\n", "setBigQueryDatasetPolicy(params, \"biglake_dataset\", f\"{sparkServiceAccountId}\", \"OWNER\")" ] }, { "cell_type": "markdown", "metadata": { "id": "Oe_tnCQjoARq" }, "source": [ "### <font color='blue'>Spark Stored Procedure -</font> Create Apache Iceberg Table using BigLake Metastore" ] }, { "cell_type": "markdown", "metadata": { "id": "OsQd-H-Cw5gU" }, "source": [ "#### <font color='blue'>Spark Script</font>\n", "The BigQuery Spark Stored Procedure will reference a script in GCS" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "RJB_qcyltXen" }, "outputs": [], "source": [ "print(f\"To view the PySpark Scripts: https://console.cloud.google.com/storage/browser/{biglake_bucket_name}/pyspark?project={project_id}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "DVS5xnHqxA0p" }, "source": [ "#### <font color='blue'>Create BigQuery Spark Stored Procedure</font>\n", "- PySpark source code that creates the Iceberg table: [GitHub](https://github.com/GoogleCloudPlatform/data-analytics-golden-demo/blob/main/dataproc/pyspark_apache_iceberg_bqms.py)\n", "- Now that the connection is configured, the Spark stored procedure can be created that references the script.\n", "- Note all the Spark Properties required for using the BigLake Metastore with Iceberg." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "hnfeHx_xoKmx" }, "outputs": [], "source": [ "# Create the Spark Stored Procedure\n", "project_id = params[\"project_id\"]\n", "bigquery_location = params[\"bigquery_location\"]\n", "biglake_bucket_name = params[\"biglake_bucket_name\"]\n", "\n", "sql = f\"\"\"CREATE OR REPLACE PROCEDURE `{project_id}.biglake_dataset.sp_iceberg_driver_iceberg`(\n", "\ticeberg_catalog STRING,\n", "\ticeberg_warehouse STRING,\n", "\ticeberg_table STRING,\n", "\tbq_dataset STRING,\n", "\tbq_region STRING,\n", "\tbiglake_connection STRING,\n", "\tsource_parquet_file STRING,\n", "\tproject_id STRING\n", ")\n", "WITH CONNECTION `{project_id}.{bigquery_location}.spark-notebook-connection`\n", "OPTIONS (\n", " main_file_uri=\"gs://{biglake_bucket_name}/pyspark/pyspark_apache_iceberg_bqms.py\",\n", "\tengine='SPARK',\n", "\truntime_version='2.2',\n", "\tjar_uris=[\"gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar\"],\n", "\tproperties=[\n", "\t\t(\"spark.jars.packages\",\"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2\"),\n", "\t\t(\"spark.sql.catalog.iceberg_catalog\",\"org.apache.iceberg.spark.SparkCatalog\"),\n", "\t\t(\"spark.sql.catalog.iceberg_catalog.catalog-impl\",\"org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog\"),\n", "\t\t(\"spark.sql.catalog.iceberg_catalog.gcp_project\",\"{project_id}\"),\n", "\t\t(\"spark.sql.catalog.iceberg_catalog.gcp_location\",\"{bigquery_location}\"),\n", "\t\t(\"spark.sql.catalog.iceberg_catalog.warehouse\",\"gs://{biglake_bucket_name}/biglake-tables\")\n", "\t\t]\n", "\t)\n", "LANGUAGE python;\"\"\"\n", "\n", "runQuery(sql)" ] }, { "cell_type": "markdown", "metadata": { "id": "muk3y-iwxXM7" }, "source": [ "#### <font color='blue'>Execute the Spark Stored Procedure</font>\n", "- This will take 2 to 4 minutes.\n", "- This will:\n", " - Initialize the Iceberg catalog\n", " - Initialize the Iceberg warehouse\n", " - Create the \"driver_iceberg\" table\n", " - Open the parquet file and then insert the data 
into the iceberg table\n", "- When done:\n", " - You will see the files on storage\n", " - Open [bucket](https://console.cloud.google.com/storage/browser)\n", " - Under biglake-tables you will see: iceberg_warehouse.db\n", " - A table has been created in the biglake_dataset\n", " - Open the table, click on Details and you will see the BigQuery Metatstore connection" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "D29EUn8poKiJ" }, "outputs": [], "source": [ "project_id = params[\"project_id\"]\n", "biglake_bucket_name = params[\"biglake_bucket_name\"]\n", "bigquery_location = params[\"bigquery_location\"]\n", "\n", "print(\"Run this in BigQuery\")\n", "print(\"\")\n", "\n", "sql = f\"\"\"CALL `{project_id}.biglake_dataset.sp_iceberg_driver_iceberg`(\n", " \"iceberg_catalog\",\n", " \"iceberg_warehouse\",\n", " \"driver_iceberg\",\n", " \"biglake_dataset\",\n", " \"{bigquery_location}\",\n", " \"biglake-notebook-connection\",\n", " \"gs://{biglake_bucket_name}/biglake-tables/driver_parquet/*.snappy.parquet\",\n", " \"{project_id}\"\n", ");\"\"\"\n", "\n", "print(f\"sql: {sql}\")\n", "print()\n", "print(\"When the job is done, click on Job Information and Log to see the Spark log.\")\n", "\n", "runQuery(sql)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "L21QpUSewICo" }, "outputs": [], "source": [ "print(f\"To view the Iceberg files: https://console.cloud.google.com/storage/browser/{biglake_bucket_name}/biglake-tables/iceberg_warehouse.db?project={project_id}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "5FKjK0AQwhVp" }, "source": [ "#### <font color='blue'>BigLake Query Iceberg</font>\n", "Query the Apache Iceberg table that is connected to the BigLake Metastore and create a table using the manifest apporach." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "7l4C6UY1oKYp" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- This table will see any writes performed by Spark\n", "SELECT *\n", " FROM `iceberg_warehouse.driver_iceberg`\n", "ORDER BY driver_id\n", "LIMIT 10;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "YV_0mCf2zQX2" }, "outputs": [], "source": [ "#*****************************************************************************\n", "# MANUAL STEP\n", "# You may also create an Iceberg table by pointing at the BigLake Metadata file\n", "# You need to open GCS and hard code the following URI\n", "# Select the JSON file with the LASTEST date/time stamp\n", "#*****************************************************************************\n", "\n", "sql = f\"\"\"\n", "CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.driver_iceberg_metadata_file`\n", "WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`\n", "OPTIONS (\n", " format = \"ICEBERG\",\n", " uris = [\"gs://biglake-data-analytics-demo-tpo1envlg2/biglake-tables/iceberg_warehouse.db/driver_iceberg/metadata/00003-eebd23f8-1a8b-432d-9aa9-cf1511cb9658.metadata.json\"]\n", "\n", ");\n", "\"\"\"\n", "\n", "runQuery(sql)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "rRrJDZq00Dmk" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- This table will only see at a specific point in time, based upon the metadata file\n", "SELECT *\n", " FROM `biglake_dataset.driver_iceberg_metadata_file`\n", "ORDER BY driver_id\n", "LIMIT 10;" ] }, { "cell_type": "markdown", "metadata": { "id": "HpslpY7js3KD" }, "source": [ "## <font color='blue'>BigLake Materialized Views -</font> Materialized views over BigLake Metadata Cache Enabled Tables\n", "- Create materialized views over BigLake tables\n", "- https://cloud.google.com/bigquery/docs/materialized-views-intro#biglake" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "WjEc0U-NuHBA" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- Create the materialized view over metadata cache-enabled tables\n", "-- Sum the taxi trips information by location\n", "-- Here we are joining a parquet table to a hudi table\n", "\n", "CREATE OR REPLACE MATERIALIZED VIEW `biglake_dataset.taxi_trips_materialized_view`\n", "OPTIONS (enable_refresh = true, refresh_interval_minutes = 30,\n", " max_staleness=INTERVAL \"0:30:0\" HOUR TO SECOND,\n", " description='Taxi Trips by Date and Pickup Location')\n", "AS\n", "SELECT CAST(Pickup_DateTime AS DATE) AS pickup_date,\n", " location_pickup.borough AS pickup_borough,\n", " location_pickup.zone AS pickup_zone,\n", " SUM(taxi_trip.Fare_Amount) AS total_fare_amount,\n", " SUM(taxi_trip.Surcharge) AS total_surcharge,\n", " SUM(taxi_trip.Tip_Amount) AS total_tip_amount,\n", " SUM(taxi_trip.Total_Amount) AS total_total_amount\n", "\n", " FROM `biglake_dataset.taxi_trips_parquet` AS taxi_trip\n", " INNER JOIN `biglake_dataset.location_hudi` AS location_pickup\n", " ON taxi_trip.PULocationID = location_pickup.location_id\n", "GROUP BY ALL;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "kxa1T4gCxHMp" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- Query the materialized view\n", "SELECT *\n", " FROM `biglake_dataset.taxi_trips_materialized_view`\n", " ORDER BY pickup_date DESC, pickup_borough, pickup_zone\n", " LIMIT 25;" ] }, { "cell_type": "markdown", "metadata": { "id": "QWaCBkeGBg1R" }, "source": [ "## 
<font color='blue'>BigLake Object Tables -</font> Unstructured Data Analytics with Vertex AI" ] }, { "cell_type": "markdown", "metadata": { "id": "cFbGGfioLKar" }, "source": [ "### <font color='gray'>Helper Functions - </font>Create Connections\n" ] }, { "cell_type": "markdown", "metadata": { "id": "rgIu6307I7Jb" }, "source": [ "#### createVertexAIConnection\n", "Creates the BigQuery external connection to allow calls to Vertex AI directly from BigQuery." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "hNexewwNI7Jl" }, "outputs": [], "source": [ "def createVertexAIConnection(params):\n", " \"\"\"Creates a Vertex AI connection.\"\"\"\n", "\n", " # First find the connection\n", " # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/list\n", " project_id = params[\"project_id\"]\n", " bigquery_location = params[\"bigquery_location\"]\n", " vertex_ai_connection_name = params[\"vertex_ai_connection_name\"]\n", " url = f\"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/connections\"\n", "\n", " # Gather existing connections\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"createVertexAIConnection (GET) json_result: {json_result}\")\n", "\n", " # Test to see if connection exists, if so return\n", " if \"connections\" in json_result:\n", " for item in json_result[\"connections\"]:\n", " print(f\"Connection: {item['name']}\")\n", " # \"projects/756740881369/locations/us/connections/vertex-ai-notebook-connection\"\n", " # NOTE: We cannot test the complete name since it contains the project number and not id\n", " if item[\"name\"].endswith(f\"/locations/{bigquery_location}/connections/{vertex_ai_connection_name}\"):\n", " print(\"Connection already exists\")\n", " serviceAccountId = item[\"cloudResource\"][\"serviceAccountId\"]\n", " return serviceAccountId\n", "\n", " # Create the connection\n", " # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/create\n", " print(\"Creating Vertex AI Connection\")\n", "\n", " url = f\"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/connections?connectionId={vertex_ai_connection_name}\"\n", "\n", " request_body = {\n", " \"friendlyName\": vertex_ai_connection_name,\n", " \"description\": \"Vertex AI Colab Notebooks Connection for Data Analytics Golden Demo\",\n", " \"cloudResource\": {}\n", " }\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", "\n", " serviceAccountId = json_result[\"cloudResource\"][\"serviceAccountId\"]\n", " print(\"Vertex AI Connection created: \", serviceAccountId)\n", " return serviceAccountId" ] }, { "cell_type": "markdown", "metadata": { "id": "yXbKuNuiLc8K" }, "source": [ "### <font color='blue'>BigLake Object Tables - </font>Create Object Tables\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "cSxTmHCyLvYS" }, "outputs": [], "source": [ "# Copy data\n", "# Create vision, document and audio tables\n", "# Show security" ] }, { "cell_type": "markdown", "metadata": { "id": "KWXUhLn-x5Ei" }, "source": [ "#### <font color='blue'>BigLake Object Table - </font> Images\n", "Create an object table that is pointed at a location which contains images."
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "AEd14oLAwjbO" }, "outputs": [], "source": [ "project_id = params[\"project_id\"]\n", "bigquery_location = params[\"bigquery_location\"]\n", "biglake_connection_name = params[\"biglake_connection_name\"]\n", "biglake_bucket_name = params[\"biglake_bucket_name\"]\n", "\n", "sql = f\"\"\"\n", "\n", "CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.object_table_taxi_image`\n", "WITH CONNECTION `{project_id}.{bigquery_location}.{biglake_connection_name}`\n", "OPTIONS (\n", " object_metadata=\"DIRECTORY\",\n", " uris = ['gs://{biglake_bucket_name}/biglake-object-tables/images/*.png'],\n", " max_staleness=INTERVAL 30 MINUTE,\n", " metadata_cache_mode=\"MANUAL\"\n", " );\n", "\n", "\"\"\"\n", "\n", "if runQuery(sql):\n", " print()\n", " print(f\"** Created table {project_id}.biglake_dataset.object_table_taxi_image **\")\n", " print()\n", "else:\n", " print()\n", " print(\"Table creation failed\")\n", " print()\n", "\n", "# The metadata cache can only be refreshed by hand when metadata_cache_mode is \"MANUAL\".\n", "# We refresh it here so the demo sees the images immediately.\n", "sql = f\"CALL BQ.REFRESH_EXTERNAL_METADATA_CACHE('{project_id}.biglake_dataset.object_table_taxi_image')\"\n", "\n", "if runQuery(sql):\n", " print()\n", " print(f\"** Refreshed Metadata on {project_id}.biglake_dataset.object_table_taxi_image **\")\n", " print()\n", "else:\n", " print()\n", " print(\"Metadata refresh failed\")\n", " print()\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "qF25S6-WxEza" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- NOTE: Each image carries a metadata tag for borough.\n", "-- Row Level Security can be used to filter this data by borough, or the tag can be used in queries\n", "-- (a hedged policy sketch follows below).\n", "\n", "SELECT *\n", " FROM `biglake_dataset.object_table_taxi_image`\n", "LIMIT 5;"
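] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a hedged illustration of the security note above (an addition to the original demo), the commented-out DDL below shows how a row access policy could filter the object table by the borough metadata tag. It is left commented out so the remaining cells still see every image; the policy name and grantee are assumptions." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#%%bigquery\n", "\n", "# Hedged sketch: Row Level Security on the object table using the borough metadata tag.\n", "# Left commented out so the rest of the demo still sees every image.\n", "\n", "#CREATE ROW ACCESS POLICY bronx_only_filter\n", "#ON `biglake_dataset.object_table_taxi_image`\n", "#GRANT TO ('user:your-user@example.com') -- hypothetical grantee\n", "#FILTER USING (metadata[SAFE_OFFSET(0)].value = 'Bronx');\n", "\n", "# To remove the policy afterwards:\n", "#DROP ALL ROW ACCESS POLICIES ON `biglake_dataset.object_table_taxi_image`;" ] }, { "cell_type": "markdown", "metadata": { "id": "0HRVHV_nLAix" }, "source": [ "### <font color='blue'>BigLake Object Tables - </font>Vision Analysis using Vertex AI (Gemini Pro, Text/Vector Embeddings, Vector Search)\n", "Use machine learning to determine the contents of each image. Pass the results to Gemini Pro to get a description of each image. Then create text embeddings on the LLM result text. Finally, perform a vector search (semantic match) to search the images for objects."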
] }, { "cell_type": "markdown", "metadata": { "id": "lLuIdBASdMC7" }, "source": [ "<img src=\"https://storage.googleapis.com/data-analytics-golden-demo/biglake/v1/artifacts/BigLake-Object-Table-Vector-Embeddings-Diagram.png\" width=\"800\" height=\"380\" valign=\"top\" alt=\"BigLake Object Tables with Vertex AI and Vector Embeddings\"> " ] }, { "cell_type": "markdown", "metadata": { "id": "X2VJR6G0yy0-" }, "source": [ "#### <font color='blue'>BigQuery Connection and Models - </font>Setup\n", "Create the connection to Vertex AI and the remote models for Vision, Gemini Pro and text embeddings. A hedged smoke test of the Gemini model follows the setup cells." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "GLoqFIw2JyN7" }, "outputs": [], "source": [ "# Create the BigQuery External Connection that will be used to call Vertex AI\n", "# Set the required permissions on the external connection's service principal\n", "vertexAIServiceAccountId = createVertexAIConnection(params)\n", "\n", "params[\"vertexAIServiceAccountId\"] = vertexAIServiceAccountId\n", "bigLakeServiceAccountId = params[\"bigLakeServiceAccountId\"]\n", "\n", "# To call the Vision API\n", "setProjectLevelIamPolicy(params, f\"serviceAccount:{vertexAIServiceAccountId}\", \"roles/serviceusage.serviceUsageConsumer\")\n", "setProjectLevelIamPolicy(params, f\"serviceAccount:{bigLakeServiceAccountId}\", \"roles/serviceusage.serviceUsageConsumer\")\n", "\n", "# To call GENERATE TEXT\n", "setProjectLevelIamPolicy(params, f\"serviceAccount:{vertexAIServiceAccountId}\",\"roles/aiplatform.user\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Shdhyq5cJ93A" }, "outputs": [], "source": [ "# Create a remote model for each Vertex AI service (Vision, Gemini Pro, Embeddings, etc.)\n", "# The models use the external connection\n", "\n", "project_id = params[\"project_id\"]\n", "bigquery_location = params[\"bigquery_location\"]\n", "vertex_ai_connection_name = params[\"vertex_ai_connection_name\"]\n", "\n", "sql = f\"\"\"CREATE MODEL IF NOT EXISTS `{project_id}.biglake_dataset.vision-connection`\n", "REMOTE WITH CONNECTION `{project_id}.{bigquery_location}.{vertex_ai_connection_name}`\n", "OPTIONS (remote_service_type = 'cloud_ai_vision_v1');\"\"\"\n", "\n", "runQuery(sql)\n", "\n", "print(f\"Created cloud_ai_vision_v1: {sql}\")\n", "\n", "sql = f\"\"\"CREATE MODEL IF NOT EXISTS `{project_id}.biglake_dataset.gemini_model`\n", "REMOTE WITH CONNECTION `{project_id}.{bigquery_location}.{vertex_ai_connection_name}`\n", "OPTIONS (endpoint = 'gemini-2.0-flash');\"\"\"\n", "\n", "runQuery(sql)\n", "\n", "print(f\"Created gemini_model: {sql}\")\n", "\n", "sql = f\"\"\"CREATE MODEL IF NOT EXISTS `{project_id}.biglake_dataset.textembedding_model`\n", "REMOTE WITH CONNECTION `{project_id}.{bigquery_location}.{vertex_ai_connection_name}`\n", "OPTIONS (endpoint = 'text-embedding-005');\"\"\"\n", "\n", "runQuery(sql)\n", "\n", "print(f\"Created textembedding_model: {sql}\")"
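] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A hedged smoke test (an addition to the original flow): verify the Gemini remote model responds before running the full pipeline. If it fails with a permissions error, wait a few minutes for the IAM changes above to propagate (see the Error Warning later in this section)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- Hedged smoke test: ask the Gemini remote model for a trivial completion\n", "SELECT ml_generate_text_result\n", "  FROM ML.GENERATE_TEXT(MODEL `biglake_dataset.gemini_model`,\n", "       (SELECT 'Say hello in one short sentence.' AS prompt),\n", "       STRUCT(0.2 AS temperature, 50 AS max_output_tokens));"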
"SAFE.PARSE_JSON(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(JSON_VALUE(input.candidates[0].content.parts[0].text),'\\n',' '),'\\\"','\"'),'``` JSON',''),'```json',''),'```','') )\n", ");" ] }, { "cell_type": "markdown", "metadata": { "id": "Ypy_wZ4bzBsn" }, "source": [ "#### <font color='blue'>Use Vertex Image Detection on BigLake Object Table</font> - **Gemini Pro Summarization and Vector Search**\n", "The following will:\n", "1. Create a new table that holds the results from the Vision AI call\n", "2. Call Gemini Pro which will take the Vision AI JSON result and ask for a readable description to be created.\n", "3. Text Embeddings will then be created over each LLM description\n", "4. Semantic Search will then be performed." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ve6qh8JBxogC" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- Call the Vision API to determine Label and Object detection in each image\n", "\n", "CREATE OR REPLACE TABLE `biglake_dataset.object_table_taxi_image_inference` AS\n", "SELECT uri,\n", " ml_annotate_image_result,\n", " metadata,\n", " CAST(NULL AS STRING) AS llm_result,\n", " CAST(NULL AS ARRAY<FLOAT64>) AS vector_embedding\n", "FROM ML.ANNOTATE_IMAGE(\n", " MODEL `biglake_dataset.vision-connection`,\n", " TABLE `biglake_dataset.object_table_taxi_image`,\n", " STRUCT(['LABEL_DETECTION', 'OBJECT_LOCALIZATION'] AS vision_features)\n", ");\n", "\n", "# The Vision API also supports:\n", "# https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-annotate-image\n", "# FACE_DETECTION, LANDMARK_DETECTION, LOGO_DETECTION, LABEL_DETECTION\n", "# TEXT_DETECTION, DOCUMENT_TEXT_DETECTION, IMAGE_PROPERTIES, OBJECT_LOCALIZATION" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "KwjVFjHUz-nb" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "# View the results which are stored as JSON\n", "\n", "SELECT uri, ml_annotate_image_result\n", " FROM `biglake_dataset.object_table_taxi_image_inference`\n", "ORDER BY uri\n", "LIMIT 2;" ] }, { "cell_type": "markdown", "metadata": { "id": "hpZrVFGNLD2-" }, "source": [ "<font color='red'>Error Warning</font><br/>\n", "If you receive the error message ```bqcx-xxxxxxxxxxxx-xxxx@gcp-sa-bigquery-condel.iam.gserviceaccount.com does not have the permission to access resources used by ML.GENERATE_TEXT_EMBEDDING```. Please wait a few minutes for the security permissions to proprogate and try again." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "QjwMIvorz8zU" }, "outputs": [], "source": [ "# Sleep for 2 minutes to allow the IAM permission changes to propagate\n", "\n", "import time\n", "time.sleep(120)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "UipXo4dF4irh" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- Take the Vertex AI JSON results and pass them to Gemini Pro\n", "-- Create a readable description based upon the JSON\n", "\n", "UPDATE `biglake_dataset.object_table_taxi_image_inference` AS object_table_taxi_image_inference\n", " SET llm_result = `biglake_dataset.parse_llm_to_text`(ml_generate_text_result)\n", " FROM (SELECT *\n", " FROM ML.GENERATE_TEXT(MODEL `biglake_dataset.gemini_model`,\n", " (SELECT uri,\n", " CONCAT(\"Generate a description from the below JSON.\\n\",\n", " \"Make sure you include all the objects listed in the JSON.\\n\", " \"The data will be used for lost objects in a taxi cab.\\n\", " \"You do NOT need to include information about the taxi cab itself like seats, cab color, windows, etc.\\n\", " \"The JSON is the output of image recognition.\\n\", " \"JSON:\\n\", " TO_JSON_STRING(ml_annotate_image_result)) AS prompt\n", " FROM `biglake_dataset.object_table_taxi_image_inference`\n", " WHERE llm_result IS NULL\n", " ),\n", " STRUCT(\n", " .9 AS temperature,\n", " 5000 AS max_output_tokens,\n", " .8 AS top_p,\n", " 30 AS top_k)\n", " )\n", " ) AS llm_query\n", "WHERE object_table_taxi_image_inference.uri = llm_query.uri;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "RIpTamybANyB" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- See the LLM results\n", "-- Search for an item that was lost in the taxi\n", "-- This uses the LIKE keyword, which requires us to be specific in our search.\n", "\n", "SELECT uri, llm_result\n", " FROM `biglake_dataset.object_table_taxi_image_inference`\n", " WHERE llm_result LIKE '%travel bag%'\n", " OR llm_result LIKE '%backpack%'\n", "ORDER BY uri;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "CL8lFnkOGqL0" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- Now create text embeddings on the LLM description\n", "-- This will let us do a vector search\n", "\n", "SELECT uri, content, text_embedding\n", " FROM ML.GENERATE_TEXT_EMBEDDING(MODEL `biglake_dataset.textembedding_model`,\n", " (SELECT uri, llm_result AS content\n", " FROM `biglake_dataset.object_table_taxi_image_inference`),\n", " STRUCT(TRUE AS flatten_json_output)\n", " )\n", "LIMIT 5;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "iMpOv3dsBSiq" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- Update the vector_embedding column in the object_table_taxi_image_inference table\n", "-- (a hedged ML.DISTANCE sanity check follows in the next cell)\n", "\n", "UPDATE `biglake_dataset.object_table_taxi_image_inference` AS object_table_taxi_image_inference\n", " SET vector_embedding = text_embedding\n", " FROM (SELECT *\n", " FROM ML.GENERATE_TEXT_EMBEDDING(MODEL `biglake_dataset.textembedding_model`,\n", " (SELECT uri, llm_result AS content\n", " FROM `biglake_dataset.object_table_taxi_image_inference`),\n", " STRUCT(TRUE AS flatten_json_output)\n", " )) AS llm_embedding\n", "WHERE object_table_taxi_image_inference.uri = llm_embedding.uri;"
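] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before the vector index and VECTOR_SEARCH cells, here is a hedged side example (an addition to the original demo): ML.DISTANCE can compare two stored embeddings directly, which is handy for sanity-checking the embeddings just written. A cross join is fine here only because the table is tiny." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- Hedged sanity check: pairwise cosine distance between stored image embeddings\n", "SELECT a.uri AS uri_a,\n", "       b.uri AS uri_b,\n", "       ML.DISTANCE(a.vector_embedding, b.vector_embedding, 'COSINE') AS cosine_distance\n", "  FROM `biglake_dataset.object_table_taxi_image_inference` AS a\n", " CROSS JOIN `biglake_dataset.object_table_taxi_image_inference` AS b\n", " WHERE a.uri < b.uri\n", "ORDER BY cosine_distance\n", "LIMIT 10;"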
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "x3nMAcaze7kE" }, "outputs": [], "source": [ "#%%bigquery\n", "\n", "# BigQuery supports vector indexes on embeddings. In our case we do not have enough data to demo one.\n", "# You need at least 5000 rows to create a VECTOR INDEX\n", "\n", "# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#create_vector_index_statement\n", "\n", "#CREATE VECTOR INDEX object_table_taxi_image_vector_index ON `biglake_dataset.object_table_taxi_image_inference`(vector_embedding)\n", "#OPTIONS (index_type = 'IVF', distance_type = 'EUCLIDEAN');" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "uJ-Re03Nf1y1" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "# Now search using a semantic match\n", "# Search for Backpack. Previously we searched for:\n", "# llm_result LIKE '%travel bag%' OR llm_result LIKE '%backpack%'.\n", "\n", "# The LIKE search returned 1 row; the vector search returns additional matches\n", "\n", "# https://cloud.google.com/bigquery/docs/reference/standard-sql/search_functions#vector_search\n", "\n", "SELECT base.uri, base.llm_result, distance\n", " FROM VECTOR_SEARCH(TABLE `biglake_dataset.object_table_taxi_image_inference`,\n", " 'vector_embedding',\n", " (SELECT *\n", " FROM ML.GENERATE_TEXT_EMBEDDING(MODEL `biglake_dataset.textembedding_model`,\n", " (SELECT 'backpack' AS content ),\n", " STRUCT(TRUE AS flatten_json_output))),\n", " 'text_embedding',\n", " top_k => 2,\n", " distance_type => 'EUCLIDEAN' -- or COSINE\n", " );" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "YZxP3xL6vybX" }, "outputs": [], "source": [ "################################################################################\n", "# Search for lost backpacks\n", "################################################################################\n", "from PIL import Image\n", "import IPython.display\n", "\n", "sql = \"\"\"SELECT base.uri AS uri, base.metadata[0].value as borough, distance\n", " FROM VECTOR_SEARCH(TABLE `biglake_dataset.object_table_taxi_image_inference`,\n", " 'vector_embedding',\n", " (SELECT *\n", " FROM ML.GENERATE_TEXT_EMBEDDING(MODEL `biglake_dataset.textembedding_model`,\n", " (SELECT 'backpack' AS content ),\n", " STRUCT(TRUE AS flatten_json_output))),\n", " 'text_embedding',\n", " top_k => 2,\n", " distance_type => 'EUCLIDEAN' -- or COSINE\n", " )\n", " ORDER BY distance;\"\"\"\n", "\n", "image_df = runQuery(sql)\n", "\n", "for index, row in image_df.iterrows():\n", " uri = row['uri']\n", " borough = row['borough']\n", " downloaded_filename = downloadGCSFile(uri)\n", " print(f\"uri: {uri}\")\n", " print(f\"borough: {borough}\")\n", " img = Image.open(downloaded_filename)\n", " img.thumbnail([400,711])\n", " IPython.display.display(img)\n", " print()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "fUpkove73BNi" }, "outputs": [], "source": [ "################################################################################\n", "# Search for lost \"cell\" phones in the Bronx\n", "# You can also search for \"hat\"\n", "# You can also comment out the line: WHERE base.metadata[0].value = 'Bronx'\n", "################################################################################\n", "\n", "from PIL import Image\n", "import IPython.display\n", "\n", "sql = \"\"\"SELECT base.uri, base.metadata[0].value as borough, distance\n", " FROM VECTOR_SEARCH(TABLE `biglake_dataset.object_table_taxi_image_inference`,\n", " 'vector_embedding',\n", " (SELECT *\n", " FROM ML.GENERATE_TEXT_EMBEDDING(MODEL `biglake_dataset.textembedding_model`,\n", " (SELECT 'cell' AS content ),\n", " STRUCT(TRUE AS flatten_json_output))),\n", " "
'text_embedding',\n", " top_k => 2,\n", " distance_type => 'EUCLIDEAN' -- or COSINE\n", " )\n", " WHERE base.metadata[0].value = 'Bronx'\n", " ORDER BY distance\"\"\"\n", "\n", "image_df = runQuery(sql)\n", "\n", "for index, row in image_df.iterrows():\n", " uri = row['uri']\n", " borough = row['borough']\n", " downloaded_filename = downloadGCSFile(uri)\n", " print(f\"uri: {uri}\")\n", " print(f\"borough: {borough}\")\n", " img = Image.open(downloaded_filename)\n", " img.thumbnail([400,711])\n", " IPython.display.display(img)\n", " print()" ] } ], "metadata": { "colab": { "collapsed_sections": [ "dRw4v7zx1AtJ", "LfKtshOJvrxH", "esyact5NoYA9", "krmZ78BCsHoK", "N0LGUyIvsTdc", "1RVMxCRvsWp2", "Mrz1_eXFsa6t", "TzC5lSWHsfJe", "yfW_BQWx7MDJ", "HEiOCWFfYnzO", "7GMwE_UW6A9w", "gkY0KBSXJ7eo", "kr59LuJVKJlG", "qm9PgMEntbAW", "_C29InxBshzt", "euYNMf6wsmYx", "eIIE1CXLxOT0", "xBgEhz9A4aim", "xI20OyYSIi-W", "xoUo1wbGJ-tM", "bLSn2m2LU-A0", "Z4YJik5ymS_u", "mWRp2Wv9nSim", "AX_kVFZ-mViO", "cJUEgVzUyXXu", "LRvgYFTqy3ZK", "Juvg5uULWmkK", "YwXWrHGUWwaX", "mveILgDIXgb6", "F3YuzQWSLH9G", "Ak-9aiLTL4mN", "KAGEnV0qSmWv", "cFbGGfioLKar", "rgIu6307I7Jb" ], "name": "BigLake-Demo", "provenance": [] }, "kernelspec": { "display_name": "Python 3", "name": "python3" }, "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 0 }