# <img src="https://lh3.googleusercontent.com/mUTbNK32c_DTSNrhqETT5aQJYFKok2HB1G2nk2MZHvG5bSs0v_lmDm_ArW7rgd6SDGHXo0Ak2uFFU96X6Xd0GQ=w160-h128" width="45" valign="top" alt="BigQuery"> BigLake Demo

## <font color='blue'>License</font>

```
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```

Author: Adam Paternostro

## <font color='blue'>Overview -</font> Readme




* Self Link: https://github.com/GoogleCloudPlatform/data-analytics-golden-demo/blob/main/colab-enterprise/biglake/BigLake-Demo.ipynb
* This notebook will:
  - Create a Cloud Storage bucket named: biglake-{your project id}
  - Create external BigQuery connections for BigLake, Vertex AI
  - Create BigLake Managed Tables (BLMT)
    - Loads the tables
    - Show streaming ingestion via BigQuery Subscription
    - Iceberg Metadata export
  - Create BigLake Self-Managed (external Tables) in lots of formats
  - Shows Row, Column and Data Masking on BigLake tables
  - Create a BigLake Metastore using a serverless Spark Stored procedure.  Permissions set with working Spark / Iceberg code.
  - BigLake Materialized Views
  - BigLake Object Tables
    - Image Table
    - Vertex AI image processing
    - Gemini Pro to generate natural language
    - Vector Embeddings
    - Semantic Search



## <font color='gray'>Set Notebook Parameters</font>


In [None]:
# Set these (run this cell to verify the output)
bigquery_location = "us" # or "eu"

# Get some values using gcloud
project_id = !(gcloud config get-value project)
user = !(gcloud auth list --filter=status:ACTIVE --format="value(account)")

if len(project_id) != 1:
  raise RuntimeError(f"project_id is not set: {project_id}")
project_id = project_id[0]

if len(user) != 1:
  raise RuntimeError(f"user is not set: {user}")
user = user[0]

print(f"project_id = {project_id}")
print(f"user = {user}")

# Derived parameters
biglake_bucket_name = "biglake-" + project_id
biglake_connection_name = "biglake-notebook-connection"
spark_connection_name = "spark-notebook-connection"
taxonomy_name = project_id.lower()
vertex_ai_connection_name = "vertex-ai-notebook-connection"

params = { "project_id" : project_id,
           "bigquery_location" : bigquery_location,
           "biglake_connection_name": biglake_connection_name,
           "biglake_bucket_name" : biglake_bucket_name,
           "user" : user,
           "taxonomy_name" : taxonomy_name,
           "spark_connection_name" : spark_connection_name,
           "vertex_ai_connection_name" : vertex_ai_connection_name
           }

## <font color='gray'>Helper Methods</font>
Creates BigLake connection, GCS bucket, set IAM permissions

#### restAPIHelper
Calls the Google Cloud REST API using the current user's credentials.

In [None]:
def restAPIHelper(url: str, http_verb: str, request_body: dict) -> dict:
  """Calls a Google Cloud REST API using the current user's credentials.

  Args:
    url: Full URL of the REST endpoint.
    http_verb: One of "GET", "POST", "PUT", "PATCH" or "DELETE".
    request_body: JSON-serializable payload sent with POST / PUT / PATCH.
      Ignored for GET and DELETE; pass None when there is no payload.

  Returns:
    The decoded JSON response, or an empty dict when a successful response
    has no body.

  Raises:
    RuntimeError: If the verb is unknown or the response status is not 2xx.
  """

  import json
  import requests
  import google.auth
  # Imported explicitly: "import google.auth" alone does not guarantee that the
  # transport.requests submodule has been loaded.
  import google.auth.transport.requests

  # Get an access token based upon the current user
  creds, _ = google.auth.default()
  creds.refresh(google.auth.transport.requests.Request())
  access_token = creds.token

  headers = {
    "Content-Type" : "application/json",
    "Authorization" : "Bearer " + access_token
  }

  if http_verb == "GET":
    response = requests.get(url, headers=headers)
  elif http_verb == "POST":
    response = requests.post(url, json=request_body, headers=headers)
  elif http_verb == "PUT":
    response = requests.put(url, json=request_body, headers=headers)
  elif http_verb == "PATCH":
    response = requests.patch(url, json=request_body, headers=headers)
  elif http_verb == "DELETE":
    response = requests.delete(url, headers=headers)
  else:
    raise RuntimeError(f"Unknown HTTP verb: {http_verb}")

  # Any 2xx status is a success (not only 200)
  if 200 <= response.status_code < 300:
    # A successful call may legitimately return no body; json.loads would fail on it
    if not response.content:
      return {}
    return json.loads(response.content)

  error = f"Error restAPIHelper -> ' Status: '{response.status_code}' Text: '{response.text}'"
  raise RuntimeError(error)

#### createBigLakeConnection
Creates the BigQuery external connection and returns the generated service principal.  The service principal then needs to be granted IAM access to the resources it requires.

In [None]:
def createBigLakeConnection(params):
  """Returns the service account of the BigLake connection, creating the connection if needed.

  Reads "project_id", "bigquery_location" and "biglake_connection_name" from params.
  """

  project_id = params["project_id"]
  bigquery_location = params["bigquery_location"]
  biglake_connection_name = params["biglake_connection_name"]

  # List the connections that already exist in this project / location
  # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/list
  connections_url = f"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/connections"
  json_result = restAPIHelper(connections_url, "GET", None)
  print(f"createBigLakeConnection (GET) json_result: {json_result}")

  # Returned names look like
  # "projects/756740881369/locations/us/connections/biglake-notebook-connection".
  # They carry the project number (not the project id), so only the tail is compared.
  wanted_suffix = f"/locations/{bigquery_location}/connections/{biglake_connection_name}"
  for connection in json_result.get("connections", []):
    print(f"BigLake Connection: {connection['name']}")
    if not connection["name"].endswith(wanted_suffix):
      continue
    print("Connection already exists")
    return connection["cloudResource"]["serviceAccountId"]

  # No match found: create the connection
  # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/create
  print("Creating BigLake Connection")

  create_url = f"{connections_url}?connectionId={biglake_connection_name}"
  json_result = restAPIHelper(create_url, "POST", {
      "friendlyName": biglake_connection_name,
      "description": "BigLake Colab Notebooks Connection for Data Analytics Golden Demo",
      "cloudResource": {}
  })

  serviceAccountId = json_result["cloudResource"]["serviceAccountId"]
  print("BigLake Connection created: ", serviceAccountId)
  return serviceAccountId


#### createGoogleCloudStorageBucket
Create the Google Cloud Storage bucket that will be used for holding the BigLake files (avro, csv, delta, hudi, json, parquet)

In [None]:
def createGoogleCloudStorageBucket(params):
  """Creates the bucket named params["biglake_bucket_name"] unless the project already lists it."""

  project_id = params["project_id"]
  biglake_bucket_name = params["biglake_bucket_name"]

  # List the project's buckets
  # https://cloud.google.com/storage/docs/json_api/v1/buckets/list
  list_url = f"https://storage.googleapis.com/storage/v1/b?project={project_id}"
  json_result = restAPIHelper(list_url, "GET", None)
  print(f"createGoogleCloudStorageBucket (GET) json_result: {json_result}")

  # Nothing to do when the bucket is already there
  for bucket in json_result.get("items", []):
    print(f"Bucket Id / Name: ({bucket['id']} / {bucket['name']}")
    if bucket["id"] == biglake_bucket_name:
      print("Bucket already exists")
      return

  # Create the bucket with private ACLs
  # https://cloud.google.com/storage/docs/json_api/v1/buckets/insert
  print("Creating Google Cloud Bucket")

  insert_url = f"https://storage.googleapis.com/storage/v1/b?project={project_id}&predefinedAcl=private&predefinedDefaultObjectAcl=private&projection=noAcl"
  json_result = restAPIHelper(insert_url, "POST", {"name": biglake_bucket_name})

  print()
  print(f"json_result: {json_result}")
  print()
  print("BigLake Bucket created: ", biglake_bucket_name)

#### setBucketIamPolicy
Adds the BigLake External Connection Service Principal to the IAM policy of the GCS bucket.

In [None]:
def setBucketIamPolicy(params, accountWithPrefix, role):
  """Grants a role on the BigLake bucket to a member, keeping existing bindings.

  Args:
    params: Notebook parameters; "biglake_bucket_name" is read.
    accountWithPrefix: IAM member including its type prefix, for example
      "serviceAccount:<email>" or "user:<email>".
    role: Role to grant, for example "roles/storage.objectAdmin".

  Returns:
    None. Returns early, without writing, when the member already has the role.
  """

  biglake_bucket_name = params["biglake_bucket_name"]

  # Get the current bindings (if the account has access then skip)
  # https://cloud.google.com/storage/docs/json_api/v1/buckets/getIamPolicy
  url = f"https://storage.googleapis.com/storage/v1/b/{biglake_bucket_name}/iam"
  json_result = restAPIHelper(url, "GET", None)
  print(f"setBucketIamPolicy (GET) json_result: {json_result}")

  # A policy without any binding has no "bindings" key at all
  bindings = json_result.get("bindings", [])

  # Test to see if permissions exist
  for item in bindings:
    if item.get("role") == role and accountWithPrefix in item.get("members", []):
      print(f"Permissions exist: {role}")
      return

  # The whole policy is replaced on write, so the existing bindings are sent
  # back together with the new member (otherwise they would be lost).
  # Prefer adding the member to an existing unconditional binding for the role
  # over creating a second binding for the same role.
  for item in bindings:
    if item.get("role") == role and "condition" not in item:
      item.setdefault("members", []).append(accountWithPrefix)
      break
  else:
    bindings.append({
        "role": role,
        "members": [ accountWithPrefix ]
        })

  request_body = { "bindings" : bindings }

  # Send back the etag we read so a concurrent policy change is not silently overwritten
  if "etag" in json_result:
    request_body["etag"] = json_result["etag"]

  print(f"Permission bindings: {bindings}")

  # https://cloud.google.com/storage/docs/json_api/v1/buckets/setIamPolicy
  json_result = restAPIHelper(url, "PUT", request_body)
  print()
  print(f"json_result: {json_result}")
  print()
  print(f"Bucket IAM Permissions set for {accountWithPrefix} {role}")

#### bucketFileExists
Method that tests for a file in the bucket and if the file does not exist then a copy from the public storage account is performed.

In [None]:
def bucketFileExists(params):
  """Tests whether the sample data has already been copied to the bucket.

  Looks up one known object in the bucket named params["biglake_bucket_name"].

  Returns:
    True when the object metadata can be fetched; False when the lookup fails
    for any reason (the failure is printed).
  """
  import urllib.parse

  # https://cloud.google.com/storage/docs/json_api/v1/objects/get
  biglake_bucket_name = params["biglake_bucket_name"]
  file_to_test = "biglake-tables/driver_parquet/driver.snappy.parquet"
  # The object name is a single path segment in the URL, so "/" must be encoded too
  file_to_test_encoded = urllib.parse.quote(file_to_test, safe='')
  url = f"https://storage.googleapis.com/storage/v1/b/{biglake_bucket_name}/o/{file_to_test_encoded}"

  try:
    json_result = restAPIHelper(url, "GET", None)
  except Exception as e:
    # Best effort: a failed lookup is treated as "not copied yet".
    # "Exception" (not a bare except) lets Ctrl-C / SystemExit propagate.
    print(f"bucketFileExists: object not found or lookup failed: {e}")
    return False

  print(f"bucketFileExists (GET) json_result: {json_result}")
  return True

#### updateHudiManifest
We need to replace the REPLACE-ME placeholder in the Hudi manifest file with the bucket name. Open the copied manifest, replace the placeholder in each line and then save the file back to storage.

In [None]:
def updateHudiManifest(params):
  """Replaces the REPLACE-ME placeholder in the copied Hudi manifest with the bucket name.

  Downloads the manifest from the bucket named params["biglake_bucket_name"]
  to a local file "latest-snapshot.csv", rewrites it and uploads it back to
  the same object.
  """
  from google.cloud import storage

  # Use the bucket from params rather than a notebook global
  biglake_bucket_name = params["biglake_bucket_name"]
  local_filename = "latest-snapshot.csv"
  blob_name = "biglake-tables/location_hudi/.hoodie/absolute-path-manifest/latest-snapshot.csv"

  # Create a storage client and get a reference to the manifest object
  storage_client = storage.Client()
  bucket = storage_client.bucket(biglake_bucket_name)
  blob = bucket.blob(blob_name)
  blob.download_to_filename(local_filename)

  with open(local_filename, 'r') as f:
    manifest_lines = f.readlines()

  new_lines = [line.replace("REPLACE-ME", biglake_bucket_name) for line in manifest_lines]

  with open(local_filename, 'w') as f:
    f.writelines(new_lines)

  # Upload the file from the local filesystem
  blob.upload_from_filename(local_filename, content_type = "text/csv")


#### createServiceAccount
Creates a service account if it does not exist

In [None]:
def createServiceAccount(params, serviceAccountName, description, displayName):
  """Returns the email of the named service account, creating the account if it is not listed."""

  project_id = params["project_id"]
  # The same URL is used for listing (GET) and creating (POST)
  accounts_url = f"https://iam.googleapis.com/v1/projects/{project_id}/serviceAccounts"
  serviceAccountEmail = f"{serviceAccountName}@{project_id}.iam.gserviceaccount.com"

  # List the service accounts and look for the expected email
  # https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/list
  json_result = restAPIHelper(accounts_url, "GET", None)
  print(f"createServiceAccount (GET) json_result: {json_result}")

  for account in json_result.get("accounts", []):
    print(f"email: {account['email']}")
    if account["email"] == serviceAccountEmail:
      print("Service Account already exists")
      return serviceAccountEmail

  # Not listed: create the service account
  # https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts/create
  print("Creating Service Account")

  new_account = {
      "description": description,
      "displayName": displayName
  }
  json_result = restAPIHelper(accounts_url, "POST", {
      "accountId" : serviceAccountName,
      "serviceAccount": new_account
  })

  created_email = json_result["email"]
  print("Service Account created: ", created_email)
  return created_email

#### getProjectNumber
Gets the project number from a project id

In [None]:
def getProjectNumber(params):
  """Returns the project number for params["project_id"].

  The number is fetched from the Cloud Resource Manager API on the first call
  and stored in params["project_number"]; later calls return the stored value.

  Args:
    params: Notebook parameters; "project_id" is read and "project_number" is
      added when missing.

  Returns:
    The project number, on the first call as well as on cached calls.
  """

  if "project_number" not in params:
    # https://cloud.google.com/resource-manager/reference/rest/v1/projects/get?
    project_id = params["project_id"]

    url = f"https://cloudresourcemanager.googleapis.com/v1/projects/{project_id}"
    json_result = restAPIHelper(url, "GET", None)
    print(f"getProjectNumber (GET) json_result: {json_result}")

    params["project_number"] = json_result["projectNumber"]

  project_number = params["project_number"]
  print(f"getProjectNumber: {project_number}")
  return project_number

#### activateServiceAPIs
Enables Google Cloud APIs

In [None]:
def activateServiceAPIs(params):
  """Enables the service APIs in one batch request; params must already hold "project_number"."""

  project_number = params["project_number"]
  url = f"https://serviceusage.googleapis.com/v1/projects/{project_number}/services:batchEnable"

  service_ids = [
      "pubsub.googleapis.com",
      "vision.googleapis.com",
      "biglake.googleapis.com",
      "dataproc.googleapis.com",
      "aiplatform.googleapis.com",
  ]

  json_result = restAPIHelper(url, "POST", { "serviceIds" : service_ids })
  print(f"activateServiceAPIs (POST) json_result: {json_result}")

