colab-enterprise/GenAI-Generate-Table-and-Column-Descriptions.ipynb (730 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "HC-_kPvkYlWu"
},
"source": [
"### <font color='#4285f4'>Overview</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "qlckxzNLYnWG"
},
"source": [
"\n",
"\n",
"* This will use Gemini to generate table and column descriptions for each table in BigQuery (not view or materialized view)\n",
"* You will need to change the prompt along with the dataset names to use in your own project.\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "LuRsm1WsYqfG"
},
"source": [
"### <font color='#4285f4'>License</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "NQjXn8ZGYsmV"
},
"source": [
"```\n",
"# Copyright 2024 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License.\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "zZmA2lDjYxTA"
},
"source": [
"### <font color='#4285f4'>Pip installs</font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "AZrtV_qTYy40"
},
"outputs": [],
"source": [
"# PIP Installs\n",
"import sys\n",
"\n",
"# https://serpapi.com/\n",
"#!{sys.executable} -m pip install xxx"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Wgk2ZlLhY18v"
},
"source": [
"### <font color='#4285f4'>Initialize</font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "TmTbGiTKPgHb"
},
"outputs": [],
"source": [
"from PIL import Image\n",
"from IPython.display import HTML\n",
"from IPython.display import Audio\n",
"from functools import reduce\n",
"import IPython.display\n",
"import google.auth\n",
"import requests\n",
"import json\n",
"import uuid\n",
"import base64\n",
"import os\n",
"import cv2\n",
"import random\n",
"import time\n",
"import datetime\n",
"import base64\n",
"import random\n",
"import datetime\n",
"\n",
"import logging\n",
"from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "V_uK3CooPGK7"
},
"outputs": [],
"source": [
"# Set these (run this cell to verify the output)\n",
"\n",
"bigquery_location = \"${bigquery_region}\"\n",
"region = \"${vertex_ai_region}\"\n",
"location = \"${vertex_ai_region}\"\n",
"storage_account = \"${bucket_name}\"\n",
"\n",
"\n",
"# Get the current date and time\n",
"now = datetime.datetime.now()\n",
"\n",
"# Format the date and time as desired\n",
"formatted_date = now.strftime(\"%Y-%m-%d-%H-%M\")\n",
"\n",
"# Get some values using gcloud\n",
"project_id = !(gcloud config get-value project)\n",
"user = !(gcloud auth list --filter=status:ACTIVE --format=\"value(account)\")\n",
"\n",
"if len(project_id) != 1:\n",
" raise RuntimeError(f\"project_id is not set: {project_id}\")\n",
"project_id = project_id[0]\n",
"\n",
"if len(user) != 1:\n",
" raise RuntimeError(f\"user is not set: {user}\")\n",
"user = user[0]\n",
"\n",
"print(f\"project_id = {project_id}\")\n",
"print(f\"user = {user}\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "BAAGiuFDY8cC"
},
"source": [
"### <font color='#4285f4'>Helper Methods</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "3Y-G1Fn7Y-FO"
},
"source": [
"#### restAPIHelper\n",
"Calls the Google Cloud REST API using the current users credentials."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "Lp-vvFIoPlbc"
},
"outputs": [],
"source": [
"def restAPIHelper(url: str, http_verb: str, request_body: str) -> str:\n",
" \"\"\"Calls the Google Cloud REST API passing in the current users credentials\"\"\"\n",
"\n",
" import requests\n",
" import google.auth\n",
" import json\n",
"\n",
" # Get an access token based upon the current user\n",
" creds, project = google.auth.default()\n",
" auth_req = google.auth.transport.requests.Request()\n",
" creds.refresh(auth_req)\n",
" access_token=creds.token\n",
"\n",
" headers = {\n",
" \"Content-Type\" : \"application/json\",\n",
" \"Authorization\" : \"Bearer \" + access_token\n",
" }\n",
"\n",
" if http_verb == \"GET\":\n",
" response = requests.get(url, headers=headers)\n",
" elif http_verb == \"POST\":\n",
" response = requests.post(url, json=request_body, headers=headers)\n",
" elif http_verb == \"PUT\":\n",
" response = requests.put(url, json=request_body, headers=headers)\n",
" elif http_verb == \"PATCH\":\n",
" response = requests.patch(url, json=request_body, headers=headers)\n",
" elif http_verb == \"DELETE\":\n",
" response = requests.delete(url, headers=headers)\n",
" else:\n",
" raise RuntimeError(f\"Unknown HTTP verb: {http_verb}\")\n",
"\n",
" if response.status_code == 200:\n",
" return json.loads(response.content)\n",
" #image_data = json.loads(response.content)[\"predictions\"][0][\"bytesBase64Encoded\"]\n",
" else:\n",
" error = f\"Error restAPIHelper -> ' Status: '{response.status_code}' Text: '{response.text}'\"\n",
" raise RuntimeError(error)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "lLBJOY1LZAYc"
},
"source": [
"#### RetryCondition (for retrying LLM calls)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "2d1kPYvBPnpm"
},
"outputs": [],
"source": [
"def RetryCondition(error):\n",
" error_string = str(error)\n",
" print(error_string)\n",
"\n",
" retry_errors = [\n",
" \"RESOURCE_EXHAUSTED\",\n",
" \"No content in candidate\",\n",
" # Add more error messages here as needed\n",
" ]\n",
"\n",
" for retry_error in retry_errors:\n",
" if retry_error in error_string:\n",
" print(\"Retrying...\")\n",
" return True\n",
"\n",
" return False"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "QOO83IUrZDbP"
},
"source": [
"#### Gemini LLM (Pro 1.0 , Pro 1.5)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "o2M5o6jUPqQS"
},
"outputs": [],
"source": [
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def GeminiLLM(prompt, model = \"gemini-2.0-flash\", response_schema = None,\n",
" temperature = 1, topP = 1, topK = 32):\n",
"\n",
" # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference#supported_models\n",
"\n",
" llm_response = None\n",
" if temperature < 0:\n",
" temperature = 0\n",
"\n",
" creds, project = google.auth.default()\n",
" auth_req = google.auth.transport.requests.Request() # required to acess access token\n",
" creds.refresh(auth_req)\n",
" access_token=creds.token\n",
"\n",
" headers = {\n",
" \"Content-Type\" : \"application/json\",\n",
" \"Authorization\" : \"Bearer \" + access_token\n",
" }\n",
"\n",
" # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference\n",
" url = f\"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/{model}:generateContent\"\n",
"\n",
" generation_config = {\n",
" \"temperature\": temperature,\n",
" \"topP\": topP,\n",
" \"maxOutputTokens\": 8192,\n",
" \"candidateCount\": 1,\n",
" \"responseMimeType\": \"application/json\",\n",
" }\n",
"\n",
" # Add in the response schema for when it is provided\n",
" if response_schema is not None:\n",
" generation_config[\"responseSchema\"] = response_schema\n",
"\n",
" if model == \"gemini-2.0-flash\":\n",
" generation_config[\"topK\"] = topK\n",
"\n",
" payload = {\n",
" \"contents\": {\n",
" \"role\": \"user\",\n",
" \"parts\": {\n",
" \"text\": prompt\n",
" },\n",
" },\n",
" \"generation_config\": {\n",
" **generation_config\n",
" },\n",
" \"safety_settings\": {\n",
" \"category\": \"HARM_CATEGORY_SEXUALLY_EXPLICIT\",\n",
" \"threshold\": \"BLOCK_LOW_AND_ABOVE\"\n",
" }\n",
" }\n",
"\n",
" response = requests.post(url, json=payload, headers=headers)\n",
"\n",
" if response.status_code == 200:\n",
" try:\n",
" json_response = json.loads(response.content)\n",
" except Exception as error:\n",
" raise RuntimeError(f\"An error occurred parsing the JSON: {error}\")\n",
"\n",
" if \"candidates\" in json_response:\n",
" candidates = json_response[\"candidates\"]\n",
" if len(candidates) > 0:\n",
" candidate = candidates[0]\n",
" if \"content\" in candidate:\n",
" content = candidate[\"content\"]\n",
" if \"parts\" in content:\n",
" parts = content[\"parts\"]\n",
" if len(parts):\n",
" part = parts[0]\n",
" if \"text\" in part:\n",
" text = part[\"text\"]\n",
" llm_response = text\n",
" else:\n",
" raise RuntimeError(\"No text in part: {response.content}\")\n",
" else:\n",
" raise RuntimeError(\"No parts in content: {response.content}\")\n",
" else:\n",
" raise RuntimeError(\"No parts in content: {response.content}\")\n",
" else:\n",
" raise RuntimeError(\"No content in candidate: {response.content}\")\n",
" else:\n",
" raise RuntimeError(\"No candidates: {response.content}\")\n",
" else:\n",
" raise RuntimeError(\"No candidates: {response.content}\")\n",
"\n",
" # Remove some typically response characters (if asking for a JSON reply)\n",
" llm_response = llm_response.replace(\"```json\",\"\")\n",
" llm_response = llm_response.replace(\"```\",\"\")\n",
" llm_response = llm_response.replace(\"\\n\",\"\")\n",
"\n",
" return llm_response\n",
"\n",
" else:\n",
" raise RuntimeError(f\"Error with prompt:'{prompt}' Status:'{response.status_code}' Text:'{response.text}'\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "23V4-m8-ZFbq"
},
"source": [
"#### Gets a table schema (in json like format) for a BigQuery table"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "F9KRjQPVO6mB"
},
"outputs": [],
"source": [
"def GetTableSchema(project, dataset_name, table_name):\n",
" import io\n",
" from google.cloud import bigquery\n",
" client = bigquery.Client()\n",
"\n",
" dataset_ref = client.dataset(dataset_name, project)\n",
" table_ref = dataset_ref.table(table_name)\n",
" table = client.get_table(table_ref)\n",
"\n",
" f = io.StringIO(\"\")\n",
" client.schema_to_json(table.schema, f)\n",
" return f.getvalue()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "F97T8vxAZKS2"
},
"source": [
"#### Runs a query in BigQuery"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "86jYAoplT3r_"
},
"outputs": [],
"source": [
"def RunQuery(sql):\n",
" import time\n",
" from google.cloud import bigquery\n",
" client = bigquery.Client()\n",
"\n",
" if (sql.startswith(\"SELECT\") or sql.startswith(\"WITH\")):\n",
" df_result = client.query(sql).to_dataframe()\n",
" return df_result\n",
" else:\n",
" job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)\n",
" query_job = client.query(sql, job_config=job_config)\n",
"\n",
" # Check on the progress by getting the job's updated state.\n",
" query_job = client.get_job(\n",
" query_job.job_id, location=query_job.location\n",
" )\n",
" print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n",
"\n",
" while query_job.state != \"DONE\":\n",
" time.sleep(2)\n",
" query_job = client.get_job(\n",
" query_job.job_id, location=query_job.location\n",
" )\n",
" print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n",
"\n",
" if query_job.error_result == None:\n",
" return True\n",
" else:\n",
" raise Exception(query_job.error_result)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "R7nkUbB5ZO7G"
},
"source": [
"#### Gets a table description and column description (generated by Gemini)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "fSnvAFb1Qjff"
},
"source": [
"```\n",
"Write me the json in OpenAPI 3.0 schema object for the below object.\n",
"Make all fields required.\n",
"{\n",
" \"project_id\" : \"text\",\n",
" \"dataset_name\" : \"text\",\n",
" \"table_name\" : \"text\",\n",
" \"table_description\" : \"text\",\n",
" \"columns\" : [ \"name\" : \"text\", \"description\" : \"text\"]\n",
"}\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "yP1zzxfZbqWz"
},
"source": [
"### <font color='#4285f4'>Geneate a Table Description</font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "c5ce4OIHUkXv"
},
"outputs": [],
"source": [
"def get_table_description(project_id, dataset_name, table_name, metadata_info):\n",
" table_schema = GetTableSchema(project_id, dataset_name, table_name)\n",
" response_schema = {\n",
" \"type\": \"object\",\n",
" \"required\": [\n",
" \"project_id\",\n",
" \"dataset_name\",\n",
" \"table_name\",\n",
" \"table_description\",\n",
" \"columns\"\n",
" ],\n",
" \"properties\": {\n",
" \"project_id\": {\n",
" \"type\": \"string\",\n",
" \"format\": \"text\"\n",
" },\n",
" \"dataset_name\": {\n",
" \"type\": \"string\",\n",
" \"format\": \"text\"\n",
" },\n",
" \"table_name\": {\n",
" \"type\": \"string\",\n",
" \"format\": \"text\"\n",
" },\n",
" \"table_description\": {\n",
" \"type\": \"string\",\n",
" \"format\": \"text\"\n",
" },\n",
" \"columns\": {\n",
" \"type\": \"array\",\n",
" \"items\": {\n",
" \"type\": \"object\",\n",
" \"required\": [\n",
" \"name\",\n",
" \"description\"\n",
" ],\n",
" \"properties\": {\n",
" \"name\": {\n",
" \"type\": \"string\",\n",
" \"format\": \"text\"\n",
" },\n",
" \"description\": {\n",
" \"type\": \"string\",\n",
" \"format\": \"text\"\n",
" }\n",
" }\n",
" }\n",
" }\n",
" }\n",
" }\n",
"\n",
" # Get some sample data to pass to Gemini (we should only sample the data if it has many rows)\n",
" sample_data_results_json = []\n",
" try:\n",
" sql = f\"\"\"\n",
" SELECT\n",
" TOTAL_ROWS AS Cnt\n",
" FROM\n",
" `{project_id}.region-{bigquery_location}.INFORMATION_SCHEMA.TABLE_STORAGE_BY_PROJECT`\n",
" WHERE\n",
" TABLE_NAME = {table_name}\n",
" AND TABLE_SCHEMA = {dataset_name}\n",
" \"\"\"\n",
" results = RunQuery(sql)\n",
" count = 0\n",
" for index, row in results.iterrows():\n",
" count = int(row[\"Cnt\"])\n",
"\n",
" sample_percent = 100\n",
" if count < 10000:\n",
" sample_percent = 100\n",
" else:\n",
" sample_percent = 10\n",
"\n",
" sql = f\"SELECT * FROM `{project_id}.{dataset_name}.{table_name}` TABLESAMPLE SYSTEM ({sample_percent} PERCENT) LIMIT 100\"\n",
" sample_data_results = RunQuery(sql)\n",
" sample_data_results_json = sample_data_results.to_json(orient='records')\n",
"\n",
" print(f\"Table: {table_name}: {sample_data_results_json}\")\n",
" except:\n",
" # do nothing, we might not be able to query this table due to security access\n",
" print(f\"Table: {table_name}: Cannot view data in table\")\n",
"\n",
" prompt = f\"\"\"You are a database engineer and need to create column descriptions for a BigQuery table.\n",
" Sample data has been provided about the table.\n",
" Use your vast knowledge to figure out that the data applies to.\n",
" I need you to create a table description that describes the table.\n",
" I need you to create descriptions for each column in the below table schema.\n",
" Do not include actual values of the data in the table or column description, keep it a litte more generic.\n",
"\n",
" Metadata hint (a hint to what the table might be about):\n",
" {metadata_info}\n",
"\n",
" <table_schema>\n",
" {table_schema}\n",
" </table_schame>\n",
"\n",
" <sample_table_data>\n",
" {sample_data_results_json}\n",
" </sample_table_data>\n",
" \"\"\"\n",
"\n",
" gemini_json = GeminiLLM(prompt, response_schema=response_schema)\n",
" gemini_dict = json.loads(gemini_json)\n",
"\n",
" table_sql = f\"ALTER TABLE `{project_id}.{dataset_name}.{table_name}`\\n\"\n",
" table_description = gemini_dict['table_description'].replace(\"'\",\"\\'\")\n",
" table_sql += f\"SET OPTIONS (description = '{table_description}');\"\n",
"\n",
" column_sql = f\"ALTER TABLE `{project_id}.{dataset_name}.{table_name}`\\n\"\n",
" for item in gemini_dict[\"columns\"]:\n",
" column_description = item['description'].replace(\"'\",\"\\'\")\n",
" column_sql += f\"ALTER COLUMN {item['name']} SET OPTIONS (description='{column_description}'),\\n\"\n",
"\n",
" column_sql = column_sql[:-2] + \";\"\n",
"\n",
" return table_sql, column_sql"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Q0yhMcYoZd5P"
},
"source": [
"### <font color='#4285f4'>Generate the Descriptions</font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "784sEYHBUA-t"
},
"outputs": [],
"source": [
"dataset_name = \"taxi_dataset\"\n",
"\n",
"table_list_sql = f\"\"\"SELECT table_name\n",
" FROM {dataset_name}.INFORMATION_SCHEMA.TABLES\n",
" WHERE table_type = 'BASE TABLE'\n",
" ORDER BY table_name;\"\"\"\n",
"\n",
"results = RunQuery(table_list_sql)\n",
"\n",
"alter_table_list = []\n",
"\n",
"for index, row in results.iterrows():\n",
" metadata_info = \"The table is releated to taxi ride data as part of the New York City public dataset.\"\n",
" table_name = row['table_name']\n",
" if table_name == 'datastream_cdc_data':\n",
" metadata_info = \"The table is related to a change data capture process used by Google Cloud Datastream to seed data for CDC.\"\n",
"\n",
" table_sql, column_sql = get_table_description(project_id, dataset_name, table_name, metadata_info)\n",
" alter_table_list.append(table_sql)\n",
" alter_table_list.append(column_sql)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "OnhIEs4hWChu"
},
"outputs": [],
"source": [
"for item in alter_table_list:\n",
" print(item)\n",
" print()\n",
" print()\n",
" # Optional\n",
" # You can run this in BigQuery as a SQL Script or automate running \"RunQuery\" for each item.\n",
" # RunQuery(item)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "aZu7wPe4trf8"
},
"source": [
"Notes:\n",
"\n",
"\n",
"- If you want to update the schema for external tables you cannot run the ALTER table command you need to update the schema. See the \"BigLake-Demo\" notebook. You would need to get the schema, then add descriptions to the schema (or update them) then call updateTableSchema. External tables do not support the ALTER TABLE command.\n",
"```\n",
"def updateTableSchema(project_id, dataset_name, table_name, new_schema):\n",
" import io\n",
" import google.cloud.bigquery as bigquery\n",
"\n",
" client = bigquery.Client()\n",
"\n",
" dataset_ref = client.dataset(dataset_name, project=project_id)\n",
" table_ref = dataset_ref.table(table_name)\n",
" table = client.get_table(table_ref)\n",
"\n",
" table.schema = new_schema\n",
" table = client.update_table(table, [\"schema\"])\n",
"\n",
" print(f\"Table {table_name} schema updated!\")\n",
"```\n",
"\n"
]
}
],
"metadata": {
"colab": {
"cell_execution_strategy": "setup",
"collapsed_sections": [
"HC-_kPvkYlWu",
"LuRsm1WsYqfG",
"zZmA2lDjYxTA",
"Wgk2ZlLhY18v",
"3Y-G1Fn7Y-FO",
"lLBJOY1LZAYc",
"QOO83IUrZDbP",
"23V4-m8-ZFbq",
"F97T8vxAZKS2"
],
"provenance": []
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}