### <font color='blue'>License</font>

```
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```

Author: Adam Paternostro

### Iceberg / BigQuery Sync Demo (Readme)
Shows how to create an Iceberg table on GCS, create a BigLake External table, and update the BigLake External table's metadata.  This can be used to keep BigQuery in sync with external Iceberg tables that are written to by third parties.

In [None]:
from IPython.display import Image
# Display the "Preferred" BigLake/Iceberg sync demo diagram (the last expression is rendered as the cell output)
Image(url='https://storage.googleapis.com/data-analytics-golden-demo/biglake/v1/artifacts/BigLake-Iceberg-Sync-Demo-Preferred.png', width=1200)

In [None]:
# Display the "Secondary" BigLake/Iceberg sync demo diagram (Image was imported in the previous cell)
Image(url='https://storage.googleapis.com/data-analytics-golden-demo/biglake/v1/artifacts/BigLake-Iceberg-Sync-Demo-Secondary.png', width=1200)

### PIP Install (Run once)

In [None]:
# Upgrade pip itself before installing the packages below
!pip install --upgrade pip

In [None]:
!pip install "pyiceberg[gcsfs,hive]"

### Initialize Code

In [None]:
import google.auth
import requests
import json
import datetime
import os
import re
import pyarrow as pa
from pyiceberg.catalog.sql import SqlCatalog
from google.cloud import storage
# NOTE: this rebinds the name `datetime` from the module (imported above) to the datetime class;
# code below that uses `datetime` refers to the class.
from datetime import datetime


# Parameters / Variables
# Get some values using gcloud (IPython `!` capture yields a list of output lines)
project_id = !(gcloud config get-value project)
user = !(gcloud auth list --filter=status:ACTIVE --format="value(account)")

# Each gcloud command must produce exactly one line; anything else means it is not configured
if len(project_id) != 1:
  raise RuntimeError(f"project_id is not set: {project_id}")
project_id = project_id[0]

if len(user) != 1:
  raise RuntimeError(f"user is not set: {user}")
user = user[0]

# BigQuery location used for the dataset and the BigLake connection
bigquery_location = "us"
# Iceberg namespace; also used as the SqlCatalog name when the catalog is created
iceberg_catalog_namespace = "default"
# GCS bucket that holds the Iceberg data and metadata files
biglake_bucket_name = "biglake-" + project_id
# Id of the BigQuery cloud resource connection used by the BigLake table
biglake_connection_name = "biglake-notebook-connection"
# Local directory for the SQLite catalog database file
warehouse_path = "/tmp/sqlite"

# Make the SQLite directory (if not exists).  NOTE: This should be a HIVE metastore or something more permanent
if not os.path.exists(warehouse_path):
  os.makedirs(warehouse_path)

# Values handed to the helper functions and used as query parameters by %%bigquery cells
params = { "project_id" : project_id,
           "bigquery_location" : bigquery_location,
           "biglake_connection_name": biglake_connection_name,
           "biglake_bucket_name" : biglake_bucket_name,
           "user" : user
           }

### Helper Functions

In [None]:
def getGoogleCredentials():
  """Gets the current user's Application Default Credentials with a freshly refreshed token.

  Args:
    None.

  Returns:
    The google.auth credentials object after refresh; callers read `token`
    (and `expiry`) from it.
  """
  # Import the transport submodule explicitly: `import google.auth` alone does not
  # guarantee that `google.auth.transport.requests` has been loaded.
  import google.auth.transport.requests

  creds, _ = google.auth.default()
  # A refresh is required to obtain a usable access token
  creds.refresh(google.auth.transport.requests.Request())
  return creds

In [None]:
def datetimeToUnixMs(dt):
  """Converts a datetime object to a unix timestamp in milliseconds.

  Naive datetimes are interpreted as UTC rather than as machine-local time.
  google-auth reports credential expiry (`creds.expiry`, the value this
  notebook passes in) as a naive UTC datetime.  Interpreting it as local time
  would shift the token expiry by the machine's UTC offset.  Timezone-aware
  datetimes are converted as-is.

  Args:
    dt: A datetime object (naive values are treated as UTC).

  Returns:
    Milliseconds since the unix epoch, truncated to an int.
  """
  from datetime import timezone

  if dt.tzinfo is None:
    dt = dt.replace(tzinfo=timezone.utc)
  return int(dt.timestamp() * 1000)

#### deleteGCSFolder
Deletes all the files in a GCS folder