#### downloadGCSFile
Downloads a file from GCS to the notebook compute instance

In [None]:
def downloadGCSFile(uri):
  """Downloads a GCS object into the current working directory.

  Args:
    uri: Object URI of the form "gs://<bucket>/<path/to/object>". Objects at
      the bucket root ("gs://<bucket>/<object>") are supported.

  Returns:
    The local filename, which is the last path segment of the object name.

  Raises:
    ValueError: If uri does not start with "gs://" or names no object.
  """
  from google.cloud import storage

  scheme = "gs://"
  if not uri.startswith(scheme):
    raise ValueError(f"Not a gs:// URI: {uri}")

  # "<bucket>/<object path>" -> bucket name and object path
  bucket_name, _, file_path = uri[len(scheme):].partition("/")
  if not bucket_name or not file_path:
    raise ValueError(f"URI must contain a bucket and an object path: {uri}")

  # Last path segment; also correct when the object path has no "/" at all
  filename = file_path.rsplit("/", 1)[-1]

  storage_client = storage.Client()
  bucket = storage_client.bucket(bucket_name)
  blob = bucket.blob(file_path)
  blob.download_to_filename(filename)

  return filename

#### getTableSchema
Retrieve a BigQuery table schema as JSON

In [None]:
def getTableSchema(project_id, dataset_name, table_name):
  """Retrieves the schema of a BigQuery table as a JSON string.

  Args:
    project_id: Project that owns the dataset; the client's default project
      is used when this is empty or None.
    dataset_name: Dataset id.
    table_name: Table id.

  Returns:
    The table schema serialized to JSON text.
  """
  import io
  import google.cloud.bigquery as bigquery

  client = bigquery.Client()

  # Client.dataset() is deprecated; build the reference directly instead
  dataset_ref = bigquery.DatasetReference(project_id or client.project, dataset_name)
  table = client.get_table(dataset_ref.table(table_name))

  # schema_to_json writes to a file-like object, so collect the text in memory
  f = io.StringIO("")
  client.schema_to_json(table.schema, f)
  return f.getvalue()

#### updateTableSchema
Sets the schema for a BigQuery table (CLS , Data Masking)

In [None]:
def updateTableSchema(project_id, dataset_name, table_name, new_schema):
  """Replaces the schema of a BigQuery table (used for CLS and data masking).

  Args:
    project_id: Project that owns the dataset; the client's default project
      is used when this is empty or None.
    dataset_name: Dataset id.
    table_name: Table id.
    new_schema: Schema assigned to the table before the update call.
  """
  import google.cloud.bigquery as bigquery

  client = bigquery.Client()

  # Client.dataset() is deprecated; build the reference directly instead
  dataset_ref = bigquery.DatasetReference(project_id or client.project, dataset_name)
  table = client.get_table(dataset_ref.table(table_name))

  # Only the "schema" field is sent in the update
  table.schema = new_schema
  table = client.update_table(table, ["schema"])

  print(f"Table {table_name} schema updated!")

#### createPubSubSubscription
Creates a Pub/Sub subscription for BQ Subscription

In [None]:
def createPubSubSubscription(params, name, table_name, topic_name):
  """Returns the full name of the BigQuery subscription, creating it when it is not listed."""

  project_id = params["project_id"]
  full_name = f"projects/{project_id}/subscriptions/{name}"

  # List the project's subscriptions
  # https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/list
  list_url = f"https://pubsub.googleapis.com/v1/projects/{project_id}/subscriptions"
  json_result = restAPIHelper(list_url, "GET", None)
  print(f"createPubSubSubscription (GET) json_result: {json_result}")

  # Return right away when the subscription is already there
  for subscription in json_result.get("subscriptions", []):
    print(f"name: {subscription['name']}")
    if subscription["name"] == full_name:
      print("Pub/Sub already exists")
      return full_name

  # Create the subscription that writes the topic's messages to the table
  # https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/create
  print("Creating Pub/Sub Subscription")

  create_url = f"https://pubsub.googleapis.com/v1/projects/{project_id}/subscriptions/{name}"
  bigquery_config = {
      "table": table_name,
      "writeMetadata": True
  }
  json_result = restAPIHelper(create_url, "PUT", {
      "bigqueryConfig": bigquery_config,
      "topic": topic_name
  })

  full_name = json_result["name"]
  print("Pub/Sub Subscription created: ", full_name)
  return full_name

#### deletePubSubSubscription
Deletes a Pub/Sub subscription

In [None]:
def deletePubSubSubscription(params, name):
  """Deletes the named Pub/Sub subscription when the project lists it."""

  project_id = params["project_id"]
  full_name = f"projects/{project_id}/subscriptions/{name}"

  # List the project's subscriptions
  # https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/list
  list_url = f"https://pubsub.googleapis.com/v1/projects/{project_id}/subscriptions"
  json_result = restAPIHelper(list_url, "GET", None)
  print(f"deletePubSubSubscription (GET) json_result: {json_result}")

  for subscription in json_result.get("subscriptions", []):
    print(f"name: {subscription['name']}")
    if subscription["name"] != full_name:
      continue

    # Found it: delete and stop
    # https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/delete
    print("Deleteing Pub/Sub Subscription")
    delete_url = f"https://pubsub.googleapis.com/v1/projects/{project_id}/subscriptions/{name}"
    restAPIHelper(delete_url, "DELETE", None)
    print("Pub/Sub Subscription delete: ", full_name)
    return

  print("Pub/Sub Subscription does not exists")

#### setBigQueryDatasetPolicy
Sets the IAM Permissions on a BigQuery Dataset

In [None]:
def setBigQueryDatasetPolicy(params, dataset_id, account, role):
  """Adds a userByEmail access entry (account + role) to a dataset unless it is already present."""

  project_id = params["project_id"]
  # The same dataset URL is used for reading (GET) and updating (PATCH)
  dataset_url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project_id}/datasets/{dataset_id}"

  # Read the dataset, including its current access list
  # https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/get
  json_result = restAPIHelper(dataset_url, "GET", None)
  print(f"setBigQueryDatasetPolicy (GET) json_result: {json_result}")

  # Start from the existing entries so they are kept when the list is written back
  access = json_result["access"] if "access" in json_result else []

  # Skip the update when the exact entry already exists
  for entry in access:
    if "userByEmail" in entry and entry["userByEmail"] == account and entry["role"] == role:
      print("Permissions exist")
      return

  access.append({
      "role": role,
      "userByEmail": account
  })

  print(f"Permission bindings: {access}")

  # https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/patch
  json_result = restAPIHelper(dataset_url, "PATCH", { "access" : access })
  print()
  print(f"json_result: {json_result}")
  print()
  print(f"BigQuery Dataset IAM Permissions set for {account} {role}")

#### runQuery
Executes a BigQuery SQL statement and returns the results for SELECT statements or waits for the job to complete for non-query results.

In [None]:
def runQuery(sql):
  """Executes a BigQuery SQL statement.

  Args:
    sql: The statement to run.

  Returns:
    A DataFrame with the results when sql starts with "SELECT" or "WITH".
    Otherwise the job is polled until it is DONE and True is returned when it
    finished without an error result, False when it failed.

  NOTE: the SELECT / WITH test is an exact, case-sensitive prefix match, so a
  query with leading whitespace or lowercase keywords takes the polling path
  and returns a bool. That is kept as is because callers in this notebook
  compare the result with True.
  """
  import time
  import google.cloud.bigquery as bigquery

  client = bigquery.Client()

  if sql.startswith(("SELECT", "WITH")):
    return client.query(sql).to_dataframe()

  job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)
  query_job = client.query(sql, job_config=job_config)

  # CALL statements are polled less often. Leading whitespace is ignored so
  # that triple-quoted SQL starting with a newline is recognized as well.
  poll_seconds = 10 if sql.lstrip().upper().startswith("CALL") else 1

  # Check on the progress by getting the job's updated state.
  query_job = client.get_job(query_job.job_id, location=query_job.location)
  print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

  while query_job.state != "DONE":
    time.sleep(poll_seconds)
    query_job = client.get_job(query_job.job_id, location=query_job.location)
    print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

  return query_job.error_result is None

#### create_custom_storage_viewer_role
Creates a custom Google Cloud IAM role

In [None]:
def create_custom_storage_viewer_role(project_id, role_id, role_title, role_description, permissions_to_grant):
  """Returns the custom IAM role, creating it when the lookup does not find it.

  Returns the existing or newly created role as returned by the API, or None
  when creation fails.
  """
  import json

  full_role_name = f"projects/{project_id}/roles/{role_id}"
  print(f"Checking for custom IAM role: {full_role_name}")

  # Step 1: look the role up
  # API Doc: https://cloud.google.com/iam/docs/reference/rest/v1/projects.roles/get
  get_url = f"https://iam.googleapis.com/v1/{full_role_name}"

  try:
    found_role = restAPIHelper(get_url, "GET", None)

    if not found_role or 'name' not in found_role:
      # Response without a role name: fall through to creation
      print(f"Role '{role_id}' not found or GET failed, proceeding to create.")
    else:
      print(f"Custom IAM role '{role_id}' already exists in project '{project_id}'.")
      print(f"Existing role details: {found_role}")
      return found_role

  except Exception as e:
    # Any failure of the lookup is treated as "role not found"
    print(f"Caught exception during GET (assuming role not found): {e}")
    print(f"Proceeding to create role '{role_id}'.")

  # Step 2: create the role
  # API Doc: https://cloud.google.com/iam/docs/reference/rest/v1/projects.roles/create
  print(f"Creating custom IAM role '{role_id}' in project '{project_id}'...")

  create_url = f"https://iam.googleapis.com/v1/projects/{project_id}/roles"

  role_definition = {
      "title": role_title,
      "description": role_description,
      "includedPermissions": permissions_to_grant,
      "stage": "GA"  # Or "BETA", "ALPHA", "DEPRECATED"
  }
  request_body = {
      "roleId": role_id,
      "role": role_definition
  }

  try:
    created_role = restAPIHelper(create_url, "POST", request_body)
    print("\nCustom IAM Role Creation API call result:")
    print(json.dumps(created_role, indent=2))

    if not created_role or 'name' not in created_role:
      print("\nRole creation might have failed. API response did not contain expected 'name' field.")
      return None

    print(f"\nSuccessfully created custom IAM role: {created_role.get('name')}")
    return created_role

  except Exception as e:
    # Creation errors are reported and signalled with None
    print(f"\nError creating custom IAM role: {e}")
    return None

#### initialize
Calls the methods to create the external connection, create the GCS bucket, apply IAM permissions and copy the public data.  This method can be re-run as required and does not cause duplication issues.  Each method tests for the existence of items before creating them.

In [None]:
def initialize(params):
  """Create the BigLake connection, GCS bucket, set IAM permissions and copies data"""

  # Create the BigLake connection (if not exists)
  bigLakeServiceAccountId = createBigLakeConnection(params)
  print(f"createBigLakeConnection: {bigLakeServiceAccountId}")
  params["bigLakeServiceAccountId"] = bigLakeServiceAccountId

  # Create storage account (if not exists)
  createGoogleCloudStorageBucket(params)

  # Grant access to GCS Bucket for BigLake Connection (if not exists)
  setBucketIamPolicy(params, f"serviceAccount:{bigLakeServiceAccountId}", "roles/storage.objectAdmin")
  setBucketIamPolicy(params, f"user:{user}", "roles/storage.admin")

  # Copy the sample data (if not exists)
  if bucketFileExists(params) is True:
    print("Data has already been copied")
  else:
    print("Data has not been copied, copying now")

    # Copy the data
    # https://cloud.google.com/storage/docs/gsutil/commands/cp

    # See: https://console.cloud.google.com/storage/browser/data-analytics-golden-demo/biglake/v1/biglake-tables
    source_path = "gs://data-analytics-golden-demo/biglake/v1/*"
    dest_path = f"gs://{params['biglake_bucket_name']}/"
    print(f"Copying data from {source_path} to {dest_path}")
    print("This may take a few minutes...")
    !gsutil -m -q cp -r {source_path} {dest_path}
    print("Copy is complete")

    updateHudiManifest(params)
    print("Hudi manifest updated")

  getProjectNumber(params)
  activateServiceAPIs(params)

## <font color='gray'>Initialize BigLake Demo</font>
Creates resources and copies data.  This is re-runnable and does not cause duplication of resources.

In [None]:
# Create the connection and bucket, set IAM permissions, copy the sample data and enable the APIs
initialize(params)

In [None]:
%%bigquery --params $params

-- Create both datasets (if missing) in the location passed in as @bigquery_location
CREATE SCHEMA IF NOT EXISTS biglake_mt_dataset OPTIONS(location = @bigquery_location);

CREATE SCHEMA IF NOT EXISTS biglake_dataset OPTIONS(location = @bigquery_location);

## <font color='blue'>BigLake Overview -</font> BigLake Managed / Self Managed Tables vs BigQuery Tables

<table>
  <tr>
    <th rowspan="2">Item</th>
    <th colspan="3">BigLake</th>
    <th colspan="2">BigQuery</th>
  </tr>
  <tr>
    <th>Managed Table</th>
    <th>Self Managed Table</th>
    <th>Iceberg Tables via BigLake Metastore</th>
    <th>Managed Table (Internal / Native)</th>
    <th>External Table</th>
  </tr>
  <tr>
    <td>Storage Format</td>
    <td>Iceberg</td>
    <td>CSV, Delta, Hudi, Iceberg, Parquet, etc.</td>
    <td>Iceberg</td>
    <td>Capacitor</td>
    <td>CSV,ORC, Parquet, etc.</td>
  </tr>
  <tr>
    <td>Storage Location</td>
    <td>Customer GCS</td>
    <td>Customer GCS</td>
    <td>Customer GCS</td>
    <td>Google Internal</td>
    <td>Customer GCS</td>
  </tr>
  <tr>
    <td>Read/Write</td>
    <td>CRUD</td>
    <td>Read only from BQ / Updates via Spark</td>
    <td>Read only from BQ / Updates via Spark</td>
    <td>CRUD</td>
    <td>Read only</td>
  </tr>
  <tr>
    <td>RLS / CLS / Data Masking</td>
    <td>Yes</td>
    <td>Yes</td>
    <td>Yes</td>
    <td>Yes</td>
    <td>No</td>
  </tr>
  <tr>
    <td>Fully Managed</td>
    <td>Yes (recluster, optimize, etc.)</td>
    <td>No</td>
    <td>No</td>
    <td>Yes (recluster, optimize, etc.)</td>
    <td>No</td>
  </tr>
  <tr>
    <td>Partitioning</td>
    <td>Clustering</td>
    <td>Partition</td>
    <td>Partition</td>
    <td>Partition/Clustering</td>
    <td>Partition</td>
  </tr>
  <tr>
    <td>Streaming (native)</td>
    <td>Yes</td>
    <td>No</td>
    <td>No</td>
    <td>Yes</td>
    <td>No</td>
  </tr>
  <tr>
    <td>Time Travel</td>
    <td>Yes</td>
    <td>Manual</td>
    <td>No</td>
    <td>Yes</td>
    <td>No</td>
  </tr>   

</table>

## <font color='blue'>BigLake Managed Tables -</font> Fully managed open source formats (Apache Iceberg)

BigLake managed tables offer the fully managed experience of BigQuery tables while storing data in customer-owned Cloud Storage buckets using open file formats. BigLake managed tables support DML, streaming, and background storage optimizations such as clustering and adaptive file-sizing. BigLake managed tables are compatible with open-source engines like Spark through Apache Iceberg metadata snapshots.

#### <font color="#4285f4">BigLake - Create Managed Tables</font>
- Tables are created in a specified storage account
- Tables are clustered

