colab-enterprise/BigLake-Iceberg-Sync-Demo.ipynb

{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "vW69NH2bMi9K" }, "source": [ "### <font color='blue'>License</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "AErdlrMjMdKj" }, "source": [ "```\n", "# Copyright 2024 Google LLC\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License.\n", "```\n", "\n", "Author: Adam Paternostro" ] }, { "cell_type": "markdown", "metadata": { "id": "hZZ2om3xe9Ax" }, "source": [ "### Iceberg / BigQuery Sync Demo (Readme)\n", "Shows how to create an Iceberg table on GCS, create a BigLake External table and updating the BigLake External table's metadata. This can be used for keeping BigQuery in sync with external Iceberg tables that are being written to by third parties." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "RO6kfu7UNQoR" }, "outputs": [], "source": [ "from IPython.display import Image\n", "Image(url='https://storage.googleapis.com/data-analytics-golden-demo/biglake/v1/artifacts/BigLake-Iceberg-Sync-Demo-Preferred.png', width=1200)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "X9vAiuDkCn9B" }, "outputs": [], "source": [ "Image(url='https://storage.googleapis.com/data-analytics-golden-demo/biglake/v1/artifacts/BigLake-Iceberg-Sync-Demo-Secondary.png', width=1200)" ] }, { "cell_type": "markdown", "metadata": { "id": "jHEEWOJCe6zk" }, "source": [ "### PIP Install (Run once)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "dcbOGis9NWXJ" }, "outputs": [], "source": [ "!pip install --upgrade pip" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "MkkE_Q8fPVAm" }, "outputs": [], "source": [ "!pip install \"pyiceberg[gcsfs,hive]\"" ] }, { "cell_type": "markdown", "metadata": { "id": "cghkfWlte_n4" }, "source": [ "### Initialize Code" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "17SxSYMnTDfo" }, "outputs": [], "source": [ "import google.auth\n", "import requests\n", "import json\n", "import datetime\n", "import os\n", "import re\n", "import pyarrow as pa\n", "from pyiceberg.catalog.sql import SqlCatalog\n", "from google.cloud import storage\n", "from datetime import datetime\n", "\n", "\n", "# Parameters / Variables\n", "# Get some values using gcloud\n", "project_id = !(gcloud config get-value project)\n", "user = !(gcloud auth list --filter=status:ACTIVE --format=\"value(account)\")\n", "\n", "if len(project_id) != 1:\n", " raise RuntimeError(f\"project_id is not set: {project_id}\")\n", "project_id = project_id[0]\n", "\n", "if len(user) != 1:\n", " raise RuntimeError(f\"user is not set: {user}\")\n", "user = user[0]\n", "\n", "bigquery_location = \"us\"\n", "iceberg_catalog_namespace = \"default\"\n", "biglake_bucket_name = \"biglake-\" + project_id\n", "biglake_connection_name = \"biglake-notebook-connection\"\n", "warehouse_path = \"/tmp/sqlite\"\n", "\n", "# Make the SQLite directory (if not exists). 
NOTE: This should be a HIVE metastore or something more permanent\n", "if not os.path.exists(warehouse_path):\n", " os.makedirs(warehouse_path)\n", "\n", "params = { \"project_id\" : project_id,\n", " \"bigquery_location\" : bigquery_location,\n", " \"biglake_connection_name\": biglake_connection_name,\n", " \"biglake_bucket_name\" : biglake_bucket_name,\n", " \"user\" : user\n", " }" ] }, { "cell_type": "markdown", "metadata": { "id": "FG8mf-gNeRaL" }, "source": [ "### Helper Functions" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "yo36XzpmS65n" }, "outputs": [], "source": [ "def getGoogleCredentials():\n", " \"\"\"Gets the current user's credentials.\n", "\n", " Args:\n", " None.\n", " \"\"\"\n", " creds, project = google.auth.default()\n", " auth_req = google.auth.transport.requests.Request() # required to get an access token\n", " creds.refresh(auth_req)\n", " return creds" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "vJ8ABhlLV2Di" }, "outputs": [], "source": [ "def datetimeToUnixMs(dt):\n", " \"\"\"Converts a datetime object to a unix timestamp in milliseconds.\n", "\n", " Args:\n", " dt: A datetime object.\n", " \"\"\"\n", " return int(dt.timestamp() * 1000)" ] }, { "cell_type": "markdown", "metadata": { "id": "f05MM_xPILUe" }, "source": [ "#### deleteGCSFolder\n", "Deletes all the files in a GCS folder" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "aFNglPfXqeTa" }, "outputs": [], "source": [ "def deleteGCSFolder(bucket_name, folder_name):\n", " \"\"\"Deletes all files in a GCS folder.\n", "\n", " Args:\n", " bucket_name: The name of the GCS bucket.\n", " folder_name: The name of the folder to delete.\n", " \"\"\"\n", "\n", " storage_client = storage.Client()\n", " bucket = storage_client.bucket(bucket_name)\n", " blobs = bucket.list_blobs(prefix=folder_name)\n", "\n", " for blob in blobs:\n", " blob.delete()" ] }, { "cell_type": "markdown", "metadata": { "id": "P_52x4o6IGnU" }, "source": [ "#### RunQuery\n", "Runs a SQL statement on BigQuery" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "c2z9Hnjql1D7" }, "outputs": [], "source": [ "def RunQuery(sql):\n", " \"\"\"Runs a SQL statement on BigQuery and shows the status of the BigQuery job.\n", "\n", " Args:\n", " sql: The SQL statement to run.\n", " \"\"\"\n", " import time\n", " from google.cloud import bigquery\n", " client = bigquery.Client()\n", "\n", " if (sql.startswith(\"SELECT\") or sql.startswith(\"WITH\")):\n", " df_result = client.query(sql).to_dataframe()\n", " return df_result\n", " else:\n", " job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)\n", " query_job = client.query(sql, job_config=job_config)\n", "\n", " # Check on the progress by getting the job's updated state.\n", " query_job = client.get_job(\n", " query_job.job_id, location=query_job.location\n", " )\n", " print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n", "\n", " while query_job.state != \"DONE\":\n", " time.sleep(2)\n", " query_job = client.get_job(\n", " query_job.job_id, location=query_job.location\n", " )\n", " print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n", "\n", " if query_job.error_result is None:\n", " return True\n", " else:\n", " return False" ] }, { "cell_type": "markdown", "metadata": { "id": "h3XiGMZQHv_q" }, "source": [ "#### 
restAPIHelper\n", "Calls the Google Cloud REST API using the current users credentials." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "tDaQcCnvHst3" }, "outputs": [], "source": [ "def restAPIHelper(url: str, http_verb: str, request_body: str) -> str:\n", " \"\"\"Calls the Google Cloud REST API passing in the current users credentials\n", "\n", " Args:\n", " url: The URL to call.\n", " http_verb: The HTTP verb to use.\n", " request_body: The request body to send.\n", " \"\"\"\n", " import requests\n", " import google.auth\n", " import json\n", "\n", " # Get an access token based upon the current user\n", " creds, project = google.auth.default()\n", " auth_req = google.auth.transport.requests.Request()\n", " creds.refresh(auth_req)\n", " access_token=creds.token\n", "\n", " headers = {\n", " \"Content-Type\" : \"application/json\",\n", " \"Authorization\" : \"Bearer \" + access_token\n", " }\n", "\n", " if http_verb == \"GET\":\n", " response = requests.get(url, headers=headers)\n", " elif http_verb == \"POST\":\n", " response = requests.post(url, json=request_body, headers=headers)\n", " elif http_verb == \"PUT\":\n", " response = requests.put(url, json=request_body, headers=headers)\n", " elif http_verb == \"PATCH\":\n", " response = requests.patch(url, json=request_body, headers=headers)\n", " elif http_verb == \"DELETE\":\n", " response = requests.delete(url, headers=headers)\n", " else:\n", " raise RuntimeError(f\"Unknown HTTP verb: {http_verb}\")\n", "\n", " if response.status_code == 200:\n", " return json.loads(response.content)\n", " #image_data = json.loads(response.content)[\"predictions\"][0][\"bytesBase64Encoded\"]\n", " else:\n", " error = f\"Error restAPIHelper -> ' Status: '{response.status_code}' Text: '{response.text}'\"\n", " raise RuntimeError(error)" ] }, { "cell_type": "markdown", "metadata": { "id": "kwM9gPbkIFT8" }, "source": [ "#### createBigLakeConnection\n", "Creates the BigQuery external connection and returns the generated service principal. The service principal then needs to be granted IAM access to resourses it requires." 
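] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The next cell is an optional, hedged sketch (not required by the demo): the same Cloud Resource connection and bucket IAM grant can also be created with the `bq` and `gcloud` CLIs instead of the REST helpers defined below. It reuses the variables from the Initialize Code cell; the service account placeholder is an assumption you would fill in after the connection exists, and the flags should be verified against your installed gcloud/bq version." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional CLI alternative (commented out; a sketch only, assumptions noted above).\n", "# Create the Cloud Resource connection, inspect it to find its service account,\n", "# then grant that service account access to the bucket once the bucket exists.\n", "# !bq mk --connection --location={bigquery_location} --project_id={project_id} --connection_type=CLOUD_RESOURCE {biglake_connection_name}\n", "# !bq show --connection --location={bigquery_location} --project_id={project_id} {biglake_connection_name}\n", "# !gcloud storage buckets add-iam-policy-binding gs://{biglake_bucket_name} --member=serviceAccount:<connection-service-account> --role=roles/storage.objectAdmin"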
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "PF8QRnqTHQg2" }, "outputs": [], "source": [ "def createBigLakeConnection(params):\n", " \"\"\"Creates a BigLake connection to be used for BigLake tables.\n", "\n", " Args:\n", " params: The parameters to use.\n", " \"\"\"\n", "\n", " # First find the connection\n", " # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/list\n", " project_id = params[\"project_id\"]\n", " bigquery_location = params[\"bigquery_location\"]\n", " biglake_connection_name = params[\"biglake_connection_name\"]\n", " url = f\"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/connections\"\n", "\n", " # Gather existing connections\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"createBigLakeConnection (GET) json_result: {json_result}\")\n", "\n", " # Test to see if connection exists, if so return\n", " if \"connections\" in json_result:\n", " for item in json_result[\"connections\"]:\n", " print(f\"BigLake Connection: {item['name']}\")\n", " # \"projects/756740881369/locations/us/connections/biglake-notebook-connection\"\n", " # NOTE: We cannot test the complete name since it contains the project number and not id\n", " if item[\"name\"].endswith(f\"/locations/{bigquery_location}/connections/{biglake_connection_name}\"):\n", " print(\"Connection already exists\")\n", " serviceAccountId = item[\"cloudResource\"][\"serviceAccountId\"]\n", " return serviceAccountId\n", "\n", " # Create the connection\n", " # https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest/v1/projects.locations.connections/create\n", " print(\"Creating BigLake Connection\")\n", "\n", " url = f\"https://bigqueryconnection.googleapis.com/v1/projects/{project_id}/locations/{bigquery_location}/connections?connectionId={biglake_connection_name}\"\n", "\n", " request_body = {\n", " \"friendlyName\": biglake_connection_name,\n", " \"description\": \"BigLake Colab Notebooks Connection for Data Analytics Golden Demo\",\n", " \"cloudResource\": {}\n", " }\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", "\n", " serviceAccountId = json_result[\"cloudResource\"][\"serviceAccountId\"]\n", " print(\"BigLake Connection created: \", serviceAccountId)\n", " return serviceAccountId\n" ] }, { "cell_type": "markdown", "metadata": { "id": "ip6U3jX7IXoz" }, "source": [ "#### createGoogleCloudStorageBucket\n", "Create the Google Cloud Storage bucket that will be used for holding the BigLake files (avro, csv, delta, hudi, json, parquet)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "8BJaR15jIY3U" }, "outputs": [], "source": [ "def createGoogleCloudStorageBucket(params):\n", " \"\"\"Creates a Google Cloud Storage bucket.\n", "\n", " Args:\n", " params: The parameters to use.\n", " \"\"\"\n", "\n", " # First find the bucket\n", " # https://cloud.google.com/storage/docs/json_api/v1/buckets/list\n", " project_id = params[\"project_id\"]\n", " biglake_bucket_name = params[\"biglake_bucket_name\"]\n", " url = f\"https://storage.googleapis.com/storage/v1/b?project={project_id}\"\n", "\n", " # Gather existing buckets\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"createGoogleCloudStorageBucket (GET) json_result: {json_result}\")\n", "\n", " # Test to see if connection exists, if so return\n", " if \"items\" in json_result:\n", " for item in json_result[\"items\"]:\n", " print(f\"Bucket Id / 
Name: ({item['id']} / {item['name']}\")\n", " if item[\"id\"] == biglake_bucket_name:\n", " print(\"Bucket already exists\")\n", " return\n", "\n", " # Create the bucket\n", " # https://cloud.google.com/storage/docs/json_api/v1/buckets/insert\n", " print(\"Creating Google Cloud Bucket\")\n", "\n", " url = f\"https://storage.googleapis.com/storage/v1/b?project={project_id}&predefinedAcl=private&predefinedDefaultObjectAcl=private&projection=noAcl\"\n", "\n", " request_body = {\n", " \"name\": biglake_bucket_name\n", " }\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", " print()\n", " print(f\"json_result: {json_result}\")\n", " print()\n", " print(\"BigLake Bucket created: \", biglake_bucket_name)" ] }, { "cell_type": "markdown", "metadata": { "id": "zCarwDPMIhGb" }, "source": [ "#### setBucketIamPolicy\n", "Added the BigLake External Connection Service Principal to the IAM permission of the GCS bucket." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Q_4cjls6IiTF" }, "outputs": [], "source": [ "def setBucketIamPolicy(params, accountWithPrefix, role):\n", " \"\"\"Sets the bucket IAM policy. We need to add the BigLake Service Account to the IAM policy.\n", "\n", " Args:\n", " params: The parameters to use.\n", " accountWithPrefix: The account to add to the IAM policy. (the prefix is user: or serviceAccount: or group:)\n", " role: The role to add to the IAM policy.\n", " \"\"\"\n", " biglake_bucket_name = params[\"biglake_bucket_name\"]\n", "\n", " # Get the current bindings (if the account has access then skip)\n", " # https://cloud.google.com/storage/docs/json_api/v1/buckets/getIamPolicy\n", "\n", " url = f\"https://storage.googleapis.com/storage/v1/b/{biglake_bucket_name}/iam\"\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"setBucketIamPolicy (GET) json_result: {json_result}\")\n", "\n", " # Test to see if permissions exist\n", " if \"bindings\" in json_result:\n", " for item in json_result[\"bindings\"]:\n", " members = item[\"members\"]\n", " for member in members:\n", " if member == accountWithPrefix:\n", " print(\"Permissions exist\")\n", " return\n", "\n", " # Take the existing bindings and we need to append the new permission\n", " # Otherwise we loose the existing permissions\n", "\n", " bindings = json_result[\"bindings\"]\n", " new_permission = {\n", " \"role\": role,\n", " \"members\": [ accountWithPrefix ]\n", " }\n", "\n", " bindings.append(new_permission)\n", "\n", " # https://cloud.google.com/storage/docs/json_api/v1/buckets/setIamPolicy\n", " url = f\"https://storage.googleapis.com/storage/v1/b/{biglake_bucket_name}/iam\"\n", "\n", " request_body = { \"bindings\" : bindings }\n", "\n", " print(f\"Permission bindings: {bindings}\")\n", "\n", "\n", " json_result = restAPIHelper(url, \"PUT\", request_body)\n", " print()\n", " print(f\"json_result: {json_result}\")\n", " print()\n", " print(f\"Bucket IAM Permissions set for {accountWithPrefix} {role}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "WnwSpyPFJm6Q" }, "source": [ "#### getProjectNumber\n", "Gets the project number from a project id" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "6RlIvsHVJmYJ" }, "outputs": [], "source": [ "def getProjectNumber(params):\n", " \"\"\"Gets the project number from a project id.\n", "\n", " Args:\n", " params: The parameters to use.\n", " \"\"\"\n", "\n", " if \"project_number\" not in params:\n", " # https://cloud.google.com/resource-manager/reference/rest/v1/projects/get?\n", " 
project_id = params[\"project_id\"]\n", "\n", " url = f\"https://cloudresourcemanager.googleapis.com/v1/projects/{project_id}\"\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"setBucketIamPolicy (GET) json_result: {json_result}\")\n", "\n", " project_number = json_result[\"projectNumber\"]\n", " params[\"project_number\"] = project_number\n", " print(f\"getProjectNumber: {project_number}\")\n", "\n", " else:\n", " project_number = params[\"project_number\"]\n", " print(f\"getProjectNumber: {project_number}\")\n", " return project_number" ] }, { "cell_type": "markdown", "metadata": { "id": "mAIc8ax1IoHk" }, "source": [ "#### activateServiceAPIs\n", "Enables Google Cloud APIs" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "YZ7FQGdqIoqD" }, "outputs": [], "source": [ "def activateServiceAPIs(params):\n", " \"\"\"Batch activates service apis\n", "\n", " Args:\n", " params: The parameters to use.\n", " \"\"\"\n", "\n", " project_number = params[\"project_number\"]\n", "\n", " request_body = {\n", " \"serviceIds\" : [ \"biglake.googleapis.com\" ]\n", " }\n", "\n", " url = f\"https://serviceusage.googleapis.com/v1/projects/{project_number}/services:batchEnable\"\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", " print(f\"activateServiceAPIs (POST) json_result: {json_result}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "z63V_4DaI3C6" }, "source": [ "#### initialize\n", "Calls the methods to create the external connection, create the GCS bucket, apply IAM permissions and copies the public data. This method can be re-run as required and does not cause duplication issues. Each method tests for the existance of items before creating." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "BzsPfbOuI39R" }, "outputs": [], "source": [ "def initialize(params):\n", " \"\"\"Create the BigLake connection, GCS bucket, set IAM permissions and copies data.\n", "\n", " Args:\n", " params: The parameters to use.\n", " \"\"\"\n", " # Create the BigLake connection (if not exists)\n", " bigLakeServiceAccountId = createBigLakeConnection(params)\n", " print(f\"createBigLakeConnection: {bigLakeServiceAccountId}\")\n", " params[\"bigLakeServiceAccountId\"] = bigLakeServiceAccountId\n", "\n", " # Create storage account (if not exists)\n", " createGoogleCloudStorageBucket(params)\n", "\n", " # Grant access to GCS Bucket for BigLake Connection (if not exists)\n", " setBucketIamPolicy(params, f\"serviceAccount:{bigLakeServiceAccountId}\", \"roles/storage.objectAdmin\")\n", " setBucketIamPolicy(params, f\"user:{user}\", \"roles/storage.admin\")\n", "\n", " getProjectNumber(params)\n", " activateServiceAPIs(params)" ] }, { "cell_type": "markdown", "metadata": { "id": "J1f6mybmJJp4" }, "source": [ "### Initialize the Demo" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "NDkgCnb-JO6S" }, "outputs": [], "source": [ "initialize(params)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "BFJOII5fLLcZ" }, "outputs": [], "source": [ "%%bigquery --params $params\n", "\n", "CREATE SCHEMA IF NOT EXISTS biglake_dataset OPTIONS(location = @bigquery_location);" ] }, { "cell_type": "markdown", "metadata": { "id": "PpOXinnpeMFF" }, "source": [ "### Create the Iceberg Catalog and have it save the data to GCS" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "CnIJ9-cQTRRQ" }, "outputs": [], "source": [ "# We will use SQLite for our catalog database, this is where we store our 
information as to where our tables are located\n", "# You would typically use Hive or an Iceberg catalog for this.\n", "# We also want our files to be stored in GCS\n", "# NOTE: This is not refreshing our \"gcs.oauth2.token\" which expires (typically every 59 minutes)\n", "\n", "creds = getGoogleCredentials()\n", "\n", "catalog = SqlCatalog(\n", " iceberg_catalog_namespace,\n", " **{\n", " \"uri\": f\"sqlite:///{warehouse_path}/pyiceberg_catalog.db\",\n", " \"gcs.oauth2.token-expires-at\": datetimeToUnixMs(creds.expiry),\n", " \"gcs.project-id\": project_id,\n", " \"gcs.oauth2.token\": creds.token,\n", " \"gcs.default-bucket-location\": f\"gs://{biglake_bucket_name}/\",\n", " \"warehouse\": f\"gs://{biglake_bucket_name}/\"\n", " },\n", ")" ] }, { "cell_type": "markdown", "metadata": { "id": "ag1Jh69OeXXb" }, "source": [ "### Create the default Iceberg namespace" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ri5MkILdo8Rd" }, "outputs": [], "source": [ "# Create the Namespace if it does not exist\n", "\n", "if (iceberg_catalog_namespace,) in catalog.list_namespaces():\n", " print(f\"Catalog iceberg_catalog_namespace already exists\")\n", "else:\n", " catalog.create_namespace(iceberg_catalog_namespace)" ] }, { "cell_type": "markdown", "metadata": { "id": "Jn5H9y-Jea5M" }, "source": [ "### Create a Customer Table, BigLake External Iceberg Table\n", "\n", "1. Create the customer iceberg table\n", "2. Create a BigLake External table pointing to the GCS metadata file (specific version)\n", "3. Append records to the customer iceberg table\n", "4. PATCH or update the BigLake External table setting the metadata file to the latest version\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "XQg5Pc4LYtuT" }, "outputs": [], "source": [ "# Create a customer table with 4 rows\n", "# https://py.iceberg.apache.org/api/#write-support\n", "\n", "# Define the schema for the customer table\n", "schema = pa.schema([\n", " ('customer_id', pa.int64()),\n", " ('customer_name', pa.string())\n", "])\n", "\n", "if (iceberg_catalog_namespace, 'customer') in catalog.list_tables(iceberg_catalog_namespace):\n", " # Drop the table\n", " catalog.drop_table(f\"{iceberg_catalog_namespace}.customer\")\n", " print(\"Table Dropped\")\n", "\n", "# We need to delete the GCS bucket folder (dropping an iceberg table does not delete the files on GCS)\n", "deleteGCSFolder(biglake_bucket_name,f\"{iceberg_catalog_namespace}.db/customer\")\n", "print(f\"All files in folder {iceberg_catalog_namespace}.db/customer/metadata have been deleted.\")\n", "\n", "table = catalog.create_table(f\"{iceberg_catalog_namespace}.customer\", schema=schema)\n", "\n", "df = pa.Table.from_pylist(\n", " [\n", " {\"customer_id\": 1, \"customer_name\": \"Customer 001\"},\n", " {\"customer_id\": 2, \"customer_name\": \"Customer 002\"},\n", " {\"customer_id\": 3, \"customer_name\": \"Customer 003\"},\n", " {\"customer_id\": 4, \"customer_name\": \"Customer 004\"},\n", " ],\n", ")\n", "\n", "table.append(df)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "mzIl3PoRFY3y" }, "outputs": [], "source": [ "def determineLatestIcebergMetadataFile(gcs_path):\n", " \"\"\"\n", " Finds the file ending with 'metadata.json' with the most recent last-sequence-number and last-updated-ms.\n", " https://iceberg.apache.org/spec/#table-metadata-fields\n", " last-sequence-number: The table's highest assigned sequence number, a monotonically increasing long that tracks the order of snapshots in a table.\n", 
" last-updated-ms: Timestamp in milliseconds from the unix epoch when the table was last updated. Each table metadata file should update this field just before writing.\n", " When adding a new file, its data and file sequence numbers are set to null because the snapshot's sequence number is not assigned until the snapshot is successfully committed.\n", "\n", " You could just open the most recent 10 metadata.json files to save on performance. The below code opens all the files which could be wasteful on resources.\n", "\n", " NOTE: Only use this approach if you CANNOT directly read the Iceberg Catalog \"SQL\" database.\n", " If you can read teh Iceberg Catalog \"SQL\" database, it has a pointer to the latest metadata file and is the preferred approach.\n", "\n", " Args:\n", " gcs_path: The GCS path to search within (e.g., 'gs://my-bucket/my-folder/').\n", "\n", " Returns:\n", " The GCS path of the latest metadata file, or None if no such file is found.\n", " \"\"\"\n", "\n", " # Initialize Google Cloud Storage client\n", " storage_client = storage.Client()\n", "\n", " # Extract bucket name and blob prefix from the GCS path\n", " bucket_name = gcs_path.replace(\"gs://\", \"\").split(\"/\")[0]\n", " # print(f\"bucket_name: {bucket_name}\")\n", " blob_prefix = \"/\".join(gcs_path.replace(\"gs://\", \"\").split(\"/\")[1:])\n", " # print(f\"blob_prefix: {blob_prefix}\")\n", "\n", " # Get all blobs within the specified path\n", " blobs = storage_client.list_blobs(bucket_name, prefix=blob_prefix)\n", "\n", " latest_metadata_file = None\n", " latest_timestamp = datetime.min\n", "\n", " # Iterate through the blobs and find the latest metadata file\n", " latest_blob_filename = \"\"\n", " latest_last_sequence_number = 0\n", " latest_last_updated_ms = 0\n", " current_schema_id = 0\n", "\n", " for blob in blobs:\n", " # We only want the metadata files\n", " if blob.name.endswith('metadata.json'):\n", " json_object = json.loads(blob.download_as_string())\n", "\n", " if json_object[\"last-sequence-number\"] is None:\n", " continue\n", "\n", " #print(f\"blob: {blob.name}\")\n", " #print(f'last-sequence-number: {json_object[\"last-sequence-number\"]}')\n", " #print(f'last-updated-ms: {json_object[\"last-updated-ms\"]}')\n", " #print(f'current-schema-id: {json_object[\"current-schema-id\"]}')\n", "\n", " ################################################################################################\n", " # NOTE: You might need to adjust this logic, it was not tested with every possible scenerio\n", " ################################################################################################\n", "\n", " # The highest sequence number wins\n", " if json_object[\"last-sequence-number\"] > latest_last_sequence_number:\n", " latest_last_sequence_number = json_object[\"last-sequence-number\"]\n", " latest_last_updated_ms = json_object[\"last-updated-ms\"]\n", " current_schema_id = json_object[\"current-schema-id\"]\n", " latest_blob_filename = blob.name\n", "\n", " # Sequence number tie, the highest last updated ms wins\n", " if json_object[\"last-sequence-number\"] == latest_last_sequence_number:\n", " if json_object[\"last-updated-ms\"] > latest_last_updated_ms:\n", " latest_last_sequence_number = json_object[\"last-sequence-number\"]\n", " latest_last_updated_ms = json_object[\"last-updated-ms\"]\n", " current_schema_id = json_object[\"current-schema-id\"]\n", " latest_blob_filename = blob.name\n", "\n", " # Sequence number tie, the highest last updated ms wins\n", " if 
json_object[\"last-sequence-number\"] == latest_last_sequence_number:\n", " if json_object[\"last-updated-ms\"] == latest_last_updated_ms:\n", " if json_object[\"last-column-id\"] > current_schema_id:\n", " latest_last_sequence_number = json_object[\"last-sequence-number\"]\n", " latest_last_updated_ms = json_object[\"last-updated-ms\"]\n", " current_schema_id = json_object[\"current-schema-id\"]\n", " latest_blob_filename = blob.name\n", "\n", " return \"gs://\" + bucket_name + \"/\" + latest_blob_filename" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "0uzEwZ5hixwr" }, "outputs": [], "source": [ "metadata_uri = determineLatestIcebergMetadataFile(f\"gs://{biglake_bucket_name}/{iceberg_catalog_namespace}.db/customer/metadata\")\n", "print(metadata_uri)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "iAOQRpY4bF7P" }, "outputs": [], "source": [ "print(\"Open this URL to see your storage account:\")\n", "print(f\"https://console.cloud.google.com/storage/browser/{biglake_bucket_name}/{iceberg_catalog_namespace}.db/customer/metadata?project={project_id}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "32wwSLhVafIU" }, "outputs": [], "source": [ "# Create the table using the \"latest\" metadata file\n", "\n", "sql = f\"\"\"\n", "CREATE OR REPLACE EXTERNAL TABLE `{project_id}.biglake_dataset.customer_iceberg`\n", "WITH CONNECTION `{project_id}.{bigquery_location}.biglake-notebook-connection`\n", "OPTIONS (\n", " format = \"ICEBERG\",\n", " uris = [\"{metadata_uri}\"]\n", ");\"\"\"\n", "\n", "RunQuery(sql)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "B2KbwfeTbTLo" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "SELECT * FROM `biglake_dataset.customer_iceberg` ORDER BY customer_id;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "0nLrIDqdbxuG" }, "outputs": [], "source": [ "# Add 4 more rows, at this point BigQuery cannot see these until we update the metadata\n", "\n", "df = pa.Table.from_pylist(\n", " [\n", " {\"customer_id\": 5, \"customer_name\": \"Customer 005\"},\n", " {\"customer_id\": 6, \"customer_name\": \"Customer 006\"},\n", " {\"customer_id\": 7, \"customer_name\": \"Customer 007\"},\n", " {\"customer_id\": 8, \"customer_name\": \"Customer 008\"},\n", " ],\n", ")\n", "\n", "table.append(df)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "rPLrz0EQb27d" }, "outputs": [], "source": [ "print(\"Open this URL to see your storage account. 
You will see more metadata files.\")\n", "print(f\"https://console.cloud.google.com/storage/browser/{biglake_bucket_name}/{iceberg_catalog_namespace}.db/customer/metadata?project={project_id}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "spJcBv9QcW0c" }, "outputs": [], "source": [ "def patchBigLakeExternalIcebergTable(project_id, dataset_name, table_name, metadata_uri):\n", " \"\"\"This will update the pointer of our BigLake Iceberg table to point to the most recent metadata.\n", " The metadata will only be updated if the table is not already pointing to the most recent metadata.\n", "\n", " Args:\n", " project_id: The ID of the Google Cloud project.\n", " dataset_name: The name of the BigQuery dataset.\n", " table_name: The name of the BigQuery table.\n", " metadata_uri: The URI of the metadata file.\n", " \"\"\"\n", "\n", " access_token = getGoogleCredentials().token\n", "\n", " headers = {\n", " \"Content-Type\" : \"application/json\",\n", " \"Authorization\" : \"Bearer \" + access_token\n", " }\n", "\n", " # First determine if we need to update the metadata (skip the update if so)\n", " # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get\n", " url = f\"https://bigquery.googleapis.com/bigquery/v2/projects/{project_id}/datasets/{dataset_name}/tables/{table_name}\"\n", " response = requests.get(url, headers=headers)\n", "\n", " #\"externalDataConfiguration\": {\n", " # \"sourceUris\": [\n", " # \"gs://biglake-data-analytics-preview/default.db/customer/metadata/00003-5cf6efcb-136b-41ed-8751-7b0cfccc75e0.metadata.json\"\n", " # ],\n", " # \"sourceFormat\": \"ICEBERG\",\n", " # \"autodetect\": true,\n", " # \"connectionId\": \"data-analytics-preview.us.biglake-notebook-connection\"\n", " #},\n", "\n", " if response.status_code == 200:\n", " json_object = json.loads(response.content)\n", " if json_object[\"externalDataConfiguration\"][\"sourceUris\"][0] == metadata_uri:\n", " print(\"No need to update metadata\")\n", " return True\n", " else:\n", " error = f\"Could not get table metadata for:'{project_id}.{dataset_name}.{table_name}' Status:'{response.status_code}' Text:'{response.text}'\"\n", " raise RuntimeError(error)\n", "\n", " # https://cloud.google.com/bigquery/docs/iceberg-tables#api\n", " # PATCH https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/datasets/DATASET/tables/TABLE?autodetect_schema=true\n", " url = f\"https://bigquery.googleapis.com/bigquery/v2/projects/{project_id}/datasets/{dataset_name}/tables/{table_name}?autodetect_schema=true\"\n", "\n", " payload = {\n", " \"externalDataConfiguration\": {\n", " \"sourceFormat\": \"ICEBERG\",\n", " \"sourceUris\": [\n", " metadata_uri\n", " ]\n", " },\n", " \"schema\": None\n", " }\n", "\n", " response = requests.patch(url, json=payload, headers=headers)\n", "\n", " if response.status_code == 200:\n", " print(f\"Table Patched\")\n", " return True\n", " else:\n", " error = f\"Error PATCHing table:'{project_id}.{dataset_name}.{table_name}' Status:'{response.status_code}' Text:'{response.text}'\"\n", " raise RuntimeError(error)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "sRreXg2LcwYt" }, "outputs": [], "source": [ "# Get the latest metadata and update our BigLake table\n", "\n", "metadata_uri = determineLatestIcebergMetadataFile(f\"gs://{biglake_bucket_name}/{iceberg_catalog_namespace}.db/customer/metadata\")\n", "print(metadata_uri)\n", "\n", "patchBigLakeExternalIcebergTable(project_id, \"biglake_dataset\", \"customer_iceberg\", 
metadata_uri)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "MNNJ20zOdk0s" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- We should now see 8 records\n", "SELECT * FROM `biglake_dataset.customer_iceberg` ORDER BY customer_id;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "lu79hcGZgR-b" }, "outputs": [], "source": [ "# Change the table schema and then PATCH the BigLake table\n", "from pyiceberg.types import StringType\n", "\n", "table = catalog.load_table(f\"{iceberg_catalog_namespace}.customer\")\n", "\n", "with table.update_schema() as update:\n", " update.add_column(\"customer_description\", StringType())" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "0BAVho5OAhIC" }, "outputs": [], "source": [ "# Get the latest metadata and update our BigLake table\n", "\n", "metadata_uri = determineLatestIcebergMetadataFile(f\"gs://{biglake_bucket_name}/{iceberg_catalog_namespace}.db/customer/metadata\")\n", "print(metadata_uri)\n", "\n", "patchBigLakeExternalIcebergTable(project_id, \"biglake_dataset\", \"customer_iceberg\", metadata_uri)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "PTLojRTH_9Va" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- We should now see 8 records with the NEW field set to None\n", "SELECT * FROM `biglake_dataset.customer_iceberg` ORDER BY customer_id;" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "_Svg6bcC5kMX" }, "outputs": [], "source": [ "# Add 4 more rows; BigQuery cannot see these until we update the metadata\n", "\n", "df = pa.Table.from_pylist(\n", " [\n", " {\"customer_id\": 9, \"customer_name\": \"Customer 009\", \"customer_description\" : \"Customer Description 009\"},\n", " {\"customer_id\": 10, \"customer_name\": \"Customer 010\", \"customer_description\" : \"Customer Description 010\"},\n", " {\"customer_id\": 11, \"customer_name\": \"Customer 011\", \"customer_description\" : \"Customer Description 011\"},\n", " {\"customer_id\": 12, \"customer_name\": \"Customer 012\", \"customer_description\" : \"Customer Description 012\"},\n", " ],\n", ")\n", "\n", "table.append(df)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "gR4DRw6dkG3i" }, "outputs": [], "source": [ "metadata_uri = determineLatestIcebergMetadataFile(f\"gs://{biglake_bucket_name}/{iceberg_catalog_namespace}.db/customer/metadata\")\n", "print(metadata_uri)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "GvkCJrBEmvAj" }, "outputs": [], "source": [ "# Get the latest metadata and update our BigLake table\n", "\n", "metadata_uri = determineLatestIcebergMetadataFile(f\"gs://{biglake_bucket_name}/{iceberg_catalog_namespace}.db/customer/metadata\")\n", "print(metadata_uri)\n", "\n", "patchBigLakeExternalIcebergTable(project_id, \"biglake_dataset\", \"customer_iceberg\", metadata_uri)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "fd5z07csmu9k" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "-- We should now see 12 records: the NEW field is None for the first 8 and populated for the last 4\n", "SELECT * FROM `biglake_dataset.customer_iceberg` ORDER BY customer_id;" ] }, { "cell_type": "markdown", "metadata": { "id": "RRLTI8lNMVId" }, "source": [ "### Algorithm Overall Thoughts / Notes about Automation (Cloud Function)\n", "\n" ] }, { "cell_type": "markdown", "metadata": { "id": "zwXlDrcSv77w" }, 
"source": [ "Cloud Function - How it could work\n", " - **Best Apporach** Read the Iceberg catalog every {x} minutes (or have a trigger on the table to push a notification for near realtime). See the code below \"Explore the Iceberg Catalog tables\".\n", " - For each table\n", " - Get the latest metadata json file from Iceberg Catalog (**Most Reliable**)\n", " - Test to see if the file is on GCS. It should be written first before the catalog is updated. So, this is just a sanity check.\n", " - Call BQ REST API and get table info\n", " - Compare the URI from storage and BQ table def\n", " - If different then patch BQ\n", "\n", " - **Good** Run the Cloud Function every {x} minutes.\n", " - List of tables and metadata gcs paths\n", " - For each table\n", " - Get the latest metadata json file from GCS (**See Function: determineLatestIcebergMetadataFile**)\n", " - Call BQ REST API and get table info\n", " - Compare the URI from storage and BQ table def\n", " - If different then patch BQ\n", "\n", " - **Good, but complex and noisy** Use GCS Notifications (this is slight more complex and we need to deal with missed items, delays and re-processing if not complete)\n", " - We get **all** files being added/updated on GCS (noisy)\n", " - Only look at certain paths (we have list of tables / metadata gcs paths)\n", " - If a file has changed/added *.metatdata.json in one of our table paths\n", " - Get the latest metadata json file from GCS (**See Function: determineLatestIcebergMetadataFile**). This might not even been the file we got a notification for.\n", " - Call BQ REST API and get table info\n", " - Compare the URI from storage and BQ table def\n", " - If different then patch BQ\n" ] }, { "cell_type": "markdown", "metadata": { "id": "zMosers9-fZt" }, "source": [ "### Explore the Iceberg Catalog tables\n", "\n", "Ideally we read the Catalog and **do not** rely on GCS and determining the latest manifest files. \n", "\n", "Review the output of the SQL ```SELECT * FROM iceberg_tables```. It contains a metadata_location field that points to the latest commited metadata file." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "V_OgB4vA9Ovb" }, "outputs": [], "source": [ "import sqlite3\n", "\n", "# Connect to the SQLite database (or create it if it doesn't exist)\n", "conn = sqlite3.connect(f'{warehouse_path}/pyiceberg_catalog.db')\n", "\n", "# Create a cursor object\n", "cursor = conn.cursor()\n", "\n", "print(\"#################################################################\")\n", "print(\"Show all tables in our Iceberg Catalog\")\n", "print(\"#################################################################\")\n", "# Execute a SQL query to retrieve table names\n", "cursor.execute(\"SELECT name FROM sqlite_master WHERE type='table';\")\n", "\n", "# Fetch all results\n", "tables = cursor.fetchall()\n", "\n", "# Print the table names\n", "for table in tables:\n", " print(table[0])\n", "\n", "print()\n", "print()\n", "print(\"#################################################################\")\n", "print(\"SELECT * FROM iceberg_namespace_properties\")\n", "print(\"#################################################################\")\n", "# Execute a SQL query to retrieve data from a table\n", "cursor.execute(\"SELECT * FROM iceberg_namespace_properties\")\n", "\n", "# Fetch all results\n", "rows = cursor.fetchall()\n", "\n", "# Get the column names\n", "column_names = [description[0] for description in cursor.description]\n", "\n", "# Print the column names\n", "print(column_names)\n", "\n", "# Print the data\n", "for row in rows:\n", " print(row)\n", "\n", "print()\n", "print()\n", "print(\"#################################################################\")\n", "print(\"SELECT * FROM iceberg_tables\")\n", "print(\"#################################################################\")\n", "# Execute a SQL query to retrieve data from a table\n", "cursor.execute(\"SELECT * FROM iceberg_tables\")\n", "\n", "# Fetch all results\n", "rows = cursor.fetchall()\n", "\n", "# Get the column names\n", "column_names = [description[0] for description in cursor.description]\n", "\n", "# Print the column names\n", "print(column_names)\n", "\n", "# Print the data\n", "for row in rows:\n", " print(row)\n", " # Check 'catalog_name', 'table_namespace', 'table_name'\n", " if row[0] == \"default\" and row[1] == \"default\" and row[2] == \"customer\":\n", " metadata__uri_from_iceberg_catalog = row[3] # metadata_location\n", "\n", "# Close the connection\n", "conn.close()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "wfhmUGBFNEwp" }, "outputs": [], "source": [ "# Get the latest metadata and update our BigLake table\n", "\n", "metadata_uri = metadata__uri_from_iceberg_catalog\n", "print(f\"metadata__uri_from_iceberg_catalog: {metadata__uri_from_iceberg_catalog}\")\n", "\n", "patchBigLakeExternalIcebergTable(project_id, \"biglake_dataset\", \"customer_iceberg\", metadata_uri)" ] } ], "metadata": { "colab": { "collapsed_sections": [ "vW69NH2bMi9K", "hZZ2om3xe9Ax", "jHEEWOJCe6zk", "cghkfWlte_n4", "P_52x4o6IGnU", "z63V_4DaI3C6", "J1f6mybmJJp4" ], "provenance": [] }, "kernelspec": { "display_name": "Python 3", "name": "python3" }, "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 0 }