In [None]:
def deleteGCSFolder(bucket_name, folder_name):
  """Deletes all files (blobs) in a GCS "folder".

  GCS has no real folders; a folder is only a name prefix.  The prefix is
  normalized to end with "/" so that deleting "db/customer" does not also
  delete sibling folders such as "db/customer_archive/".  An empty
  folder_name is passed through unchanged and matches every blob in the bucket.

  Args:
    bucket_name: The name of the GCS bucket.
    folder_name: The name of the folder to delete, with or without a trailing "/".
  """
  prefix = folder_name
  if prefix and not prefix.endswith("/"):
    prefix += "/"

  storage_client = storage.Client()
  bucket = storage_client.bucket(bucket_name)

  for blob in bucket.list_blobs(prefix=prefix):
    blob.delete()

#### RunQuery
Runs a SQL statement on BigQuery

In [None]:
def RunQuery(sql):
  """Runs a SQL statement on BigQuery and shows the status of the BigQuery job.

  Statements starting with SELECT or WITH (case-insensitive, ignoring
  leading whitespace) are run and returned as a DataFrame.  Any other
  statement (DDL/DML) is submitted as an interactive job.  Its state is
  printed and re-checked every 2 seconds until it is DONE.

  Args:
    sql: The SQL statement to run.

  Returns:
    A DataFrame for SELECT/WITH statements.  Otherwise True if the job
    finished without an error result, else False.
  """
  import time
  from google.cloud import bigquery
  client = bigquery.Client()

  # Normalize only for routing; the original text is what is sent to BigQuery
  if sql.lstrip().upper().startswith(("SELECT", "WITH")):
    return client.query(sql).to_dataframe()

  job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)
  query_job = client.query(sql, job_config=job_config)

  while True:
    # Re-fetch the job to observe its latest state
    query_job = client.get_job(query_job.job_id, location=query_job.location)
    print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))
    if query_job.state == "DONE":
      break
    time.sleep(2)

  return query_job.error_result is None

#### restAPIHelper
Calls the Google Cloud REST API using the current user's credentials.

In [None]:
def restAPIHelper(url: str, http_verb: str, request_body: "dict | None") -> dict:
  """Calls a Google Cloud REST API, authenticating as the current user.

  Args:
    url: The URL to call.
    http_verb: The HTTP verb to use: "GET", "POST", "PUT", "PATCH" or "DELETE".
    request_body: JSON-serializable body sent with POST, PUT and PATCH
      (ignored for GET and DELETE, where callers pass None).

  Returns:
    The decoded JSON response body, or an empty dict for a 2xx response
    that has no body.

  Raises:
    RuntimeError: If the HTTP verb is unknown or the response status is not 2xx.
  """
  import requests
  import google.auth
  # Imported explicitly: `import google.auth` does not by itself guarantee
  # that the `google.auth.transport.requests` submodule is loaded.
  import google.auth.transport.requests
  import json

  # Each supported verb -> (requests function, whether it sends a JSON body)
  senders = {
    "GET": (requests.get, False),
    "POST": (requests.post, True),
    "PUT": (requests.put, True),
    "PATCH": (requests.patch, True),
    "DELETE": (requests.delete, False),
  }
  if http_verb not in senders:
    # Validate before fetching credentials so a typo fails fast
    raise RuntimeError(f"Unknown HTTP verb: {http_verb}")
  send, has_body = senders[http_verb]

  # Get an access token based upon the current user
  creds, _ = google.auth.default()
  creds.refresh(google.auth.transport.requests.Request())

  headers = {
    "Content-Type" : "application/json",
    "Authorization" : "Bearer " + creds.token
  }

  if has_body:
    response = send(url, json=request_body, headers=headers)
  else:
    response = send(url, headers=headers)

  # Any 2xx is success (e.g. 204 No Content), not only 200
  if 200 <= response.status_code < 300:
    return json.loads(response.content) if response.content else {}

  error = f"Error restAPIHelper -> ' Status: '{response.status_code}' Text: '{response.text}'"
  raise RuntimeError(error)

#### createBigLakeConnection
Creates the BigQuery external connection and returns the generated service principal.  The service principal then needs to be granted IAM access to the resources it requires.