In [None]:
# Create the clustered BigLake managed (Iceberg) driver table in the demo bucket.
# The connection name comes from biglake_connection_name so it always matches
# the connection created by the helper methods.
sql = f"""

CREATE OR REPLACE TABLE `{project_id}.biglake_mt_dataset.driver`
(
  driver_id                 INT64,
  driver_name               STRING,
  driver_mobile_number      STRING,
  driver_license_number     STRING,
  driver_email_address      STRING,
  driver_dob                DATE,
  driver_ach_routing_number STRING,
  driver_ach_account_number STRING
)
CLUSTER BY driver_id
WITH CONNECTION `{project_id}.{bigquery_location}.{biglake_connection_name}`
OPTIONS (
  file_format = 'PARQUET',
  table_format = 'ICEBERG',
  storage_uri = 'gs://{biglake_bucket_name}/biglake-managed-tables/driver'
);
"""

# runQuery returns True / False for non-SELECT statements
if runQuery(sql):
  print()
  print(f"** Created table {project_id}.biglake_mt_dataset.driver **")
  print()
else:
  print()
  print("Table creation failed")
  print()

In [None]:
# Create the clustered BigLake managed (Iceberg) location table in the demo bucket.
# The connection name comes from biglake_connection_name so it always matches
# the connection created by the helper methods.
sql = f"""

CREATE OR REPLACE TABLE `{project_id}.biglake_mt_dataset.location`
(
  location_id  INT64,
  zone         STRING,
  service_zone STRING,
  latitude     FLOAT64,
  longitude    FLOAT64,
  borough      STRING
)
CLUSTER BY location_id
WITH CONNECTION `{project_id}.{bigquery_location}.{biglake_connection_name}`
OPTIONS (
  file_format = 'PARQUET',
  table_format = 'ICEBERG',
  storage_uri = 'gs://{biglake_bucket_name}/biglake-managed-tables/location'
);
"""

# runQuery returns True / False for non-SELECT statements
if runQuery(sql):
  print()
  print(f"** Created table {project_id}.biglake_mt_dataset.location **")
  print()
else:
  print()
  print("Table creation failed")
  print()

In [None]:
# Create the clustered BigLake managed (Iceberg) taxi_trips table in the demo bucket.
# The connection name comes from biglake_connection_name so it always matches
# the connection created by the helper methods.
sql = f"""

CREATE OR REPLACE TABLE `{project_id}.biglake_mt_dataset.taxi_trips`
(
  trip_id               INT64,
  driver_id             INT64,
  pickup_location_id    INT64,
  pickup_datetime       TIMESTAMP,
  dropoff_location_id   INT64,
  dropoff_datetime      TIMESTAMP,
  passenger_count       INT64,
  trip_distance         FLOAT64,
  fare_amount           FLOAT64,
  surcharge             FLOAT64,
  mta_tax               FLOAT64,
  tip_amount            FLOAT64,
  tolls_amount          FLOAT64,
  ehail_fee             FLOAT64,
  improvement_surcharge FLOAT64,
  total_amount          FLOAT64
)
CLUSTER BY pickup_datetime, pickup_location_id, driver_id
WITH CONNECTION `{project_id}.{bigquery_location}.{biglake_connection_name}`
OPTIONS (
  file_format = 'PARQUET',
  table_format = 'ICEBERG',
  storage_uri = 'gs://{biglake_bucket_name}/biglake-managed-tables/taxi_trips'
);
"""

# runQuery returns True / False for non-SELECT statements
if runQuery(sql):
  print()
  print(f"** Created table {project_id}.biglake_mt_dataset.taxi_trips **")
  print()
else:
  print()
  print("Table creation failed")
  print()

#### <font color="#4285f4">BigLake - Load Managed Tables</font>


In [None]:
%%bigquery

-- Load the driver Parquet source files (data-analytics-golden-demo bucket) into the driver managed table
LOAD DATA INTO `biglake_mt_dataset.driver`
FROM FILES (
  format = 'parquet',
  uris = ['gs://data-analytics-golden-demo/biglake/v1-source/managed-table-source/driver/*.parquet']);

In [None]:
%%bigquery

-- Load the location Parquet source files (data-analytics-golden-demo bucket) into the location managed table
LOAD DATA INTO `biglake_mt_dataset.location`
FROM FILES (
  format = 'parquet',
  uris = ['gs://data-analytics-golden-demo/biglake/v1-source/managed-table-source/location/*.parquet']);

In [None]:
%%bigquery

-- Load the taxi_trips Parquet source files (data-analytics-golden-demo bucket) into the taxi_trips managed table
LOAD DATA INTO `biglake_mt_dataset.taxi_trips`
FROM FILES (
  format = 'parquet',
  uris = ['gs://data-analytics-golden-demo/biglake/v1-source/managed-table-source/taxi_trips/*.parquet']);

In [None]:
# Print a Cloud Console link to the folder that backs the taxi_trips managed table.
print("View the files on the storage account:")
taxi_trips_console_url = (
    f"https://console.cloud.google.com/storage/browser/{biglake_bucket_name}/biglake-managed-tables/taxi_trips"
)
print(taxi_trips_console_url)

#### <font color="#4285f4">BigLake - Query Managed Tables</font>


In [None]:
%%bigquery

-- See amount of data loaded (we can load 45 million + records quickly into Iceberg)
-- Returns one row per managed table; FORMAT("%'d", ...) renders each count as a
-- STRING with digit-grouping separators.

SELECT 'driver' AS table_name, FORMAT("%'d",COUNT(*)) AS record_count FROM `biglake_mt_dataset.driver`
UNION ALL
SELECT 'location' AS table_name, FORMAT("%'d",COUNT(*)) AS record_count FROM `biglake_mt_dataset.location`
UNION ALL
SELECT 'taxi_trips' AS table_name, FORMAT("%'d",COUNT(*)) AS record_count FROM `biglake_mt_dataset.taxi_trips`;

In [None]:
%%bigquery

-- Total and Average tip amount per Driver
-- Only drivers with driver_id 1 through 10 are included (the filter is part of the join condition).
-- avg_tip_amount is returned as a STRING formatted to 2 decimal places; total_tip_amount stays numeric.
-- GROUP BY ALL groups by the non-aggregated column (driver_name).

SELECT driver.driver_name,
       SUM(trips.tip_amount) AS total_tip_amount,
       FORMAT("%.*f",2,AVG(trips.tip_amount)) AS avg_tip_amount,
  FROM `biglake_mt_dataset.taxi_trips` AS trips
       INNER JOIN `biglake_mt_dataset.driver` AS driver
       ON trips.driver_id = driver.driver_id
       AND driver.driver_id BETWEEN 1 AND 10
 GROUP BY ALL
 ORDER BY 1
 LIMIT 25;

In [None]:
%%bigquery

-- Average passenger count and average fare amount per pickup zone / dropoff zone pair
-- The location table is joined twice: once for the pickup side and once for the dropoff side.
-- Both averages are returned as STRINGs formatted to 2 decimal places.
-- NOTE(review): no driver column is selected; as an INNER JOIN the driver join still
-- restricts the result to trips whose driver_id exists in the driver table.

SELECT pickup_location.borough AS pickup_location_borough,
       pickup_location.zone AS pickup_location_zone,

       dropoff_location.borough AS dropoff_location_borough,
       dropoff_location.zone AS dropoff_location_zone,

       FORMAT("%.*f",2,AVG(trips.passenger_count)) AS avg_passenger_count,
       FORMAT("%.*f",2,AVG(trips.fare_amount)) AS avg_fare_amount

  FROM `biglake_mt_dataset.taxi_trips` AS trips
       INNER JOIN `biglake_mt_dataset.driver` AS driver
       ON trips.driver_id = driver.driver_id
       INNER JOIN `biglake_mt_dataset.location` AS pickup_location
       ON trips.pickup_location_id = pickup_location.location_id
       INNER JOIN `biglake_mt_dataset.location` AS dropoff_location
       ON trips.dropoff_location_id = dropoff_location.location_id
 GROUP BY ALL
 ORDER BY 1,2,3,4
 LIMIT 25;

#### <font color="#4285f4">BigLake - CRUD Managed Tables</font>


In [None]:
%%bigquery

-- Create a new driver with some random data
-- driver_id is fixed at 999999; the later DELETE cell removes the row by this id.
-- Mobile number:  three random numbers joined with '-' (ranges 100-999, 100-999, 1000-9999).
-- License number: four random numbers joined with '-' (ranges 10-99, 100-999, 1000-9999, 1000-9999).
-- Email address:  the driver name lower-cased, spaces replaced by '.', followed by '@gmail.com'.
-- Date of birth:  today minus a random 6570 to 24820 days (roughly 18 to 68 years).
-- ACH routing and account numbers: random numbers in the range 100000000-999999999, stored as STRING.

INSERT INTO `biglake_mt_dataset.driver`
       (driver_id, driver_name, driver_mobile_number, driver_license_number, driver_email_address,
        driver_dob, driver_ach_routing_number, driver_ach_account_number)
VALUES (999999, 'BigLake Managed Driver',
       CAST(CONCAT(CAST(CAST(ROUND(100 + RAND() * (999 - 100)) AS INT) AS STRING),'-',
              CAST(CAST(ROUND(100 + RAND() * (999 - 100)) AS INT) AS STRING),'-',
              CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING)) AS STRING),
       CAST(CONCAT(CAST(CAST(ROUND(10 + RAND() * (99 - 10)) AS INT) AS STRING),'-',
              CAST(CAST(ROUND(100 + RAND() * (999 - 100)) AS INT) AS STRING),'-',
              CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-',
              CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING)) AS STRING),
      CAST(CONCAT(LOWER(REPLACE('BigLake Managed Driver',' ','.')),'@gmail.com') AS STRING),
      CAST(DATE_SUB(CURRENT_DATE(), INTERVAL CAST(ROUND(6570 + RAND() * (24820 - 6570)) AS INT) DAY) AS DATE),
      CAST(CAST(ROUND(100000000 + RAND() * (999999999 - 100000000)) AS INT) AS STRING),
      CAST(CAST(ROUND(100000000 + RAND() * (999999999 - 100000000)) AS INT) AS STRING));

In [None]:
%%bigquery

-- Add a new column and populate it with data

ALTER TABLE `biglake_mt_dataset.driver`
  ADD COLUMN IF NOT EXISTS license_plate STRING;

-- Every row gets a random plate of the form NNN-NNN, each half in the range 100-999.
-- (The second half previously used (999 - 1000), which only ever produced 99 or 100.)
-- WHERE TRUE is needed because BigQuery requires a WHERE clause on UPDATE.
UPDATE `biglake_mt_dataset.driver`
   SET license_plate = CAST(CONCAT(CAST(CAST(ROUND(100 + RAND() * (999 - 100)) AS INT) AS STRING),'-',
                                   CAST(CAST(ROUND(100 + RAND() * (999 - 100)) AS INT) AS STRING)) AS STRING)
 WHERE TRUE;

In [None]:
%%bigquery

-- See the new driver and the new column
-- The inserted driver has driver_id 999999, so with the descending sort it appears first;
-- any other drivers with an id above 9990 are listed after it.

SELECT driver_id, driver_name, license_plate
  FROM `biglake_mt_dataset.driver`
 WHERE driver_id > 9990
 ORDER BY driver_id DESC;

In [None]:
%%bigquery

-- Delete the new driver
-- 999999 is the driver_id used by the INSERT cell above.

DELETE FROM `biglake_mt_dataset.driver`
 WHERE driver_id = 999999;

#### <font color="#4285f4">BigLake - Managed Tables</font> - Streaming Ingestion
BigLake Managed Tables support streaming ingestion of data.  In this example Pub/Sub will be used to stream data directly into a managed table.

In [None]:
# Managed table that receives the streamed Pub/Sub messages.
# CREATE TABLE IF NOT EXISTS is used, so an existing table is left in place.

sql = f"""

CREATE TABLE IF NOT EXISTS `{project_id}.biglake_mt_dataset.taxi_trips_streaming`
(
  subscription_name STRING,
  message_id STRING,
  publish_time TIMESTAMP,
  data STRING,
  attributes STRING
)
CLUSTER BY publish_time
WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`
OPTIONS (
  file_format = 'PARQUET',
  table_format = 'ICEBERG',
  storage_uri = 'gs://{biglake_bucket_name}/biglake-managed-tables/taxi_trips_streaming'
);
"""

# Run the DDL, then print a single status line padded by blank lines.
query_succeeded = runQuery(sql) == True
print()
print(f"** Created table {project_id}.biglake_mt_dataset.taxi_trips_streaming **" if query_succeeded else "Table creation failed")
print()

In [None]:
# Streaming ingestion needs the "storage.buckets.get" permission, so a custom
# role carrying only that permission is created in the project.

role_id = "biglake_bucket_get_role"
role_title = "BigLake-Bucket-Get-Role"
role_description = "Required for streaming ingestion to Iceberg"
permissions_to_grant = ["storage.buckets.get"]
create_custom_storage_viewer_role(
    project_id,
    role_id,
    role_title,
    role_description,
    permissions_to_grant,
)

In [None]:
# Bind the custom bucket-get role (role_id from the previous cell) to the
# service account of the BigLake external connection.
full_role_name = f"projects/{project_id}/roles/{role_id}"
connection_member = f"serviceAccount:{params['bigLakeServiceAccountId']}"
setBucketIamPolicy(params, connection_member, full_role_name)

In [None]:
# Give the project's Pub/Sub service account access to the BigLake managed-table
# dataset so that it can stream messages into the table.
# NOTE(review): "OWNER" is a broad grant for a writer - confirm whether a narrower
# dataset role accepted by setBigQueryDatasetPolicy would be sufficient.

project_number = getProjectNumber(params)
print(f"project_number: {project_number}")

# The service account address is derived from the project number.
pubSubServiceAccountEmail = f"service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com"
print(f"pubSubServiceAccountEmail: {pubSubServiceAccountEmail}")

setBigQueryDatasetPolicy(params, "biglake_mt_dataset", pubSubServiceAccountEmail, "OWNER")

<font color="red">WARNING: If you create this Pub/Sub subscription, please make sure you DELETE it by running the delete cell further down in this section (after the streaming queries).</font>

In [None]:
# Create a Pub/Sub subscription that will stream data into the table.
# The source is the pubsub-public-data taxi rides topic; the target is the
# taxi_trips_streaming managed table.

project_id = params["project_id"]
table_name = f"{project_id}.biglake_mt_dataset.taxi_trips_streaming"
topic_name = "projects/pubsub-public-data/topics/taxirides-realtime"

createPubSubSubscription(params, "biglake-mt-streaming", table_name, topic_name)

print()
print("To view Pub/Sub: https://console.cloud.google.com/cloudpubsub/subscription/detail/biglake-mt-streaming")


In [None]:
%%bigquery

-- Peek at up to 10 streamed messages.
-- Un-comment the WHERE clause below to look only at messages published in the last hour.
SELECT message_id, publish_time, data
  FROM `biglake_mt_dataset.taxi_trips_streaming`
  --WHERE publish_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
  LIMIT 10

In [None]:
%%bigquery

-- Query the streaming data
-- Step 1 (streaming_data): take up to 10 messages published within the last hour.
-- Step 2 (streaming_json): parse the data column (a STRING) into a JSON value.
-- Final SELECT: read individual ride fields from the parsed JSON with dot notation.
WITH streaming_data AS (
SELECT message_id, publish_time, data
  FROM `biglake_mt_dataset.taxi_trips_streaming`
  WHERE publish_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
  LIMIT 10
)
, streaming_json AS (
SELECT message_id, publish_time, PARSE_JSON(data) as trip_json
  FROM streaming_data
)
SELECT message_id, publish_time, trip_json.ride_id,
       trip_json.latitude, trip_json.longitude,
       trip_json.meter_reading,
       trip_json.ride_status,
       trip_json.passenger_count
  FROM streaming_json;