In [None]:
def createBigLakeConnection(params):
  """Returns the service account of the BigLake connection, creating the connection if needed.

  Looks in params["bigquery_location"] for a connection whose id is
  params["biglake_connection_name"].  If none exists, it creates one with an
  empty cloudResource section, and BigQuery generates a service account for it.

  Args:
    params: Dict with "project_id", "bigquery_location" and "biglake_connection_name".

  Returns:
    The connection's cloudResource.serviceAccountId.
  """
  project_id = params["project_id"]
  location = params["bigquery_location"]
  connection_name = params["biglake_connection_name"]
  connections_url = f"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{location}/connections"

  # List the existing connections
  # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/list
  existing = restAPIHelper(connections_url, "GET", None)
  print(f"createBigLakeConnection (GET) json_result: {existing}")

  # Returned names embed the project NUMBER, e.g.
  # "projects/756740881369/locations/us/connections/biglake-notebook-connection",
  # so only the location/connection suffix can be compared.
  wanted_suffix = f"/locations/{location}/connections/{connection_name}"
  for connection in existing.get("connections", []):
    print(f"BigLake Connection: {connection['name']}")
    if connection["name"].endswith(wanted_suffix):
      print("Connection already exists")
      return connection["cloudResource"]["serviceAccountId"]

  # Not found: create it
  # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/create
  print("Creating BigLake Connection")
  created = restAPIHelper(
      f"{connections_url}?connectionId={connection_name}",
      "POST",
      {
          "friendlyName": connection_name,
          "description": "BigLake Colab Notebooks Connection for Data Analytics Golden Demo",
          "cloudResource": {}
      })

  service_account_id = created["cloudResource"]["serviceAccountId"]
  print("BigLake Connection created: ", service_account_id)
  return service_account_id


#### createGoogleCloudStorageBucket
Create the Google Cloud Storage bucket that will be used for holding the BigLake files (avro, csv, delta, hudi, json, parquet)

In [None]:
def createGoogleCloudStorageBucket(params):
  """Creates the BigLake GCS bucket unless the project already lists a bucket with that name.

  Only the buckets returned by a single list call are examined; no page
  token is followed.

  Args:
    params: Dict with "project_id" and "biglake_bucket_name".
  """
  project_id = params["project_id"]
  bucket_name = params["biglake_bucket_name"]

  # List the project's buckets
  # https://cloud.google.com/storage/docs/json_api/v1/buckets/list
  listing = restAPIHelper(f"https://storage.googleapis.com/storage/v1/b?project={project_id}", "GET", None)
  print(f"createGoogleCloudStorageBucket (GET) json_result: {listing}")

  # Nothing to do if the bucket is already there
  for bucket in listing.get("items", []):
    print(f"Bucket Id / Name: ({bucket['id']} / {bucket['name']}")
    if bucket["id"] == bucket_name:
      print("Bucket already exists")
      return

  # Create a private bucket
  # https://cloud.google.com/storage/docs/json_api/v1/buckets/insert
  print("Creating Google Cloud Bucket")
  insert_url = (f"https://storage.googleapis.com/storage/v1/b?project={project_id}"
                "&predefinedAcl=private&predefinedDefaultObjectAcl=private&projection=noAcl")
  created = restAPIHelper(insert_url, "POST", {"name": bucket_name})
  print()
  print(f"json_result: {created}")
  print()
  print("BigLake Bucket created: ", bucket_name)

#### setBucketIamPolicy
Adds the BigLake External Connection service principal to the IAM policy of the GCS bucket.

In [None]:
def setBucketIamPolicy(params, accountWithPrefix, role):
  """Grants `role` on the BigLake bucket to `accountWithPrefix` unless it already holds that role.

  Performs a read-modify-write of the bucket IAM policy:
  1. Read the current policy.
  2. Add the member to the existing unconditional binding for `role`, or
     append a new binding.  All other bindings are kept.
  3. Write the whole policy back.

  The etag (and version) from the read are sent with the write so the API
  can reject the update if the policy changed in between.

  Args:
    params: Dict with "biglake_bucket_name".
    accountWithPrefix: The account to add to the IAM policy (the prefix is user: or serviceAccount: or group:).
    role: The role to grant, e.g. "roles/storage.objectAdmin".
  """
  biglake_bucket_name = params["biglake_bucket_name"]

  # https://cloud.google.com/storage/docs/json_api/v1/buckets/getIamPolicy
  # https://cloud.google.com/storage/docs/json_api/v1/buckets/setIamPolicy
  url = f"https://storage.googleapis.com/storage/v1/b/{biglake_bucket_name}/iam"

  json_result = restAPIHelper(url, "GET", None)
  print(f"setBucketIamPolicy (GET) json_result: {json_result}")

  # A policy without any bindings may omit the key entirely
  bindings = json_result.get("bindings", [])

  for binding in bindings:
    # Only an unconditional binding for this exact role counts: holding a different
    # role (or a conditional grant) does not provide the access being requested.
    if binding.get("role") != role or "condition" in binding:
      continue
    members = binding.setdefault("members", [])
    if accountWithPrefix in members:
      print("Permissions exist")
      return
    # Extend the existing binding instead of adding a second binding for the same role
    members.append(accountWithPrefix)
    break
  else:
    # No binding for this role yet; append one (existing bindings are preserved)
    bindings.append({"role": role, "members": [accountWithPrefix]})

  request_body = {"bindings": bindings}
  for key in ("etag", "version"):
    if key in json_result:
      request_body[key] = json_result[key]

  print(f"Permission bindings: {bindings}")

  json_result = restAPIHelper(url, "PUT", request_body)
  print()
  print(f"json_result: {json_result}")
  print()
  print(f"Bucket IAM Permissions set for {accountWithPrefix} {role}")