In [None]:
%%bigquery

-- See the counts increasing (run several times in a row)
-- Counts only messages published within the last hour; the count is returned as a
-- STRING with digit-grouping separators.
SELECT FORMAT("%'d",COUNT(*)) AS record_count
  FROM `biglake_mt_dataset.taxi_trips_streaming`
  WHERE publish_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR);

<font color="red">Removes the Pub/Sub Subscription - Stops Billing!</font>

In [None]:
# Remove the streaming subscription created earlier, then print a console link
# so the removal can be double-checked by hand.
deletePubSubSubscription(params, "biglake-mt-streaming")

print()
print("Please VERIFY that Pub/Sub has been removed: https://console.cloud.google.com/cloudpubsub/subscription")

#### <font color="#4285f4">BigLake - Managed Tables</font> - Metadata Export
Manually generate an Iceberg snapshot

In [None]:
# Print a Cloud Console link to the driver table's metadata folder (state before the export).
print("View the metadata BEFORE the EXPORT:")
driver_metadata_url = (
    f"https://console.cloud.google.com/storage/browser/{biglake_bucket_name}/biglake-managed-tables/driver/metadata"
)
print(driver_metadata_url)

In [None]:
%%bigquery
-- Export the table metadata (Iceberg snapshot) for the driver managed table.
-- Compare the table's metadata folder in the bucket before and after running this.
EXPORT TABLE METADATA FROM biglake_mt_dataset.driver

In [None]:
# Print a Cloud Console link to the driver table's metadata folder (state after the export).
print("View the metadata AFTER the EXPORT:")
driver_metadata_url = (
    f"https://console.cloud.google.com/storage/browser/{biglake_bucket_name}/biglake-managed-tables/driver/metadata"
)
print(driver_metadata_url)

## <font color='blue'>BigLake Self Managed Tables -</font> Support for open source formats
BigLake supports a variety of formats.  Here we will show Avro, CSV, Delta.io, Hudi, JSON and Parquet.  There are additional supported formats as well.  Apache Iceberg will be shown in its own area of this notebook.

#### <font color="#4285f4">BigLake - Avro</font>


In [None]:
# External (self-managed) BigLake table over the Avro files in the notebook's
# bucket; no column list is given in the DDL.
sql = f"""

CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.payment_type_avro`
WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`
OPTIONS (
    format = "AVRO",
    enable_logical_types = true,
    uris = ['gs://{biglake_bucket_name}/biglake-tables/payment_type_table_avro/*.avro']
);

"""

# Run the DDL, then print a single status line padded by blank lines.
query_succeeded = runQuery(sql) == True
print()
print(f"** Created table {project_id}.biglake_dataset.payment_type_avro **" if query_succeeded else "Table creation failed")
print()

#### <font color="#4285f4">BigLake - CSV</font>


In [None]:
# External (self-managed) BigLake table over the CSV files in the notebook's
# bucket; the two columns are declared explicitly in the DDL.
sql = f"""

CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.rate_code_csv`
(
  Rate_Code_Id	INTEGER,
  Rate_Code_Description	STRING
)
WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`
OPTIONS (
    format = "CSV",
    uris = ['gs://{biglake_bucket_name}/biglake-tables/rate_code_table_csv/*.csv']
);

"""

# Run the DDL, then print a single status line padded by blank lines.
query_succeeded = runQuery(sql) == True
print()
print(f"** Created table {project_id}.biglake_dataset.rate_code_csv **" if query_succeeded else "Table creation failed")
print()

#### <font color="#4285f4">BigLake - Delta.io (Delta Lake)</font>


In [None]:
# External (self-managed) BigLake table over a Delta Lake table; the URI points
# at the table's root folder rather than at individual files.
sql = f"""

CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.vendor_delta_io`
WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`
OPTIONS (
    format = "DELTA_LAKE",
    uris = ['gs://{biglake_bucket_name}/biglake-tables/vendor_delta_io']
);

"""

# Run the DDL, then print a single status line padded by blank lines.
query_succeeded = runQuery(sql) == True
print()
print(f"** Created table {project_id}.biglake_dataset.vendor_delta_io **" if query_succeeded else "Table creation failed")
print()