#### getProjectNumber
Gets the project number from a project id

In [None]:
def getProjectNumber(params):
  """Gets the project number for params["project_id"], caching it in params.

  Args:
    params: Dict with "project_id".  If "project_number" is already present it
      is used as-is; otherwise it is looked up via Cloud Resource Manager and
      stored in params["project_number"].

  Returns:
    The project number (on the first lookup as well as on cached calls).
  """
  if "project_number" not in params:
    # https://cloud.google.com/resource-manager/reference/rest/v1/projects/get
    url = f"https://cloudresourcemanager.googleapis.com/v1/projects/{params['project_id']}"
    json_result = restAPIHelper(url, "GET", None)
    print(f"getProjectNumber (GET) json_result: {json_result}")
    # Cache it: activateServiceAPIs reads params["project_number"]
    params["project_number"] = json_result["projectNumber"]

  project_number = params["project_number"]
  print(f"getProjectNumber: {project_number}")
  return project_number

#### activateServiceAPIs
Enables Google Cloud APIs

In [None]:
def activateServiceAPIs(params):
  """Batch-enables the Google Cloud service APIs this demo needs (biglake.googleapis.com).

  The API response is printed; the operation it returns is not polled here.

  Args:
    params: Dict that must already contain "project_number" (see getProjectNumber).
  """
  # https://cloud.google.com/service-usage/docs/reference/rest/v1/services/batchEnable
  endpoint = "https://serviceusage.googleapis.com/v1/projects/{}/services:batchEnable".format(params["project_number"])
  services_to_enable = ["biglake.googleapis.com"]

  json_result = restAPIHelper(endpoint, "POST", {"serviceIds": services_to_enable})
  print(f"activateServiceAPIs (POST) json_result: {json_result}")

#### initialize
Calls the methods to create the external connection, create the GCS bucket, apply IAM permissions and enable the BigLake API.  This method can be re-run as required and does not cause duplication issues.  Each method tests for the existence of items before creating them.

In [None]:
def initialize(params):
  """Creates the BigLake connection and GCS bucket, sets bucket IAM permissions, and enables the BigLake API.

  Each create/grant step checks for an existing resource first, so this can be re-run.

  Args:
    params: Dict with "project_id", "bigquery_location", "biglake_connection_name",
      "biglake_bucket_name" and "user".  "bigLakeServiceAccountId" is added to it,
      and getProjectNumber stores "project_number" in it.
  """
  # Create the BigLake connection (if not exists)
  bigLakeServiceAccountId = createBigLakeConnection(params)
  print(f"createBigLakeConnection: {bigLakeServiceAccountId}")
  params["bigLakeServiceAccountId"] = bigLakeServiceAccountId

  # Create the storage bucket (if not exists)
  createGoogleCloudStorageBucket(params)

  # Grant the connection's service account and the notebook user access to the bucket (if not already granted).
  # The user comes from params rather than a notebook global so the function is self-contained.
  setBucketIamPolicy(params, f"serviceAccount:{bigLakeServiceAccountId}", "roles/storage.objectAdmin")
  setBucketIamPolicy(params, f"user:{params['user']}", "roles/storage.admin")

  # activateServiceAPIs needs params["project_number"]
  getProjectNumber(params)
  activateServiceAPIs(params)

### Initialize the Demo

In [None]:
# Create/verify the connection, bucket and IAM bindings, and enable the BigLake API (safe to re-run)
initialize(params)

In [None]:
%%bigquery --params $params

-- Create the dataset that will hold the BigLake external table, in the same location as the connection
CREATE SCHEMA IF NOT EXISTS biglake_dataset OPTIONS(location = @bigquery_location);

### Create the Iceberg Catalog and have it save the data to GCS

In [None]:
# We will use SQLite for our catalog database, this is where we store our information as to where our tables are located
# You would typically use Hive or an Iceberg catalog for this.
# We also want our files to be stored in GCS
# NOTE: This is not refreshing our "gcs.oauth2.token" which expires (typically every 59 minutes)

creds = getGoogleCredentials()

catalog = SqlCatalog(
    iceberg_catalog_namespace,
    **{
        # SQLAlchemy URL of the SQLite catalog database (directory created in the init cell)
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        # GCS credentials for reading/writing the Iceberg files
        "gcs.oauth2.token-expires-at": datetimeToUnixMs(creds.expiry),
        "gcs.project-id": project_id,
        "gcs.oauth2.token": creds.token,
        # NOTE(review): pyiceberg documents gcs.default-bucket-location as a bucket *location* used when
        # creating buckets, not a gs:// path; confirm whether this setting is needed.
        "gcs.default-bucket-location": f"gs://{biglake_bucket_name}/",
        # Root under which new tables' folders are created (e.g. <warehouse>/default.db/customer)
        "warehouse": f"gs://{biglake_bucket_name}/"
    },
)

### Create the default Iceberg namespace

In [None]:
# Create the Namespace if it does not exist
# list_namespaces() returns namespace identifiers as tuples, hence the one-element tuple below

if (iceberg_catalog_namespace,) in catalog.list_namespaces():
  print(f"Catalog namespace {iceberg_catalog_namespace} already exists")
else:
  catalog.create_namespace(iceberg_catalog_namespace)

### Create a Customer Table, BigLake External Iceberg Table

1. Create the customer iceberg table
2. Create a BigLake External table pointing to the GCS metadata file (specific version)
3. Append records to the customer iceberg table
4. PATCH or update the BigLake External table setting the metadata file to the latest version


In [None]:
# Create a customer table with 4 rows
# https://py.iceberg.apache.org/api/#write-support

# Define the schema for the customer table
schema = pa.schema([
    ('customer_id', pa.int64()),
    ('customer_name', pa.string())
])

# Start from a clean slate: drop the table from the catalog if a previous run created it
if (iceberg_catalog_namespace, 'customer') in catalog.list_tables(iceberg_catalog_namespace):
  # Drop the table
  catalog.drop_table(f"{iceberg_catalog_namespace}.customer")
  print("Table Dropped")

# We need to delete the table's GCS folder too (dropping an iceberg table does not delete the files on GCS).
# The trailing "/" keeps the prefix from also matching sibling folders such as ".../customer_archive/".
deleteGCSFolder(biglake_bucket_name, f"{iceberg_catalog_namespace}.db/customer/")
print(f"All files in folder {iceberg_catalog_namespace}.db/customer have been deleted.")

table = catalog.create_table(f"{iceberg_catalog_namespace}.customer", schema=schema)

df = pa.Table.from_pylist(
    [
        {"customer_id": 1, "customer_name": "Customer 001"},
        {"customer_id": 2, "customer_name": "Customer 002"},
        {"customer_id": 3, "customer_name": "Customer 003"},
        {"customer_id": 4, "customer_name": "Customer 004"},
    ],
)

table.append(df)

In [None]:
def icebergMetadataSortKey(metadata):
  """Returns the ordering key of a parsed Iceberg metadata.json, or None if it should be ignored.

  Newer metadata compares greater.  The key orders by last-sequence-number,
  then last-updated-ms, then current-schema-id.  Each field is compared only
  with the same field of another file.  Metadata whose last-sequence-number
  is missing or null (the field is only required in format v2) returns None.

  Args:
    metadata: The dict decoded from a metadata.json file.
  """
  sequence_number = metadata.get("last-sequence-number")
  if sequence_number is None:
    return None
  return (sequence_number,
          metadata.get("last-updated-ms") or 0,
          metadata.get("current-schema-id") or 0)