#### <font color="#4285f4">BigLake - Hudi</font>
- PySpark source code that created the Hudi table: [GitHub](https://github.com/GoogleCloudPlatform/data-analytics-golden-demo/blob/main/dataproc/pyspark_apache_hudi.py)
- This uses the [Hudi-BigQuery connector](https://github.com/apache/hudi/blob/master/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java)


In [None]:
# External (self-managed) BigLake table over a Hudi table.  The table is defined
# over the Parquet files listed in the Hudi absolute-path manifest
# (file_set_spec_type = NEW_LINE_DELIMITED_MANIFEST), with the partition columns
# detected from the paths under hive_partition_uri_prefix.
#
# Metadata caching is in MANUAL mode, so the script refreshes the cache right
# after creating the table.  The refresh must target the table created here
# (location_hudi); it previously referenced "location_hudi_TEST", which this
# notebook never creates.
sql = f"""

CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.location_hudi`
WITH PARTITION COLUMNS
WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`
OPTIONS (
    format="PARQUET",
    uris=["gs://{biglake_bucket_name}/biglake-tables/location_hudi/.hoodie/absolute-path-manifest/*"],
    file_set_spec_type = 'NEW_LINE_DELIMITED_MANIFEST',
    hive_partition_uri_prefix = "gs://{biglake_bucket_name}/biglake-tables/location_hudi/",
    max_staleness = INTERVAL 30 MINUTE,
    metadata_cache_mode = 'MANUAL'
);

CALL BQ.REFRESH_EXTERNAL_METADATA_CACHE('{project_id}.biglake_dataset.location_hudi');
"""

# Run the script (DDL + cache refresh) and report the outcome.
if runQuery(sql) == True:
  print()
  print(f"** Created table {project_id}.biglake_dataset.location_hudi **")
  print()
else:
  print()
  print("Table creation failed")
  print()

#### <font color="#4285f4">BigLake - Json</font>


In [None]:
# External (self-managed) BigLake table over the JSON files in the notebook's
# bucket; the two columns are declared explicitly in the DDL.
sql = f"""

CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.trip_type_json`
(
  Trip_Type_Id	INTEGER,
  Trip_Type_Description	STRING
)
WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`
OPTIONS (
    format = "JSON",
    uris = ['gs://{biglake_bucket_name}/biglake-tables/trip_type_json/*.json']
);

"""

# Run the DDL, then print a single status line padded by blank lines.
query_succeeded = runQuery(sql) == True
print()
print(f"** Created table {project_id}.biglake_dataset.trip_type_json **" if query_succeeded else "Table creation failed")
print()

#### <font color="#4285f4">BigLake - Parquet w/Hive Partitioning and Metadata Caching</font>
- BigLake supports Hive partitioned files
- BigLake supports Metadata caching to boost performance

In [None]:
# Step 1: external (self-managed) BigLake table over Hive-partitioned Parquet
# files (year / month), with metadata caching in MANUAL mode.
sql = f"""

CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.taxi_trips_parquet`
WITH PARTITION COLUMNS (
    year  INTEGER, -- column order must match the external path
    month INTEGER
)
WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`
OPTIONS (
    format = "PARQUET",
    hive_partition_uri_prefix = "gs://{biglake_bucket_name}/biglake-tables/taxi_trips_parquet/",
    uris = ['gs://{biglake_bucket_name}/biglake-tables/taxi_trips_parquet/*.parquet'],
    max_staleness=INTERVAL 30 MINUTE,
    metadata_cache_mode="MANUAL" -- This can be setup to be 30 minutes or more
);

"""

table_created = runQuery(sql) == True
print()
print(f"** Created table {project_id}.biglake_dataset.taxi_trips_parquet **" if table_created else "Table creation failed")
print()

# Step 2: refresh the metadata cache.  A refresh can only be issued for the
# "manual" cache mode; it is done here because this is a demo.
sql = f"CALL BQ.REFRESH_EXTERNAL_METADATA_CACHE('{project_id}.biglake_dataset.taxi_trips_parquet')"

cache_refreshed = runQuery(sql) == True
print()
print(f"** Refreshed Metadata on {project_id}.biglake_dataset.taxi_trips_parquet **" if cache_refreshed else "Metadata refresh failed")
print()


## <font color='blue'>BigLake SQL -</font> Query tables in all formats


### <font color="#4285f4">SQL - Query each format</font>


In [None]:
%%bigquery
-- Read every row of the Avro-backed external table
SELECT *
  FROM biglake_dataset.payment_type_avro;

In [None]:
%%bigquery
-- Read a sample (up to 25 rows) of the Hudi-backed external table
SELECT *
  FROM biglake_dataset.location_hudi
LIMIT 25;

In [None]:
%%bigquery
-- Read every row of the Delta Lake-backed external table
SELECT *
  FROM biglake_dataset.vendor_delta_io;

### <font color="#4285f4">SQL - Join all the different formats</font>
We can join the data from all the different formats (Parquet, Hudi, Delta, CSV, JSON, Avro) in a single query.

In [None]:
%%bigquery
-- One query across all six external tables, each stored in a different format:
--   taxi_trips_parquet (Parquet) is the fact table; location_hudi (Hudi),
--   payment_type_avro (Avro), rate_code_csv (CSV), trip_type_json (JSON) and
--   vendor_delta_io (Delta Lake) are joined to it as lookup tables.
-- The result is aggregated per pickup borough / zone and lookup description
-- (GROUP BY ALL groups by every non-aggregated column) and limited to 100 rows.
SELECT location.borough AS pickup_borough,
       location.zone AS pickup_zone,
       payment_type.payment_type_description,
       rate_code.rate_code_description,
       trip_type.trip_type_description,
       vendor.vendor_description,
       SUM(taxi_trips.fare_amount) AS fare_amount,
       SUM(taxi_trips.total_amount) AS total_amount,
       COUNT(*) AS number_of_trips
  FROM biglake_dataset.taxi_trips_parquet AS taxi_trips
       INNER JOIN biglake_dataset.location_hudi AS location
               ON taxi_trips.PULocationID = location.location_id
       INNER JOIN biglake_dataset.payment_type_avro AS payment_type
               ON taxi_trips.payment_type_id = payment_type.payment_type_id
       INNER JOIN biglake_dataset.rate_code_csv AS rate_code
               ON taxi_trips.rate_code_id = rate_code.rate_code_id
       INNER JOIN biglake_dataset.trip_type_json AS trip_type
               ON taxi_trips.trip_type = trip_type.trip_type_id
       INNER JOIN biglake_dataset.vendor_delta_io AS vendor
               ON taxi_trips.vendor_id = vendor.vendor_id
GROUP BY ALL
ORDER BY 1, 2, 3
LIMIT 100;

## <font color='blue'>BigLake Security / Governance -</font> Row, Column, Data Masking
BigLake supports IAM, Row Level, Column Level and Data Masking security

### <font color='gray'>Helper Functions - CLS / Data Masking</font>
Calls the REST API to create taxonomy, policies, data policies and set IAM on policies and data policies.



#### createTaxonomy
Creates the top level Taxonomy

In [None]:
def createTaxonomy(params):
  """Returns the resource name of the demo taxonomy, creating it when absent.

  The taxonomies of the project / location are listed through the Data Catalog
  REST API and matched on display name.  The full resource name cannot be
  matched because it ends in a generated number, for example
  "projects/test/locations/us/taxonomies/2620666826070342226".

  Args:
    params: dict providing "project_id", "bigquery_location" and "taxonomy_name".

  Returns:
    The "name" field of the matching taxonomy, or of the newly created one.
  """
  project = params["project_id"]
  location = params["bigquery_location"]
  wanted_display_name = params["taxonomy_name"]

  # The same collection URL serves both the list (GET) and the create (POST) call.
  # https://cloud.google.com/data-catalog/docs/reference/rest/v1/projects.locations.taxonomies/list
  # https://cloud.google.com/data-catalog/docs/reference/rest/v1/projects.locations.taxonomies/create
  taxonomies_url = f"https://datacatalog.googleapis.com/v1/projects/{project}/locations/{location}/taxonomies"

  listing = restAPIHelper(taxonomies_url, "GET", None)
  print(f"createTaxonomy (GET) json_result: {listing}")

  # Reuse an existing taxonomy that carries the requested display name.
  for taxonomy in (listing["taxonomies"] if "taxonomies" in listing else []):
    print(f"displayName: {taxonomy['displayName']}")
    if taxonomy["displayName"] != wanted_display_name:
      continue
    print("Taxonomy already exists")
    return taxonomy["name"]

  # Nothing matched: create the taxonomy.
  print("Creating Taxonomy")
  created = restAPIHelper(
      taxonomies_url,
      "POST",
      {
          "displayName": wanted_display_name,
          "description": "BigLake Demo - Colab Notebook",
      },
  )

  taxonomy_resource_name = created["name"]
  print("Taxonomy created: ", taxonomy_resource_name)
  return taxonomy_resource_name

#### createPolicyTag
Creates a Taxonomy Policy Tag or Child Policy Tag

In [None]:
def createPolicyTag(params, taxonomy_name, policy_parent, policy_name):
  """Returns the resource name of a policy tag, creating it when absent.

  The policy tags of the taxonomy are listed and matched on display name (the
  full resource name ends in a generated number, so it cannot be matched).

  Args:
    params: dict providing "project_id" (read but not otherwise used here).
    taxonomy_name: resource path of the taxonomy; it is placed directly after
      the "/v1/" part of the Data Catalog URL.
    policy_parent: resource name of the parent policy tag, or None to create a
      top-level policy tag.
    policy_name: display name of the policy tag.

  Returns:
    The "name" field of the matching policy tag, or of the newly created one.
  """
  project_id = params["project_id"]

  # The same collection URL serves both the list (GET) and the create (POST) call.
  # https://cloud.google.com/data-catalog/docs/reference/rest/v1/projects.locations.taxonomies.policyTags/create
  policy_tags_url = f"https://datacatalog.googleapis.com/v1/{taxonomy_name}/policyTags"

  listing = restAPIHelper(policy_tags_url, "GET", None)
  print(f"createTaxonomyPolicyTags (GET) json_result: {listing}")

  # Reuse an existing policy tag that carries the requested display name.
  for policy_tag in (listing["policyTags"] if "policyTags" in listing else []):
    if policy_tag["displayName"] == policy_name:
      print(f"{policy_name} already exists")
      return policy_tag["name"]

  print(f"Creating Policy {policy_name}")

  # A child policy tag additionally names its parent; the rest of the body is shared.
  request_body = {}
  if policy_parent is not None:
    request_body["parentPolicyTag"] = policy_parent
  request_body["displayName"] = policy_name
  request_body["description"] = "BigLake Demo - Colab Notebook - " + policy_name

  created = restAPIHelper(policy_tags_url, "POST", request_body)

  policy_full_name = created["name"]
  print("Policy created: ", policy_full_name)
  return policy_full_name

#### securePolicyTag
Secures a policy (column level security)

In [None]:
def securePolicyTag(params, policy_name):
  """Grants the notebook user Fine-Grained Reader on a policy tag (column level security).

  The current IAM policy of the policy tag is read first.  When the user is
  already a member of the Fine-Grained Reader binding nothing is changed.
  Otherwise the user is added to that binding and the merged policy is written
  back.  setIamPolicy replaces the whole policy, so the existing bindings (and
  the etag / version returned by getIamPolicy) are sent back along with the new
  member instead of a policy holding only the new binding.

  Args:
    params: dict providing "user" (the account that is granted access as "user:<value>").
    policy_name: resource path of the policy tag; it is placed directly after
      the "/v1/" part of the Data Catalog URL.

  Returns:
    None.
  """
  member = "user:" + params["user"]
  role = "roles/datacatalog.categoryFineGrainedReader"

  # Read the current policy
  # https://cloud.google.com/data-catalog/docs/reference/rest/v1/projects.locations.taxonomies.policyTags/getIamPolicy
  url = f"https://datacatalog.googleapis.com/v1/{policy_name}:getIamPolicy"
  json_result = restAPIHelper(url, "POST", { })
  print(f"getIamPolicy (POST) json_result: {json_result}")

  # Carry over only the policy fields; anything else in the response is not sent back.
  existing_policy = json_result if isinstance(json_result, dict) else {}
  policy = {key: existing_policy[key] for key in ("version", "etag") if key in existing_policy}
  bindings = [dict(binding) for binding in existing_policy.get("bindings", [])]

  # Locate the (unconditional) binding for the role that is being granted.
  role_binding = None
  for binding in bindings:
    print(f"role: {binding.get('role')}")
    for existing_member in binding.get("members", []):
      print(f"member: {existing_member}")
    if binding.get("role") == role and "condition" not in binding:
      role_binding = binding

  if role_binding is not None and member in role_binding.get("members", []):
    print("securePolicyTag: Permissions exist")
    return

  # Add the user while keeping every binding that is already in place.
  if role_binding is None:
    bindings.append({"role": role, "members": [member]})
  else:
    role_binding["members"] = list(role_binding.get("members", [])) + [member]
  policy["bindings"] = bindings

  # Write the merged policy
  # https://cloud.google.com/data-catalog/docs/reference/rest/v1/projects.locations.taxonomies.policyTags/setIamPolicy
  url = f"https://datacatalog.googleapis.com/v1/{policy_name}:setIamPolicy"
  restAPIHelper(url, "POST", {"policy": policy})
  print("IAM Security Set: ", policy_name)

#### createDataPolicy
Create a data masking policy

In [None]:
def createDataPolicy(params, policyTag, policy_name, dataPolicyType, predefinedExpression):
  """Returns the resource name of a data (masking) policy, creating it when absent.

  The data policies of the project / location are listed and matched on their
  full resource name, which is built from the project, location and policy_name.

  Args:
    params: dict providing "project_id" and "bigquery_location".
    policyTag: value sent as the "policyTag" field of the new data policy.
    policy_name: id of the data policy (sent as "dataPolicyId").
    dataPolicyType: value sent as the "dataPolicyType" field.
    predefinedExpression: value sent as "dataMaskingPolicy.predefinedExpression".

  Returns:
    The "name" field of the matching data policy, or of the newly created one.
  """
  project = params["project_id"]
  location = params["bigquery_location"]

  # The same collection URL serves both the list (GET) and the create (POST) call.
  # https://cloud.google.com/bigquery/docs/reference/bigquerydatapolicy/rest/v1/projects.locations.dataPolicies/list
  # https://cloud.google.com/bigquery/docs/reference/bigquerydatapolicy/rest/v1/projects.locations.dataPolicies/create
  data_policies_url = f"https://bigquerydatapolicy.googleapis.com/v1/projects/{project}/locations/{location}/dataPolicies"

  listing = restAPIHelper(data_policies_url, "GET", None)
  print(f"createDataPolicies (GET) json_result: {listing}")

  # Reuse an existing data policy with the expected resource name.
  expected_name = f"projects/{project}/locations/{location}/dataPolicies/{policy_name}"
  for data_policy in (listing["dataPolicies"] if "dataPolicies" in listing else []):
    if data_policy["name"] == expected_name:
      print(f"createDataPolicy policy exists: {policy_name}")
      return data_policy["name"]

  print(f"Creating Data Policy {policy_name}")
  created = restAPIHelper(
      data_policies_url,
      "POST",
      {
          "dataPolicyId": policy_name,
          "dataPolicyType": dataPolicyType,
          "policyTag" : policyTag,
          "dataMaskingPolicy": {
              "predefinedExpression": predefinedExpression
          },
      },
  )

  created_name = created["name"]
  print("Data Policy created: ", created_name)
  return created_name

#### secureDataPolicy
Secures a data policy (masking) tag

In [None]:
def secureDataPolicy(params, data_policy_name):
  """Grant the notebook user the masked-reader role on a data policy.

  Reads the IAM policy of the data policy and returns early when
  "user:" + params["user"] already holds roles/bigquerydatapolicy.maskedReader.
  Otherwise the member is added and the policy is written back.  The bindings
  returned by getIamPolicy are kept (the member is merged into them) instead
  of being replaced by a single new binding, and the returned etag/version
  are sent back so a concurrent change is detected rather than overwritten.

  Args:
    params: dict providing "user" (account without the "user:" prefix).
    data_policy_name: full resource name of the data policy
      (projects/.../locations/.../dataPolicies/...).

  Returns:
    None.
  """
  masked_reader_role = "roles/bigquerydatapolicy.maskedReader"
  member_to_grant = "user:" + params["user"]

  # https://cloud.google.com/bigquery/docs/reference/bigquerydatapolicy/rest/v1/projects.locations.dataPolicies/getIamPolicy
  url = f"https://bigquerydatapolicy.googleapis.com/v1/{data_policy_name}:getIamPolicy"

  request_body = { }
  json_result = restAPIHelper(url, "POST", request_body)
  print(f"getIamPolicy (POST) json_result: {json_result}")

  # Start from the existing bindings so nothing already granted is lost
  bindings = json_result["bindings"] if "bindings" in json_result else []

  # Only a binding for the masked-reader role means the permission exists;
  # membership in some other role on this data policy is not enough.
  mergeable_binding = None
  for item in bindings:
    print(f"role: {item['role']}")
    if item["role"] != masked_reader_role:
      continue
    if member_to_grant in item.get("members", []):
      print("secureDataPolicy: Permissions exist")
      return
    # Never add the member to a conditional binding: it would inherit the condition
    if mergeable_binding is None and "condition" not in item:
      mergeable_binding = item

  if mergeable_binding is None:
    bindings.append({
        "members": [ member_to_grant ],
        "role": masked_reader_role
        })
  else:
    mergeable_binding.setdefault("members", []).append(member_to_grant)

  policy = { "bindings": bindings }
  # Echo back the concurrency token (and policy version) when the API supplied them
  for passthrough_key in ("etag", "version"):
    if passthrough_key in json_result:
      policy[passthrough_key] = json_result[passthrough_key]

  # Set IAM
  # https://cloud.google.com/bigquery/docs/reference/bigquerydatapolicy/rest/v1/projects.locations.dataPolicies/setIamPolicy
  url = f"https://bigquerydatapolicy.googleapis.com/v1/{data_policy_name}:setIamPolicy"

  request_body = { "policy": policy }

  json_result = restAPIHelper(url, "POST", request_body)
  print("IAM Security Set: ", data_policy_name)

### <font color='gray'>Initialize Data Security - CLS / Data Masking</font>
Creates the Taxonomy and Data Masking Rules.  This creates one Taxonomy with two top-level policy tags (High and Low Security Clearance).  Then for each type of data a child policy tag is created.  Security is then granted on the policy tag, which enforces column-level permissions.  For other fields, a data masking rule is created under the policy tag and then security is granted on the masking rule.
- Best Practices: https://cloud.google.com/bigquery/docs/best-practices-policy-tags


<img src="https://storage.googleapis.com/data-analytics-golden-demo/biglake/v1/artifacts/BigLake-CLS-Data-Mask.png" width="800" height="232" valign="top" alt="BigLake Table Column / Data Masking">


In [None]:
################################################################################
# Create Hierarchical Data Policies
################################################################################
# Overview
# 1 - Create the Taxonomy (createTaxonomy) and its two top-level policy tags (createPolicyTag)
#     - high_security_clearance
#     - low_security_clearance

# 2 - Create a child policy tag under each top-level tag (hsc_pt_* / lsc_pt_*)
#     - phone_number
#     - government_identification
#     - email_address
#     - date_of_birth
#     - bank_account_routing
#     - bank_account_number

# 3 - Secure Policy Tags without Data Masking (securePolicyTag) - high security clearance only
#     roles/datacatalog.categoryFineGrainedReader
#       - phone_number
#       - email_address

# 4 - Create Data Masking (createDataPolicy)
#     High security clearance (hsc_dm_*)
#     - government_identification,LAST_FOUR_CHARACTERS
#     - date_of_birth,DATE_YEAR_MASK
#     - bank_account_routing,FIRST_FOUR_CHARACTERS
#     - bank_account_number,LAST_FOUR_CHARACTERS
#     Low security clearance (lsc_dm_*)
#     - phone_number,LAST_FOUR_CHARACTERS
#     - government_identification,SHA256
#     - email_address,EMAIL_MASK
#     - date_of_birth,DATE_YEAR_MASK
#     - bank_account_routing,DEFAULT_MASKING_VALUE
#     - bank_account_number,DEFAULT_MASKING_VALUE

# 5 - Secure Data Policy (secureDataPolicy) - every data masking policy created in step 4
#     roles/bigquerydatapolicy.maskedReader


################################################################################
# To see the Taxonomy open this link in a new tab: https://console.cloud.google.com/bigquery/policy-tags
################################################################################
# taxonomy_name is passed to every createPolicyTag call that follows
taxonomy_name = createTaxonomy(params)
print(f"taxonomy_name: {taxonomy_name}")


################################################################################
# High Security Clearance
################################################################################
# Top-level policy tag (no parent); each tag below is created with it as the parent
policy_high_security_clearance = createPolicyTag(params, taxonomy_name, None, "high_security_clearance")
print(f"policy_high_security_clearance: {policy_high_security_clearance}")

## High Security Clearance -> Phone Number
policy_high_security_clearance_phone_number = createPolicyTag(params, taxonomy_name, policy_high_security_clearance, "hsc_pt_phone_number")
print(f"policy_high_security_clearance_phone_number: {policy_high_security_clearance_phone_number}")
## No need to create data mask for phone_number, instead we are granting access to the column (CLS)
securePolicyTag(params, policy_high_security_clearance_phone_number)

## High Security Clearance -> Government Identification
## Masked with LAST_FOUR_CHARACTERS, then the data policy itself is secured
policy_high_security_clearance_government_identification = createPolicyTag(params, taxonomy_name, policy_high_security_clearance, "hsc_pt_government_identification")
print(f"policy_high_security_clearance_government_identification: {policy_high_security_clearance_government_identification}")
datamask_policy_high_security_clearance_government_identification = createDataPolicy(params, policy_high_security_clearance_government_identification, "hsc_dm_government_identification", "DATA_MASKING_POLICY", "LAST_FOUR_CHARACTERS")
secureDataPolicy(params, datamask_policy_high_security_clearance_government_identification)

## High Security Clearance -> Email Address
policy_high_security_clearance_email_address = createPolicyTag(params, taxonomy_name, policy_high_security_clearance, "hsc_pt_email_address")
print(f"policy_high_security_clearance_email_address: {policy_high_security_clearance_email_address}")
## No need to create data mask for email_address, instead we are granting access to the column (CLS)
securePolicyTag(params, policy_high_security_clearance_email_address)

## High Security Clearance -> Date of Birth
## Masked with DATE_YEAR_MASK
policy_high_security_clearance_date_of_birth = createPolicyTag(params, taxonomy_name, policy_high_security_clearance, "hsc_pt_date_of_birth")
print(f"policy_high_security_clearance_date_of_birth: {policy_high_security_clearance_date_of_birth}")
datamask_policy_high_security_clearance_date_of_birth = createDataPolicy(params, policy_high_security_clearance_date_of_birth, "hsc_dm_date_of_birth", "DATA_MASKING_POLICY", "DATE_YEAR_MASK")
secureDataPolicy(params, datamask_policy_high_security_clearance_date_of_birth)

## High Security Clearance -> Bank Account Routing
## Masked with FIRST_FOUR_CHARACTERS
policy_high_security_clearance_bank_account_routing = createPolicyTag(params, taxonomy_name, policy_high_security_clearance, "hsc_pt_bank_account_routing")
print(f"policy_high_security_clearance_bank_account_routing: {policy_high_security_clearance_bank_account_routing}")
datamask_policy_high_security_clearance_bank_account_routing = createDataPolicy(params, policy_high_security_clearance_bank_account_routing, "hsc_dm_bank_account_routing", "DATA_MASKING_POLICY", "FIRST_FOUR_CHARACTERS")
secureDataPolicy(params, datamask_policy_high_security_clearance_bank_account_routing)

## High Security Clearance -> Bank Account Number
## Masked with LAST_FOUR_CHARACTERS
policy_high_security_clearance_bank_account_number = createPolicyTag(params, taxonomy_name, policy_high_security_clearance, "hsc_pt_bank_account_number")
print(f"policy_high_security_clearance_bank_account_number: {policy_high_security_clearance_bank_account_number}")
datamask_policy_high_security_clearance_bank_account_number = createDataPolicy(params, policy_high_security_clearance_bank_account_number, "hsc_dm_bank_account_number", "DATA_MASKING_POLICY", "LAST_FOUR_CHARACTERS")
secureDataPolicy(params, datamask_policy_high_security_clearance_bank_account_number)



################################################################################
# Low Security Clearance
################################################################################
# Top-level policy tag (no parent); each tag below is created with it as the parent.
# Unlike the high security clearance branch, every field here gets a data masking policy.
policy_low_security_clearance = createPolicyTag(params, taxonomy_name, None, "low_security_clearance")
print(f"policy_low_security_clearance: {policy_low_security_clearance}")

## Low Security Clearance -> Phone Number
## Masked with LAST_FOUR_CHARACTERS
policy_low_security_clearance_phone_number = createPolicyTag(params, taxonomy_name, policy_low_security_clearance, "lsc_pt_phone_number")
print(f"policy_low_security_clearance_phone_number: {policy_low_security_clearance_phone_number}")
datamask_policy_low_security_clearance_phone_number = createDataPolicy(params, policy_low_security_clearance_phone_number, "lsc_dm_phone_number", "DATA_MASKING_POLICY", "LAST_FOUR_CHARACTERS")
secureDataPolicy(params, datamask_policy_low_security_clearance_phone_number)

## Low Security Clearance -> Government Identification
## Masked with SHA256
policy_low_security_clearance_government_identification = createPolicyTag(params, taxonomy_name, policy_low_security_clearance, "lsc_pt_government_identification")
print(f"policy_low_security_clearance_government_identification: {policy_low_security_clearance_government_identification}")
datamask_policy_low_security_clearance_government_identification = createDataPolicy(params, policy_low_security_clearance_government_identification, "lsc_dm_government_identification", "DATA_MASKING_POLICY", "SHA256")
secureDataPolicy(params, datamask_policy_low_security_clearance_government_identification)

## Low Security Clearance -> Email Address
## Masked with EMAIL_MASK
policy_low_security_clearance_email_address = createPolicyTag(params, taxonomy_name, policy_low_security_clearance, "lsc_pt_email_address")
print(f"policy_low_security_clearance_email_address: {policy_low_security_clearance_email_address}")
datamask_policy_low_security_clearance_email_address = createDataPolicy(params, policy_low_security_clearance_email_address, "lsc_dm_email_address", "DATA_MASKING_POLICY", "EMAIL_MASK")
secureDataPolicy(params, datamask_policy_low_security_clearance_email_address)

## Low Security Clearance -> Date of Birth
## Masked with DATE_YEAR_MASK
policy_low_security_clearance_date_of_birth = createPolicyTag(params, taxonomy_name, policy_low_security_clearance, "lsc_pt_date_of_birth")
print(f"policy_low_security_clearance_date_of_birth: {policy_low_security_clearance_date_of_birth}")
datamask_policy_low_security_clearance_date_of_birth = createDataPolicy(params, policy_low_security_clearance_date_of_birth, "lsc_dm_date_of_birth", "DATA_MASKING_POLICY", "DATE_YEAR_MASK")
secureDataPolicy(params, datamask_policy_low_security_clearance_date_of_birth)

## Low Security Clearance -> Bank Account Routing
## Masked with DEFAULT_MASKING_VALUE
policy_low_security_clearance_bank_account_routing = createPolicyTag(params, taxonomy_name, policy_low_security_clearance, "lsc_pt_bank_account_routing")
print(f"policy_low_security_clearance_bank_account_routing: {policy_low_security_clearance_bank_account_routing}")
datamask_policy_low_security_clearance_bank_account_routing = createDataPolicy(params, policy_low_security_clearance_bank_account_routing, "lsc_dm_bank_account_routing", "DATA_MASKING_POLICY", "DEFAULT_MASKING_VALUE")
secureDataPolicy(params, datamask_policy_low_security_clearance_bank_account_routing)

## Low Security Clearance -> Bank Account Number
## Masked with DEFAULT_MASKING_VALUE
policy_low_security_clearance_bank_account_number = createPolicyTag(params, taxonomy_name, policy_low_security_clearance, "lsc_pt_bank_account_number")
print(f"policy_low_security_clearance_bank_account_number: {policy_low_security_clearance_bank_account_number}")
datamask_policy_low_security_clearance_bank_account_number = createDataPolicy(params, policy_low_security_clearance_bank_account_number, "lsc_dm_bank_account_number", "DATA_MASKING_POLICY", "DEFAULT_MASKING_VALUE")
secureDataPolicy(params, datamask_policy_low_security_clearance_bank_account_number)


### <font color="#4285f4">Create tables to apply security -</font> CLS / Data Masking
- driver_parquet_rls_cls_dm_high - High Security Clearance (show more data)
- driver_parquet_rls_cls_dm_low - Low Security Clearance (Hides more data)


In [None]:
# Create the BigLake table used for the High Security Clearance example.
# The table is defined over the driver parquet files with a manually refreshed metadata cache.

sql = f"""

CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.driver_parquet_rls_cls_dm_high`
WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`
OPTIONS (
    format = "PARQUET",
    uris = ['gs://{biglake_bucket_name}/biglake-tables/driver_parquet/*.parquet'],
    max_staleness=INTERVAL 30 MINUTE,
    metadata_cache_mode="MANUAL" -- This can be setup to be 30 minutes or more
);

"""

# Report the outcome with a blank line above and below the message
if not (runQuery(sql) == True):
  print("", "Table creation failed", "", sep="\n")
else:
  print("", f"** Created table {project_id}.biglake_dataset.driver_parquet_rls_cls_dm_high **", "", sep="\n")

# The cache mode is "manual", so the metadata cache has to be refreshed explicitly (fine for a demo)
sql = f"CALL BQ.REFRESH_EXTERNAL_METADATA_CACHE('{project_id}.biglake_dataset.driver_parquet_rls_cls_dm_high')"

if not (runQuery(sql) == True):
  print("", "Metadata refresh failed", "", sep="\n")
else:
  print("", f"** Refreshed Metadata on {project_id}.biglake_dataset.driver_parquet_rls_cls_dm_high **", "", sep="\n")

In [None]:
# Create the BigLake table used for the Low Security Clearance example.
# The table is defined over the driver parquet files with a manually refreshed metadata cache.
sql = f"""

CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.driver_parquet_rls_cls_dm_low`
WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`
OPTIONS (
    format = "PARQUET",
    uris = ['gs://{biglake_bucket_name}/biglake-tables/driver_parquet/*.parquet'],
    max_staleness=INTERVAL 30 MINUTE,
    metadata_cache_mode="MANUAL" -- This can be setup to be 30 minutes or more
);

"""

# Report the outcome with a blank line above and below the message
if not (runQuery(sql) == True):
  print("", "Table creation failed", "", sep="\n")
else:
  print("", f"** Created table {project_id}.biglake_dataset.driver_parquet_rls_cls_dm_low **", "", sep="\n")

# The cache mode is "manual", so the metadata cache has to be refreshed explicitly (fine for a demo)
sql = f"CALL BQ.REFRESH_EXTERNAL_METADATA_CACHE('{project_id}.biglake_dataset.driver_parquet_rls_cls_dm_low')"

if not (runQuery(sql) == True):
  print("", "Metadata refresh failed", "", sep="\n")
else:
  print("", f"** Refreshed Metadata on {project_id}.biglake_dataset.driver_parquet_rls_cls_dm_low **", "", sep="\n")


### <font color="#4285f4">Alter the table schemas to add the policy tags -</font> CLS / Data Masking
- Apply the above created Policy/Data Masking tags to each field


In [None]:
# Schema for the high security clearance table.  Every column is NULLABLE; a column
# listed with policy tags gets a "policyTags" entry, the others are left untagged.
driver_parquet_rls_cls_dm_high_schema_updated = [
  {"mode": "NULLABLE", "name": column_name, "policyTags": {"names": list(policy_tags)}, "type": column_type}
  if policy_tags else
  {"mode": "NULLABLE", "name": column_name, "type": column_type}
  for column_name, column_type, policy_tags in (
    ("driver_id", "INTEGER", ()),
    ("driver_name", "STRING", ()),
    ("driver_mobile_number", "STRING", ()),
    ("driver_license_number", "STRING", (policy_high_security_clearance_government_identification,)),
    ("driver_email_address", "STRING", ()),
    ("driver_dob", "DATE", (policy_high_security_clearance_date_of_birth,)),
    ("driver_ach_routing_number", "STRING", (policy_high_security_clearance_bank_account_routing,)),
    ("driver_ach_account_number", "STRING", (policy_high_security_clearance_bank_account_number,)),
  )
]

# Push the tagged schema onto the table
updateTableSchema(params["project_id"], "biglake_dataset", "driver_parquet_rls_cls_dm_high",driver_parquet_rls_cls_dm_high_schema_updated)

In [None]:
# Schema for the low security clearance table.  Every column is NULLABLE; a column
# listed with policy tags gets a "policyTags" entry, the others are left untagged.
driver_parquet_rls_cls_dm_low_schema_updated = [
  {"mode": "NULLABLE", "name": column_name, "policyTags": {"names": list(policy_tags)}, "type": column_type}
  if policy_tags else
  {"mode": "NULLABLE", "name": column_name, "type": column_type}
  for column_name, column_type, policy_tags in (
    ("driver_id", "INTEGER", ()),
    ("driver_name", "STRING", ()),
    ("driver_mobile_number", "STRING", (policy_low_security_clearance_phone_number,)),
    ("driver_license_number", "STRING", (policy_low_security_clearance_government_identification,)),
    ("driver_email_address", "STRING", (policy_low_security_clearance_email_address,)),
    ("driver_dob", "DATE", (policy_low_security_clearance_date_of_birth,)),
    ("driver_ach_routing_number", "STRING", (policy_low_security_clearance_bank_account_routing,)),
    ("driver_ach_account_number", "STRING", (policy_low_security_clearance_bank_account_number,)),
  )
]

# Push the tagged schema onto the table
updateTableSchema(params["project_id"], "biglake_dataset", "driver_parquet_rls_cls_dm_low",driver_parquet_rls_cls_dm_low_schema_updated)

### <font color="#4285f4">Query the tables - </font> CLS / Data Masking
- First open the tables in the BigQuery UI and see the policies applied
- The columns will be hidden or masked.
- NOTE: If you remove permissions to a Policy Tag, then you will get an error when you attempt to query the field.


<img src="https://storage.googleapis.com/data-analytics-golden-demo/biglake/v1/artifacts/BigLake-CLS-Data-Mask.png" width="800" height="232" valign="top" alt="BigLake Table Column / Data Masking">


In [None]:
%%bigquery

-- The High Security Clearance table lets us see the fields based upon the above chart
SELECT *
  FROM `biglake_dataset.driver_parquet_rls_cls_dm_high`
 LIMIT 100;

In [None]:
%%bigquery

-- The Low Security Clearance table has policy tags on additional fields
-- (driver_license_number, email, phone number), so more of the data is hidden or masked
SELECT *
  FROM `biglake_dataset.driver_parquet_rls_cls_dm_low`
 LIMIT 100;

### <font color="#4285f4">Row Level Security</font>
- We can filter tables with a predicate ("WHERE clause") on our data.
- RLS works on any BigLake table of any underlying data

In [None]:
# Create a row access policy on the Taxi Trips table, granted to the notebook user
# (params['user']), that filters to pickup location = 1 and trip distance > 10
sql = f"""
CREATE OR REPLACE ROW ACCESS POLICY taxi_trips_parquet_rap
    ON `biglake_dataset.taxi_trips_parquet`
    GRANT TO ("user:{params['user']}")
FILTER USING (PULocationID = 1 AND Trip_Distance > 10);
"""

runQuery(sql)

In [None]:
%%bigquery
-- Query the table the row access policy was created on; the select list includes
-- both columns used in the policy filter (PULocationID and Trip_Distance)
SELECT PULocationID, DOLocationID, Passenger_Count, Trip_Distance, Total_Amount
  FROM `biglake_dataset.taxi_trips_parquet`
 LIMIT 10;

In [None]:
%%bigquery

-- Remove every row access policy from the taxi trips table
DROP ALL ROW ACCESS POLICIES ON `biglake_dataset.taxi_trips_parquet`;

## <font color='blue'>BigLake Apache Iceberg, Spark Stored Procedures and BigQuery Metastore</font>
BigLake has several ways to support Apache Iceberg
- BigLake Metadata file - read-only support of an Iceberg table.  Requires manual updates of metadata.
- BigQuery Metastore - read-only support of an Iceberg table while Spark provides read/write support. Metadata is kept up to date. https://cloud.google.com/bigquery/docs/about-bqms
- BigLake Managed Tables - fully managed experience on Apache Iceberg with support for DML and high throughput streaming

### <font color='gray'>Helper Functions - Create Spark Connection</font>
Creates the BigQuery to Dataproc Serverless connection and sets permissions.



#### createSparkConnection
Creates the Spark connection for BigQuery for Serverless Spark (Dataproc)

In [None]:
def createSparkConnection(params):
  """Return the service account of the Spark BigQuery connection, creating the connection if needed.

  Lists the BigQuery connections of the configured project/location through
  restAPIHelper.  When a connection whose name ends with
  /locations/{bigquery_location}/connections/{spark_connection_name} is found,
  its Spark service account id is returned; otherwise the connection is created.

  Args:
    params: dict providing "project_id", "bigquery_location" and
      "spark_connection_name".

  Returns:
    The ["spark"]["serviceAccountId"] value of the existing or new connection.
  """
  # List:   https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/list
  # Create: https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/create
  project_id = params["project_id"]
  bigquery_location = params["bigquery_location"]
  spark_connection_name = params["spark_connection_name"]
  connections_url = f"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/connections"

  listing = restAPIHelper(connections_url, "GET", None)
  print(f"createSparkConnection (GET) json_result: {listing}")

  # Connection names come back with the project NUMBER (e.g.
  # "projects/756740881369/locations/us/connections/spark-notebook-connection"),
  # so only the location/connection suffix can be compared.
  name_suffix = f"/locations/{bigquery_location}/connections/{spark_connection_name}"
  known_connections = listing["connections"] if "connections" in listing else []
  for connection in known_connections:
    print(f"Spark Connection: {connection['name']}")
    if not connection["name"].endswith(name_suffix):
      continue
    print("Connection already exists")
    return connection["spark"]["serviceAccountId"]

  print("Creating Spark Connection")

  created = restAPIHelper(
      f"{connections_url}?connectionId={spark_connection_name}",
      "POST",
      {
          "friendlyName": spark_connection_name,
          "description": "Spark Colab Notebooks Connection for Data Analytics Golden Demo",
          "spark": {},
      })

  service_account = created["spark"]["serviceAccountId"]
  print("Spark Connection created: ", service_account)
  return service_account


#### setBigQueryConnectionIamPolicy
Sets the IAM Permissions on the BigQuery Connection

In [None]:
def setBigQueryConnectionIamPolicy(params, accountWithPrefix, role):
  """Grant a role to an account on the BigLake BigQuery connection.

  Reads the IAM policy of the connection named params["biglake_connection_name"],
  returns early when accountWithPrefix already holds `role`, and otherwise
  writes the policy back with the member added.  Existing bindings are kept,
  and the etag/version returned by getIamPolicy are sent back so a concurrent
  policy change is detected instead of silently overwritten.

  Args:
    params: dict providing "project_id", "bigquery_location" and
      "biglake_connection_name".
    accountWithPrefix: IAM member string including its type prefix
      (for example "serviceAccount:...").
    role: IAM role to grant (for example "roles/bigquery.connectionAdmin").

  Returns:
    None.
  """
  # Get the current bindings (if the account already has the role then skip)
  # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/getIamPolicy
  project_id = params["project_id"]
  bigquery_location = params["bigquery_location"]
  biglake_connection_name = params["biglake_connection_name"]

  url = f"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/connections/{biglake_connection_name}:getIamPolicy"

  request_body = { }
  json_result = restAPIHelper(url, "POST", request_body)
  print(f"setBigQueryConnectionIamPolicy (GET) json_result: {json_result}")

  # Take the existing bindings and add to them, otherwise we lose the existing permissions
  bindings = json_result["bindings"] if "bindings" in json_result else []

  # The permission exists only when the member is bound to the REQUESTED role;
  # holding a different role on the connection must not short-circuit the grant.
  mergeable_binding = None
  for item in bindings:
    if item["role"] != role:
      continue
    if accountWithPrefix in item.get("members", []):
      print("Permissions exist")
      return
    # Never add the member to a conditional binding: it would inherit the condition
    if mergeable_binding is None and "condition" not in item:
      mergeable_binding = item

  if mergeable_binding is None:
    bindings.append({
        "role": role,
        "members": [ accountWithPrefix ]
        })
  else:
    # Reuse the role's existing binding rather than adding a second one for the same role
    mergeable_binding.setdefault("members", []).append(accountWithPrefix)

  policy = { "bindings" : bindings }
  # Echo back the concurrency token (and policy version) when the API supplied them
  for passthrough_key in ("etag", "version"):
    if passthrough_key in json_result:
      policy[passthrough_key] = json_result[passthrough_key]

  # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/setIamPolicy
  url = f"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/connections/{biglake_connection_name}:setIamPolicy"

  request_body = { "policy" : policy }

  print(f"Permission bindings: {bindings}")

  json_result = restAPIHelper(url, "POST", request_body)
  print()
  print(f"json_result: {json_result}")
  print()
  print(f"BigQuery Connection IAM Permissions set for {accountWithPrefix} {role}")

#### setProjectLevelIamPolicy
Sets the IAM Permissions at the Project Level

In [None]:
def setProjectLevelIamPolicy(params, accountWithPrefix, role):
  """Grant a role to an account in the project-level IAM policy.

  Reads the project IAM policy, returns early when accountWithPrefix already
  holds `role`, and otherwise writes the policy back with the member added.
  Existing bindings are kept, and the etag/version returned by getIamPolicy
  are sent back so a concurrent policy change is detected instead of
  silently overwritten.

  Args:
    params: dict providing "project_id".
    accountWithPrefix: IAM member string including its type prefix
      (for example "serviceAccount:...").
    role: IAM role to grant (for example "roles/bigquery.user").

  Returns:
    None.
  """
  # Get the current bindings (if the account already has the role then skip)
  # https://cloud.google.com/resource-manager/reference/rest/v1/projects/getIamPolicy
  project_id = params["project_id"]

  url = f"https://cloudresourcemanager.googleapis.com/v1/projects/{project_id}:getIamPolicy"

  request_body = { }
  json_result = restAPIHelper(url, "POST", request_body)
  print(f"setProjectLevelIamPolicy (GET) json_result: {json_result}")

  # Take the existing bindings and add to them, otherwise we lose the existing permissions
  bindings = json_result["bindings"] if "bindings" in json_result else []

  # Test to see if the member is already bound to the requested role
  mergeable_binding = None
  for item in bindings:
    if item["role"] != role:
      continue
    if accountWithPrefix in item.get("members", []):
      print("Permissions exist")
      return
    # Never add the member to a conditional binding: it would inherit the condition
    if mergeable_binding is None and "condition" not in item:
      mergeable_binding = item

  if mergeable_binding is None:
    bindings.append({
        "role": role,
        "members": [ accountWithPrefix ]
        })
  else:
    # Reuse the role's existing binding rather than adding a second one for the same role
    mergeable_binding.setdefault("members", []).append(accountWithPrefix)

  policy = { "bindings" : bindings }
  # Echo back the concurrency token (and policy version) when the API supplied them
  for passthrough_key in ("etag", "version"):
    if passthrough_key in json_result:
      policy[passthrough_key] = json_result[passthrough_key]

  # https://cloud.google.com/resource-manager/reference/rest/v1/projects/setIamPolicy
  url = f"https://cloudresourcemanager.googleapis.com/v1/projects/{project_id}:setIamPolicy"

  request_body = { "policy" : policy }

  print(f"Permission bindings: {bindings}")

  json_result = restAPIHelper(url, "POST", request_body)
  print()
  print(f"json_result: {json_result}")
  print()
  print(f"Project Level IAM Permissions set for {accountWithPrefix} {role}")

### <font color='gray'>Initialize Permissions -</font> Spark Stored Procedure using BigLake and BigQuery Metastore
Creates following:
- Create the external connection ```spark-notebook-connection```
- Grants the IAM permissions on biglake and spark connections
- See this video: [YouTube](https://youtu.be/IQR9gJuLXbQ)

#### Security Setup Images

<img src="https://storage.googleapis.com/data-analytics-golden-demo/biglake/v1/artifacts/bigquery-metastore-spark-connection.png" width="800" valign="top" alt="BigQuery Metastore Spark">

<img src="https://storage.googleapis.com/data-analytics-golden-demo/biglake/v1/artifacts/bigquery-metastore-biglake-connecton.png" width="800" valign="top" alt="BigQuery Metastore Biglake">

#### Security Setup Code

In [None]:
# Create the Spark connection (if not exists)
# The returned service account id is stored in params under "sparkServiceAccountId"
sparkServiceAccountId = createSparkConnection(params)
print(f"createSparkConnection: {sparkServiceAccountId}")
params["sparkServiceAccountId"] = sparkServiceAccountId

# Service account of the BigLake connection, read from params (must already be set)
bigLakeServiceAccountId = params["bigLakeServiceAccountId"]

# Grant the Spark connection service principal access to the Cloud Storage account
# This account needs to read/write data from this account as part of the Spark processing (when it creates/reads Iceberg tables)
setBucketIamPolicy(params, f"serviceAccount:{sparkServiceAccountId}", "roles/storage.objectAdmin")

# We need to grant bigquery.connections.delegate permission (or (roles/bigquery.connectionAdmin)) to the Spark Service Account ON THE BigLake connection
# You should create a custom role with bigquery.connections.delegate permission for this (this is the only permission required)
setBigQueryConnectionIamPolicy(params, f"serviceAccount:{sparkServiceAccountId}", "roles/bigquery.connectionAdmin")

# In IAM add roles/biglake.admin to the us.biglake-notebook-connection service account
# To create the tables in BigQuery linked to BigQuery Metastore
setProjectLevelIamPolicy(params, f"serviceAccount:{bigLakeServiceAccountId}", "roles/biglake.admin")

# In IAM add roles/bigquery.user to the us.spark-notebook-connection service account
# To create BigQuery jobs
setProjectLevelIamPolicy(params, f"serviceAccount:{sparkServiceAccountId}", "roles/bigquery.user")

# Set roles/bigquery.dataOwner (OWNER) to both service principals (biglake and spark connections)
# NOTE: unlike the calls above, the account is passed here WITHOUT the "serviceAccount:" prefix
setBigQueryDatasetPolicy(params, "biglake_dataset", f"{bigLakeServiceAccountId}", "OWNER")
setBigQueryDatasetPolicy(params, "biglake_dataset", f"{sparkServiceAccountId}", "OWNER")

### <font color='blue'>Spark Stored Procedure -</font> Create Apache Iceberg Table using BigLake Metastore

#### <font color='blue'>Spark Script</font>
The BigQuery Spark Stored Procedure will reference a script in GCS

In [None]:
# Print a Cloud Console link to the "pyspark" folder of the BigLake bucket
print(f"To view the PySpark Scripts: https://console.cloud.google.com/storage/browser/{biglake_bucket_name}/pyspark?project={project_id}")

#### <font color='blue'>Create BigQuery Spark Stored Procedure</font>
- PySpark source code that creates the Iceberg table: [GitHub](https://github.com/GoogleCloudPlatform/data-analytics-golden-demo/blob/main/dataproc/pyspark_apache_iceberg_bqms.py)
- Now that the connection is configured, the Spark stored procedure can be created that references the script.
- Note all the Spark Properties required for using the BigLake Metastore with Iceberg.

In [None]:
# Create the Spark Stored Procedure
# The procedure runs the PySpark script stored in the BigLake bucket through the
# spark-notebook-connection.  The "properties" entries configure a Spark catalog named
# "iceberg_catalog" (BigQueryMetastoreCatalog implementation) for this project/location,
# with its warehouse under gs://{biglake_bucket_name}/biglake-tables.
project_id = params["project_id"]
bigquery_location = params["bigquery_location"]
biglake_bucket_name = params["biglake_bucket_name"]

sql = f"""CREATE OR REPLACE PROCEDURE`{project_id}.biglake_dataset.sp_iceberg_driver_iceberg`(
	iceberg_catalog STRING,
	iceberg_warehouse STRING,
	iceberg_table STRING,
	bq_dataset STRING,
	bq_region STRING,
	biglake_connection STRING,
	source_parquet_file STRING,
	project_id STRING
)
WITH CONNECTION `{project_id}.{bigquery_location}.spark-notebook-connection`
OPTIONS (
  main_file_uri="gs://{biglake_bucket_name}/pyspark/pyspark_apache_iceberg_bqms.py",
	engine='SPARK',
	runtime_version='2.2',
	jar_uris=["gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar"],
	properties=[
		("spark.jars.packages","org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2"),
		("spark.sql.catalog.iceberg_catalog","org.apache.iceberg.spark.SparkCatalog"),
		("spark.sql.catalog.iceberg_catalog.catalog-impl","org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"),
		("spark.sql.catalog.iceberg_catalog.gcp_project","{project_id}"),
		("spark.sql.catalog.iceberg_catalog.gcp_location","{bigquery_location}"),
		("spark.sql.catalog.iceberg_catalog.warehouse","gs://{biglake_bucket_name}/biglake-tables")
		]
	)
LANGUAGE python;"""

runQuery(sql)

#### <font color='blue'>Execute the Spark Stored Procedure</font>
- This will take 2 to 4 minutes.
- This will:
  - Initialize the Iceberg catalog
  - Initialize the Iceberg warehouse
  - Create the "driver_iceberg" table
  - Open the parquet file and then insert the data into the iceberg table
- When done:
  - You will see the files on storage
  - Open [bucket](https://console.cloud.google.com/storage/browser)
  - Under biglake-tables you will see: iceberg_warehouse.db
  - A table has been created in the biglake_dataset
  - Open the table, click on Details and you will see the BigQuery Metastore connection

In [None]:
# Values interpolated into the CALL statement come from the notebook parameters
project_id = params["project_id"]
biglake_bucket_name = params["biglake_bucket_name"]
bigquery_location = params["bigquery_location"]

print("Run this in  BigQuery", "", sep="\n")

# Arguments are positional and follow the stored procedure's parameter order
sql = f"""CALL `{project_id}.biglake_dataset.sp_iceberg_driver_iceberg`(
  "iceberg_catalog",
  "iceberg_warehouse",
  "driver_iceberg",
  "biglake_dataset",
  "{bigquery_location}",
  "biglake-notebook-connection",
  "gs://{biglake_bucket_name}/biglake-tables/driver_parquet/*.snappy.parquet",
  "{project_id}"
);"""

# Show the statement, a blank line, then where to find the Spark log
print(
    f"sql: {sql}",
    "",
    "When the job is done, click on Job Information and Log to see the Spark log.",
    sep="\n")

runQuery(sql)

In [None]:
# Print a Cloud Console link to the iceberg_warehouse.db folder under biglake-tables in the BigLake bucket
print(f"To view the Iceberg files: https://console.cloud.google.com/storage/browser/{biglake_bucket_name}/biglake-tables/iceberg_warehouse.db?project={project_id}")

#### <font color='blue'>BigLake Query Iceberg</font>
Query the Apache Iceberg table that is connected to the BigLake Metastore and create a table using the manifest approach.

In [None]:
%%bigquery

-- This table will see any writes performed by Spark
-- Show the first 10 drivers, ordered by driver_id
SELECT *
  FROM `iceberg_warehouse.driver_iceberg`
ORDER BY driver_id
LIMIT 10;

In [None]:
#*****************************************************************************
# MANUAL STEP
# You may also create an Iceberg table by pointing at the BigLake Metadata file
# You need to open GCS and hard code the metadata file name below
# Select the JSON file with the LATEST date/time stamp
#*****************************************************************************

# EDIT ME: name of the newest *.metadata.json file under
# gs://<bucket>/biglake-tables/iceberg_warehouse.db/driver_iceberg/metadata/
iceberg_metadata_file_name = "00003-eebd23f8-1a8b-432d-9aa9-cf1511cb9658.metadata.json"

# The bucket comes from biglake_bucket_name (as in the other cells) instead of a
# hard-coded project-specific bucket, so only the file name above needs editing.
sql = f"""
CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.driver_iceberg_metadata_file`
WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`
OPTIONS (
  format = "ICEBERG",
  uris = ["gs://{biglake_bucket_name}/biglake-tables/iceberg_warehouse.db/driver_iceberg/metadata/{iceberg_metadata_file_name}"]

);
"""

runQuery(sql)


In [None]:
%%bigquery

-- This table only sees the data as of a specific point in time, based upon the metadata file
-- Show the first 10 drivers, ordered by driver_id
SELECT *
  FROM `biglake_dataset.driver_iceberg_metadata_file`
ORDER BY driver_id
LIMIT 10;

## <font color='blue'>BigLake Materialized Views -</font> Materialized views over BigLake Metadata Cache Enabled Tables
- Create materialized views over BigLake tables
- https://cloud.google.com/bigquery/docs/materialized-views-intro#biglake

In [None]:
%%bigquery

-- Create the materialized view over metadata cache-enabled tables
-- Sum the taxi trips amounts by pickup date and pickup location (borough / zone)
-- Here we are joining a parquet table to a hudi table
-- The view is set to refresh every 30 minutes and allows up to 30 minutes of staleness

CREATE OR REPLACE MATERIALIZED VIEW `biglake_dataset.taxi_trips_materialized_view`
OPTIONS (enable_refresh = true, refresh_interval_minutes = 30,
         max_staleness=INTERVAL "0:30:0" HOUR TO SECOND,
         description='Taxi Trips by Date and Pickup Location')
AS
SELECT CAST(Pickup_DateTime AS DATE) AS pickup_date,
       location_pickup.borough AS pickup_borough,
       location_pickup.zone AS pickup_zone,
       SUM(taxi_trip.Fare_Amount) AS total_fare_amount,
       SUM(taxi_trip.Surcharge) AS total_surcharge,
       SUM(taxi_trip.Tip_Amount) AS total_tip_amount,
       SUM(taxi_trip.Total_Amount) AS total_total_amount

 FROM `biglake_dataset.taxi_trips_parquet` AS taxi_trip
      INNER JOIN `biglake_dataset.location_hudi` AS location_pickup
              ON taxi_trip.PULocationID = location_pickup.location_id
GROUP BY ALL;

In [None]:
%%bigquery

-- Query the materialized view
-- Most recent pickup dates first, then by borough and zone
SELECT *
  FROM `biglake_dataset.taxi_trips_materialized_view`
 ORDER BY pickup_date DESC, pickup_borough, pickup_zone
 LIMIT 25;

## <font color='blue'>BigLake Object Tables -</font> Unstructured Data Analytics with VertexAI

### <font color='gray'>Helper Functions - </font>Create Connections


#### createVertexAIConnection
Creates the BigQuery external connection to allow calls to Vertex AI directly from BigQuery.

In [None]:
def createVertexAIConnection(params):
  """Creates (or reuses) the BigQuery external connection used to call Vertex AI.

  The existing connections in the project/location are listed first. If one
  of them has a resource name ending with the requested connection id, its
  service account is returned and nothing is created. Otherwise a new Cloud
  Resource connection is created.

  Args:
    params: dict that must contain "project_id", "bigquery_location" and
      "vertex_ai_connection_name".

  Returns:
    The serviceAccountId found under "cloudResource" of the existing or newly
    created connection.
  """

  # First find the connection
  # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/list
  project_id = params["project_id"]
  bigquery_location = params["bigquery_location"]
  vertex_ai_connection_name = params["vertex_ai_connection_name"]
  url = f"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/connections"

  # Gather existing connections
  json_result = restAPIHelper(url, "GET", None)
  print(f"createVertexAIConnection (GET) json_result: {json_result}")

  # Test to see if connection exists, if so return
  # (the "connections" key is only inspected when present in the response)
  if "connections" in json_result:
    for item in json_result["connections"]:
      print(f"BigLake Connection: {item['name']}")
      # "projects/756740881369/locations/us/connections/vertex-ai-notebook-connection"
      # NOTE: We cannot test the complete name since it contains the project number and not id
      if item["name"].endswith(f"/locations/{bigquery_location}/connections/{vertex_ai_connection_name}"):
        print("Connection already exists")
        serviceAccountId = item["cloudResource"]["serviceAccountId"]
        return serviceAccountId

  # Create the connection
  # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/create
  print("Creating Vertex AI Connection")

  url = f"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/connections?connectionId={vertex_ai_connection_name}"

  # Use this connection's own name as the friendly name. The previous code read
  # the notebook-global biglake_connection_name, which either raised a NameError
  # (if that global was not defined yet) or labelled the Vertex AI connection
  # with the BigLake connection's name.
  request_body = {
      "friendlyName": vertex_ai_connection_name,
      "description": "Vertex AI Colab Notebooks Connection for Data Analytics Golden Demo",
      "cloudResource": {}
  }

  json_result = restAPIHelper(url, "POST", request_body)

  serviceAccountId = json_result["cloudResource"]["serviceAccountId"]
  print("Vertex AI Connection created: ", serviceAccountId)
  return serviceAccountId

### <font color='blue'>BigLake Object Tables - </font>Create Object Tables


In [None]:
# Copy data
# Create vision, document and audio tables
# Show security

#### <font color='blue'>BigLake Object Table - </font> Images
Create an object table that points at a location which contains images.

In [None]:
# Create the BigLake object table over the taxi images stored in the BigLake
# bucket, then refresh its metadata cache so the images are visible right away.
project_id = params["project_id"]
bigquery_location = params["bigquery_location"]
biglake_connection_name = params["biglake_connection_name"]
biglake_bucket_name = params["biglake_bucket_name"]

# The object table uses a manually refreshed metadata cache with a
# 30 minute max staleness.
sql = f"""

CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.object_table_taxi_image`
WITH CONNECTION `{project_id}.{bigquery_location}.{biglake_connection_name}`
OPTIONS (
    object_metadata="DIRECTORY",
    uris = ['gs://{biglake_bucket_name}/biglake-object-tables/images/*.png'],
    max_staleness=INTERVAL 30 MINUTE,
    metadata_cache_mode="MANUAL"
    );

"""

# NOTE: the explicit "== True" comparison is kept as-is; runQuery is defined
# elsewhere in the notebook and other cells use its result as a DataFrame.
if runQuery(sql) == True:
  print()
  # Report the table that was actually created (the message previously named
  # taxi_trips_parquet, a copy/paste leftover).
  print(f"** Created table {project_id}.biglake_dataset.object_table_taxi_image **")
  print()
else:
  print()
  print("Table creation failed")
  print()

# Refresh can only be done for "manual" cache mode.  This is done since this is a demo.
sql = f"CALL BQ.REFRESH_EXTERNAL_METADATA_CACHE('{project_id}.biglake_dataset.object_table_taxi_image')"

if runQuery(sql) == True:
  print()
  print(f"** Refreshed Metadata on {project_id}.biglake_dataset.object_table_taxi_image **")
  print()
else:
  print()
  print("Metadata refresh failed")
  print()


In [None]:
%%bigquery

-- NOTE: Each image carries object metadata for its borough
--       (later cells in this notebook read it as metadata[0].value).
-- Row Level Security can be used to filter this data by borough or it can be used in queries.

SELECT *
  FROM `biglake_dataset.object_table_taxi_image`
LIMIT 5;

### <font color='blue'>BigLake Object Tables - </font>Vision Analysis using Vertex AI (Gemini Pro, Text/Vector Embeddings, Vector Search)
Use machine learning to determine the contents of each image.  Pass the results to Gemini Pro to get a description of the image.  Then create text embeddings on the LLM result text. Finally, perform a Vector Search (semantic match) to search the images for objects.

<img src="https://storage.googleapis.com/data-analytics-golden-demo/biglake/v1/artifacts/BigLake-Object-Table-Vector-Embeddings-Diagram.png" width="800" height="380" valign="top" alt="BigLake Object Tables with Vertex AI and Vector Embeddings">    

#### <font color='blue'>BigQuery Connection and Models - </font>Setup
Create the connections to Vertex AI and Gemini Pro

In [None]:
# Ensure the BigQuery external connection for Vertex AI exists and remember its
# service account, then grant the project-level roles the demo needs.
vertexAIServiceAccountId = createVertexAIConnection(params)
params["vertexAIServiceAccountId"] = vertexAIServiceAccountId
bigLakeServiceAccountId = params["bigLakeServiceAccountId"]

# (service account, role) pairs, applied in this order:
#   - serviceUsageConsumer on both service accounts: to call Vision API
#   - aiplatform.user on the Vertex AI service account: to call GENERATE TEXT
iam_role_grants = [
    (vertexAIServiceAccountId, "roles/serviceusage.serviceUsageConsumer"),
    (bigLakeServiceAccountId, "roles/serviceusage.serviceUsageConsumer"),
    (vertexAIServiceAccountId, "roles/aiplatform.user"),
]

for grant_service_account, grant_role in iam_role_grants:
  setProjectLevelIamPolicy(params, f"serviceAccount:{grant_service_account}", grant_role)

In [None]:
# Create the remote connection to each Vertex AI service. (Vision, Gemini Pro, Embeddings, etc.)
# The models use the external connection

# Create one BigQuery remote model per Vertex AI service used by the demo.
# Every model is created through the Vertex AI external connection.
project_id = params["project_id"]
bigquery_location = params["bigquery_location"]
vertex_ai_connection_name = params["vertex_ai_connection_name"]

# (model name, OPTIONS clause contents, label used in the log message)
remote_model_definitions = [
    ("vision-connection", "remote_service_type = 'cloud_ai_vision_v1'", "cloud_ai_vision_v1"),
    ("gemini_model", "endpoint = 'gemini-2.0-flash'", "gemini_model"),
    ("textembedding_model", "endpoint = 'text-embedding-005'", "text-embedding-005"),
]

for remote_model_name, remote_model_options, remote_model_label in remote_model_definitions:
  sql = f"""CREATE MODEL IF NOT EXISTS `{project_id}.biglake_dataset.{remote_model_name}`
REMOTE WITH CONNECTION `{project_id}.{bigquery_location}.{vertex_ai_connection_name}`
OPTIONS ({remote_model_options});"""

  runQuery(sql)

  print(f"Created {remote_model_label}: {sql}")

In [None]:
%%bigquery

# Helper UDFs to use with Gemini Pro
# Both functions read the text of the first part of the first candidate in the
# model's JSON response (candidates[0].content.parts[0].text).

-- LLM helper methods to scrub common character issues
-- parse_llm_to_text: returns the response text as a STRING with newlines
-- replaced by spaces and the markdown code-fence markers removed.
-- NOTE(review): the '\"' -> '"' REPLACE looks like it may be a no-op if '\"' is
-- read as an escaped double quote; confirm whether '\\"' was intended.
CREATE OR REPLACE FUNCTION `biglake_dataset.parse_llm_to_text`(input JSON) RETURNS STRING AS (
REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(JSON_VALUE(input.candidates[0].content.parts[0].text),'\n',' '),'\"','"'),'``` JSON',''),'```json',''),'```','')
);

-- parse_llm_to_json: applies the same clean-up and then parses the text as JSON.
-- SAFE.PARSE_JSON is used so text that is not valid JSON yields NULL rather than an error.
CREATE OR REPLACE FUNCTION `biglake_dataset.parse_llm_to_json`(input JSON) RETURNS JSON AS (
SAFE.PARSE_JSON(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(JSON_VALUE(input.candidates[0].content.parts[0].text),'\n',' '),'\"','"'),'``` JSON',''),'```json',''),'```','')   )
);