def determineLatestIcebergMetadataFile(gcs_path):
  """
  Finds the file ending with 'metadata.json' that has the most recent last-sequence-number and last-updated-ms.
    https://iceberg.apache.org/spec/#table-metadata-fields
    last-sequence-number: The table's highest assigned sequence number, a monotonically increasing long that tracks the order of snapshots in a table.
    last-updated-ms: Timestamp in milliseconds from the unix epoch when the table was last updated. Each table metadata file should update this field just before writing.
    Remaining ties are broken by current-schema-id.  On an exact tie, the first file listed is kept.

  Every metadata.json under the path is downloaded and parsed.  For large tables you could limit this to the most recent files.

  NOTE: Only use this approach if you CANNOT directly read the Iceberg Catalog "SQL" database.
  If you can read the Iceberg Catalog "SQL" database, it has a pointer to the latest metadata file and is the preferred approach.

  Args:
    gcs_path: The GCS path to search within (e.g., 'gs://my-bucket/my-folder/').

  Returns:
    The GCS path of the latest metadata file, or None if no such file is found.
  """
  storage_client = storage.Client()

  # "gs://bucket/some/prefix" -> bucket "bucket", blob prefix "some/prefix"
  bucket_name, _, blob_prefix = gcs_path.replace("gs://", "").partition("/")

  latest_key = None
  latest_blob_name = None

  for blob in storage_client.list_blobs(bucket_name, prefix=blob_prefix):
    # We only want the metadata files (skip manifests, manifest lists and data files)
    if not blob.name.endswith('metadata.json'):
      continue

    key = icebergMetadataSortKey(json.loads(blob.download_as_bytes()))
    if key is None:
      continue

    # Strictly greater: on an exact tie the first file listed wins
    if latest_key is None or key > latest_key:
      latest_key = key
      latest_blob_name = blob.name

  if latest_blob_name is None:
    return None
  return "gs://" + bucket_name + "/" + latest_blob_name

In [None]:
# Find the newest metadata.json for the customer table (it currently holds the first 4 rows)
metadata_uri = determineLatestIcebergMetadataFile(f"gs://{biglake_bucket_name}/{iceberg_catalog_namespace}.db/customer/metadata")
print(metadata_uri)

In [None]:
# Console link to the customer table's metadata folder
print("Open this URL to see your storage account:")
print(f"https://console.cloud.google.com/storage/browser/{biglake_bucket_name}/{iceberg_catalog_namespace}.db/customer/metadata?project={project_id}")

In [None]:
# Create the table using the "latest" metadata file found above.
# The connection is referenced as <project>.<location>.<connection id>.  Use the same
# biglake_connection_name variable that initialize() used instead of a hard-coded copy.

sql = f"""
CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.customer_iceberg`
WITH CONNECTION `{project_id}.{bigquery_location}.{biglake_connection_name}`
OPTIONS (
  format = "ICEBERG",
  uris = ["{metadata_uri}"]
);"""

RunQuery(sql)

In [None]:
%%bigquery

-- Query the Iceberg table through BigQuery; expect the 4 rows appended so far
SELECT * FROM `biglake_dataset.customer_iceberg` ORDER BY customer_id;

In [None]:
# Add 4 more rows, at this point BigQuery cannot see these until we update the metadata
# (the append writes a new metadata.json, but the BigLake table still points to the old one)

df = pa.Table.from_pylist(
    [
        {"customer_id": 5, "customer_name": "Customer 005"},
        {"customer_id": 6, "customer_name": "Customer 006"},
        {"customer_id": 7, "customer_name": "Customer 007"},
        {"customer_id": 8, "customer_name": "Customer 008"},
    ],
)

table.append(df)

In [None]:
# Console link again; the appends above produced additional metadata files
print("Open this URL to see your storage account. You will see more metadata files.")
print(f"https://console.cloud.google.com/storage/browser/{biglake_bucket_name}/{iceberg_catalog_namespace}.db/customer/metadata?project={project_id}")

In [None]:
def patchBigLakeExternalIcebergTable(project_id, dataset_name, table_name, metadata_uri):
  """Points a BigLake external Iceberg table at a new Iceberg metadata file.

  The table definition is read first (tables.get).  If its first source URI
  already equals metadata_uri, nothing is changed.  Otherwise the table is
  PATCHed with the new URI and autodetect_schema=true, so the schema is
  re-detected (e.g. to pick up added columns).

  Args:
    project_id: The ID of the Google Cloud project.
    dataset_name: The name of the BigQuery dataset.
    table_name: The name of the BigQuery table.
    metadata_uri: The gs:// URI of the Iceberg metadata.json file to point to.

  Returns:
    True if the table already pointed at metadata_uri or was patched.

  Raises:
    RuntimeError: If metadata_uri is empty/None or either REST call fails.
  """
  # Fail fast: patching with an empty URI would only produce a confusing API error
  if not metadata_uri:
    raise RuntimeError(f"metadata_uri must be a gs:// URI of an Iceberg metadata file, got: {metadata_uri!r}")

  access_token = getGoogleCredentials().token

  headers = {
      "Content-Type" : "application/json",
      "Authorization" : "Bearer " + access_token
  }

  # First determine if we need to update the metadata (skip the update if so)
  # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get
  url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project_id}/datasets/{dataset_name}/tables/{table_name}"
  response = requests.get(url, headers=headers)

  #"externalDataConfiguration": {
  #  "sourceUris": [
  #    "gs://biglake-data-analytics-preview/default.db/customer/metadata/00003-5cf6efcb-136b-41ed-8751-7b0cfccc75e0.metadata.json"
  #  ],
  #  "sourceFormat": "ICEBERG",
  #  "autodetect": true,
  #  "connectionId": "data-analytics-preview.us.biglake-notebook-connection"
  #},

  if response.status_code != 200:
    error = f"Could not get table metadata for:'{project_id}.{dataset_name}.{table_name}'  Status:'{response.status_code}' Text:'{response.text}'"
    raise RuntimeError(error)

  # .get() so a definition without externalDataConfiguration/sourceUris leads to a PATCH instead of a KeyError
  current_uris = json.loads(response.content).get("externalDataConfiguration", {}).get("sourceUris", [])
  if current_uris and current_uris[0] == metadata_uri:
    print("No need to update metadata")
    return True

  # https://cloud.google.com/bigquery/docs/iceberg-tables#api
  # PATCH https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/datasets/DATASET/tables/TABLE?autodetect_schema=true
  url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project_id}/datasets/{dataset_name}/tables/{table_name}?autodetect_schema=true"

  payload = {
    "externalDataConfiguration": {
      "sourceFormat": "ICEBERG",
      "sourceUris": [
        metadata_uri
      ]
    },
    # Sent as JSON null; the schema is re-detected via autodetect_schema=true
    "schema": None
  }

  response = requests.patch(url, json=payload, headers=headers)

  if response.status_code == 200:
    print("Table Patched")
    return True

  error = f"Error PATCHing table:'{project_id}.{dataset_name}.{table_name}'  Status:'{response.status_code}' Text:'{response.text}'"
  raise RuntimeError(error)

In [None]:
# Get the latest metadata and update our BigLake table
# (the patch is skipped if the table already points to this metadata file)

metadata_uri = determineLatestIcebergMetadataFile(f"gs://{biglake_bucket_name}/{iceberg_catalog_namespace}.db/customer/metadata")
print(metadata_uri)

patchBigLakeExternalIcebergTable(project_id, "biglake_dataset", "customer_iceberg", metadata_uri)

In [None]:
%%bigquery

-- We should now see 8 records
SELECT * FROM `biglake_dataset.customer_iceberg` ORDER BY customer_id;

In [None]:
# Change the table schema and then PATCH the BigLake table
# NOTE: only StringType is used below; NestedField, DoubleType and LongType are not used in this cell
from pyiceberg.types import NestedField, StringType, DoubleType, LongType

# Reload the table from the catalog so the schema change starts from its latest metadata
table = catalog.load_table(f"{iceberg_catalog_namespace}.customer")

# Add a new optional string column; the update is committed when the with-block exits
with table.update_schema() as update:
    update.add_column("customer_description", StringType())

In [None]:
# Get the latest metadata and update our BigLake table
# (the PATCH uses autodetect_schema=true, so BigQuery picks up the new column)

metadata_uri = determineLatestIcebergMetadataFile(f"gs://{biglake_bucket_name}/{iceberg_catalog_namespace}.db/customer/metadata")
print(metadata_uri)

patchBigLakeExternalIcebergTable(project_id, "biglake_dataset", "customer_iceberg", metadata_uri)

In [None]:
%%bigquery

-- We should now see 8 records with the NEW field set to NULL
SELECT * FROM `biglake_dataset.customer_iceberg` ORDER BY customer_id;

In [None]:
# Add 4 more rows, at this point BigQuery cannot see these until we update the metadata
# These rows populate the new customer_description column

df = pa.Table.from_pylist(
    [
        {"customer_id": 9,  "customer_name": "Customer 009", "customer_description" : "Customer Description 009"},
        {"customer_id": 10, "customer_name": "Customer 010", "customer_description" : "Customer Description 010"},
        {"customer_id": 11, "customer_name": "Customer 011", "customer_description" : "Customer Description 011"},
        {"customer_id": 12, "customer_name": "Customer 012", "customer_description" : "Customer Description 012"},
    ],
)

table.append(df)

In [None]:
# Show the newest metadata file (informational; the next cell looks it up again and patches the table)
metadata_uri = determineLatestIcebergMetadataFile(f"gs://{biglake_bucket_name}/{iceberg_catalog_namespace}.db/customer/metadata")
print(metadata_uri)

In [None]:
# Get the latest metadata and update our BigLake table
# (makes rows 9-12 visible to BigQuery)

metadata_uri = determineLatestIcebergMetadataFile(f"gs://{biglake_bucket_name}/{iceberg_catalog_namespace}.db/customer/metadata")
print(metadata_uri)