#### <font color='blue'>Use Vertex Image Detection on BigLake Object Table</font> - **Gemini Pro Summarization and Vector Search**
The following will:
1. Create a new table that holds the results from the Vision AI call
2. Call Gemini Pro which will take the Vision AI JSON result and ask for a readable description to be created.
3. Text Embeddings will then be created over each LLM description
4. Semantic Search will then be performed.

In [None]:
%%bigquery

-- Call the Vision API to determine Label and Object detection in each image
-- The results are saved to a new table along with two placeholder columns
-- (llm_result and vector_embedding) that start as NULL and are filled in by
-- later cells.

CREATE OR REPLACE TABLE `biglake_dataset.object_table_taxi_image_inference` AS
SELECT uri,
       ml_annotate_image_result,
       metadata,
       CAST(NULL AS STRING) AS llm_result,
       CAST(NULL AS ARRAY<FLOAT64>) AS vector_embedding
FROM ML.ANNOTATE_IMAGE(
  MODEL `biglake_dataset.vision-connection`,
  TABLE `biglake_dataset.object_table_taxi_image`,
  STRUCT(['LABEL_DETECTION', 'OBJECT_LOCALIZATION'] AS vision_features)
);

# The Vision API also supports:
# https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-annotate-image
# FACE_DETECTION, LANDMARK_DETECTION, LOGO_DETECTION, LABEL_DETECTION
# TEXT_DETECTION, DOCUMENT_TEXT_DETECTION, IMAGE_PROPERTIES, OBJECT_LOCALIZATION

In [None]:
%%bigquery

# View the results which are stored as JSON
# Only the first two images (ordered by uri) are shown

SELECT uri, ml_annotate_image_result
  FROM `biglake_dataset.object_table_taxi_image_inference`
ORDER BY uri
LIMIT 2;

<font color='red'>Error Warning</font><br/>
If you receive the error message ```bqcx-xxxxxxxxxxxx-xxxx@gcp-sa-bigquery-condel.iam.gserviceaccount.com does not have the permission to access resources used by ML.GENERATE_TEXT_EMBEDDING```, please wait a few minutes for the security permissions to propagate and try again.

In [None]:
# prompt: python code to sleep for 2 minutes
# Pause for 2 minutes (120 seconds) so the IAM permissions granted earlier in
# this notebook have time to propagate before the Vertex AI models are called.

import time
time.sleep(120)

In [None]:
%%bigquery

-- Take the Vertex AI JSON results and pass them to Gemini Pro
-- Create a readable description based upon the JSON
-- Only rows whose llm_result is still NULL are sent to the model, so re-running
-- this cell only processes rows that do not have a description yet.
-- The model response is converted to plain text by the parse_llm_to_text UDF
-- and matched back to the table by uri.

UPDATE `biglake_dataset.object_table_taxi_image_inference` AS object_table_taxi_image_inference
   SET llm_result = `biglake_dataset.parse_llm_to_text`(ml_generate_text_result)
  FROM (SELECT *
          FROM ML.GENERATE_TEXT(MODEL `biglake_dataset.gemini_model`,
              (SELECT uri,
                      CONCAT("Generate a description from the below JSON.\n",
                             "Make sure you include all the objects listed in the JSON.\n",
                             "The data will be used for lost objects in a taxi cab.\n",
                             "You do NOT need to include information about the taxi cab itself like seats, cab color, windows, etc..\n",
                             "The JSON is the output of image recognition.\n",
                             "JSON:\n",
                             TO_JSON_STRING(ml_annotate_image_result)) AS prompt
                 FROM `biglake_dataset.object_table_taxi_image_inference`
                WHERE llm_result IS NULL
                ),
               STRUCT(
               .9 AS temperature,
               5000 AS max_output_tokens,
               .8 AS top_p,
               30 AS top_k)
               )
      ) AS llm_query