patchBigLakeExternalIcebergTable(project_id, "biglake_dataset", "customer_iceberg", metadata_uri)

In [None]:
%%bigquery

-- We should now see 12 records: the NEW field is NULL for the first 8, and the last 4 have descriptions
SELECT * FROM `biglake_dataset.customer_iceberg` ORDER BY customer_id;

### Algorithm Overall Thoughts / Notes about Automation (Cloud Function)



Cloud Function - How it could work
  - **Best Approach** Read the Iceberg catalog every {x} minutes (or have a trigger on the table push a notification for near real-time updates). See the code below "Explore the Iceberg Catalog tables".
    - For each table
      - Get the latest metadata json file from Iceberg Catalog (**Most Reliable**)
      - Test to see if the file is on GCS. It should be written first before the catalog is updated.  So, this is just a sanity check.
      - Call BQ REST API and get table info
      - Compare the URI from storage and BQ table def
      - If different then patch BQ

  - **Good** Run the Cloud Function every {x} minutes.
    - List of tables and metadata gcs paths
    - For each table
      - Get the latest metadata json file from GCS (**See Function: determineLatestIcebergMetadataFile**)
      - Call BQ REST API and get table info
      - Compare the URI from storage and BQ table def
      - If different then patch BQ

  - **Good, but complex and noisy** Use GCS Notifications (this is slightly more complex, and we need to deal with missed items, delays and re-processing if not complete)
    - We get **all** files being added/updated on GCS (noisy)
    - Only look at certain paths (we have list of tables / metadata gcs paths)
    - If a *.metadata.json file has been added/changed in one of our table paths
    - Get the latest metadata json file from GCS (**See Function: determineLatestIcebergMetadataFile**).  This might not even be the file we got a notification for.
    - Call BQ REST API and get table info
    - Compare the URI from storage and BQ table def
    - If different then patch BQ


### Explore the Iceberg Catalog tables

Ideally we read the Catalog and **do not** rely on listing GCS to determine the latest metadata file.

Review the output of the SQL ```SELECT * FROM iceberg_tables```.  It contains a metadata_location field that points to the latest committed metadata file.

In [None]:
import sqlite3
from contextlib import closing

# Metadata location of the customer table as recorded by the catalog (stays None if no row matches)
metadata__uri_from_iceberg_catalog = None

# Connect to the SQLite database that backs the SqlCatalog (sqlite3 creates an empty file if it doesn't exist).
# closing() guarantees the connection is closed even if a query raises; using a sqlite3
# connection directly in a `with` only commits/rolls back and does not close it.
with closing(sqlite3.connect(f'{warehouse_path}/pyiceberg_catalog.db')) as conn:
  cursor = conn.cursor()

  print("#################################################################")
  print("Show all tables in our Iceberg Catalog")
  print("#################################################################")
  cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")

  # Distinct loop name: `table` is the pyiceberg customer table used by earlier cells
  for sqlite_table in cursor.fetchall():
    print(sqlite_table[0])

  print()
  print()
  print("#################################################################")
  print("SELECT * FROM iceberg_namespace_properties")
  print("#################################################################")
  cursor.execute("SELECT * FROM iceberg_namespace_properties")
  rows = cursor.fetchall()
  column_names = [description[0] for description in cursor.description]
  print(column_names)
  for row in rows:
    print(row)

  print()
  print()
  print("#################################################################")
  print("SELECT * FROM iceberg_tables")
  print("#################################################################")
  cursor.execute("SELECT * FROM iceberg_tables")
  rows = cursor.fetchall()
  column_names = [description[0] for description in cursor.description]
  print(column_names)

  for row in rows:
    print(row)
    # Look columns up by name rather than position.  The catalog was created with the name
    # iceberg_catalog_namespace, and the customer table lives in that same namespace.
    record = dict(zip(column_names, row))
    if (record.get("catalog_name") == iceberg_catalog_namespace
        and record.get("table_namespace") == iceberg_catalog_namespace
        and record.get("table_name") == "customer"):
      metadata__uri_from_iceberg_catalog = record.get("metadata_location")

In [None]:
# Get the latest metadata and update our BigLake table
# This time the metadata location comes from the Iceberg catalog (read in the previous cell)
# instead of from listing the metadata files on GCS.

metadata_uri = metadata__uri_from_iceberg_catalog
print(f"metadata__uri_from_iceberg_catalog: {metadata__uri_from_iceberg_catalog}")

patchBigLakeExternalIcebergTable(project_id, "biglake_dataset", "customer_iceberg", metadata_uri)