WHERE object_table_taxi_image_inference.uri = llm_query.uri;

In [None]:
%%bigquery

-- See the LLM Results
-- Search for an item that was lost in the taxi
-- This uses the LIKE keyword to do a search, which requires us to be specific
-- about the exact words we search for.

SELECT uri, llm_result
  FROM `biglake_dataset.object_table_taxi_image_inference`
 WHERE llm_result LIKE '%travel bag%'
       OR llm_result LIKE '%backpack%'
ORDER BY uri;

In [None]:
%%bigquery

-- Now create Text Embeddings on the LLM Description
-- This will let us do a Vector Search
-- This cell only previews 5 embeddings; nothing is written to the table here.

SELECT uri, content, text_embedding
  FROM ML.GENERATE_TEXT_EMBEDDING(MODEL `biglake_dataset.textembedding_model`,
       (SELECT uri, llm_result AS content
          FROM `biglake_dataset.object_table_taxi_image_inference`),
       STRUCT(TRUE AS flatten_json_output)
      )
LIMIT 5;

In [None]:
%%bigquery

-- Update the vector_embedding column in the object_table_taxi_image_inference table
-- Embeddings are generated from llm_result for every row (there is no filter)
-- and matched back to the table by uri.

UPDATE `biglake_dataset.object_table_taxi_image_inference` AS object_table_taxi_image_inference
   SET vector_embedding = text_embedding
  FROM (SELECT *
          FROM ML.GENERATE_TEXT_EMBEDDING(MODEL `biglake_dataset.textembedding_model`,
              (SELECT uri, llm_result AS content
                 FROM `biglake_dataset.object_table_taxi_image_inference`),
                STRUCT(TRUE AS flatten_json_output)
      )) AS llm_embedding
WHERE object_table_taxi_image_inference.uri = llm_embedding.uri;

In [None]:
#%%bigquery

# BigQuery supports vector indexes on embeddings.  In our case we do not have enough data to demo.
# You need at least 5000 rows to create a VECTOR INDEX

# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#create_vector_index_statement

#CREATE VECTOR INDEX object_table_taxi_image_vector_index ON `biglake_dataset.object_table_taxi_image_inference`(vector_embedding)
#OPTIONS (index_type = 'IVF', distance_type = 'EUCLIDEAN');

In [None]:
%%bigquery

# Search now using a semantic match
# Search for Backpack.  Previously we searched for:
#   llm_result LIKE '%travel bag%' OR llm_result LIKE '%backpack%'.

# The LIKE search returned 1 row and now we get back additional matches

# The search text 'backpack' is embedded with the same model that produced the
# stored embeddings, and the 2 closest rows (top_k => 2) are returned.

# https://cloud.google.com/bigquery/docs/reference/standard-sql/search_functions#vector_search

SELECT base.uri, base.llm_result, distance
  FROM VECTOR_SEARCH(TABLE `biglake_dataset.object_table_taxi_image_inference`,
                    'vector_embedding',
                    (SELECT *
                       FROM ML.GENERATE_TEXT_EMBEDDING(MODEL `biglake_dataset.textembedding_model`,
                                                      (SELECT 'backpack' AS content ),
                                                       STRUCT(TRUE AS flatten_json_output))),
                   'text_embedding',
                    top_k => 2,
                    distance_type => 'EUCLIDEAN' -- or COSINE
                    );

In [None]:
################################################################################
# Search for lost backpacks
# Runs a vector search for 'backpack', downloads each matching image from GCS
# and displays a thumbnail along with its uri and borough.
################################################################################
from PIL import Image
import IPython.display

sql = """SELECT base.uri AS uri, base.metadata[0].value as borough, distance
           FROM VECTOR_SEARCH(TABLE `biglake_dataset.object_table_taxi_image_inference`,
                    'vector_embedding',
                    (SELECT *
                       FROM ML.GENERATE_TEXT_EMBEDDING(MODEL `biglake_dataset.textembedding_model`,
                                                      (SELECT 'backpack' AS content ),
                                                       STRUCT(TRUE AS flatten_json_output))),
                   'text_embedding',
                    top_k => 2,
                    distance_type => 'EUCLIDEAN' -- or COSINE
                    )
            ORDER BY distance;"""

image_df = runQuery(sql)

for index, row in image_df.iterrows():
  uri = row['uri']
  borough = row['borough']
  downloaded_filename = downloadGCSFile(uri)
  print(f"uri: {uri}")
  print(f"borough: {borough}")
  # Open the image in a context manager so the underlying file handle is closed
  # once the thumbnail has been displayed (previously it was never closed).
  with Image.open(downloaded_filename) as img:
    # thumbnail() resizes in place, keeping the aspect ratio within 400x711
    img.thumbnail([400,711])
    IPython.display.display(img)
  print()

In [None]:
################################################################################
# Search for lost "cell" phones in the Bronx
# You can also search for "hat"
# You can also comment out -- WHERE base.metadata[0].value = 'Bronx'
#
# NOTE: the WHERE clause filters the rows returned by VECTOR_SEARCH, i.e. it is
# applied after the top_k (2) nearest matches have been selected, so fewer than
# 2 rows (or none) can be returned when the closest matches are not in the Bronx.
################################################################################

from PIL import Image
import IPython.display

sql = """SELECT base.uri, base.metadata[0].value as borough, distance
           FROM VECTOR_SEARCH(TABLE `biglake_dataset.object_table_taxi_image_inference`,
                    'vector_embedding',
                    (SELECT *
                       FROM ML.GENERATE_TEXT_EMBEDDING(MODEL `biglake_dataset.textembedding_model`,
                                                      (SELECT 'cell' AS content ),
                                                       STRUCT(TRUE AS flatten_json_output))),
                   'text_embedding',
                    top_k => 2,
                    distance_type => 'EUCLIDEAN' -- or COSINE
                    )
          WHERE base.metadata[0].value = 'Bronx'
       ORDER BY distance"""

image_df = runQuery(sql)

for index, row in image_df.iterrows():
  uri = row['uri']
  borough = row['borough']
  downloaded_filename = downloadGCSFile(uri)
  print(f"uri: {uri}")
  print(f"borough: {borough}")
  # Open the image in a context manager so the underlying file handle is closed
  # once the thumbnail has been displayed (previously it was never closed).
  with Image.open(downloaded_filename) as img:
    # thumbnail() resizes in place, keeping the aspect ratio within 400x711
    img.thumbnail([400,711])
    IPython.display.display(img)
  print()