### <font color='#4285f4'>Overview</font>



* This will use Gemini to generate table and column descriptions for each table in BigQuery (not view or materialized view)
* You will need to change the prompt along with the dataset names to use in your own project.


### <font color='#4285f4'>License</font>

```
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```

### <font color='#4285f4'>Pip installs</font>

In [None]:
# PIP Installs
import sys

# https://serpapi.com/
#!{sys.executable} -m pip install xxx

### <font color='#4285f4'>Initialize</font>

In [None]:
from PIL import Image
from IPython.display import HTML
from IPython.display import Audio
from functools import reduce
import IPython.display
import google.auth
import requests
import json
import uuid
import base64
import os
import cv2
import random
import time
import datetime
import base64
import random
import datetime

import logging
from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception

In [None]:
# Set these (run this cell to verify the output)

bigquery_location = "${bigquery_region}"
region = "${vertex_ai_region}"
location = "${vertex_ai_region}"
storage_account = "${bucket_name}"


# Get the current date and time
now = datetime.datetime.now()

# Format the date and time as desired
formatted_date = now.strftime("%Y-%m-%d-%H-%M")

# Get some values using gcloud
project_id = !(gcloud config get-value project)
user = !(gcloud auth list --filter=status:ACTIVE --format="value(account)")

if len(project_id) != 1:
  raise RuntimeError(f"project_id is not set: {project_id}")
project_id = project_id[0]

if len(user) != 1:
  raise RuntimeError(f"user is not set: {user}")
user = user[0]

print(f"project_id = {project_id}")
print(f"user = {user}")

### <font color='#4285f4'>Helper Methods</font>

#### restAPIHelper
Calls the Google Cloud REST API using the current users credentials.

In [None]:
def restAPIHelper(url: str, http_verb: str, request_body: str) -> str:
  """Calls the Google Cloud REST API passing in the current users credentials"""

  import requests
  import google.auth
  import json

  # Get an access token based upon the current user
  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request()
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
    "Content-Type" : "application/json",
    "Authorization" : "Bearer " + access_token
  }

  if http_verb == "GET":
    response = requests.get(url, headers=headers)
  elif http_verb == "POST":
    response = requests.post(url, json=request_body, headers=headers)
  elif http_verb == "PUT":
    response = requests.put(url, json=request_body, headers=headers)
  elif http_verb == "PATCH":
    response = requests.patch(url, json=request_body, headers=headers)
  elif http_verb == "DELETE":
    response = requests.delete(url, headers=headers)
  else:
    raise RuntimeError(f"Unknown HTTP verb: {http_verb}")

  if response.status_code == 200:
    return json.loads(response.content)
    #image_data = json.loads(response.content)["predictions"][0]["bytesBase64Encoded"]
  else:
    error = f"Error restAPIHelper -> ' Status: '{response.status_code}' Text: '{response.text}'"
    raise RuntimeError(error)

#### RetryCondition (for retrying LLM calls)

In [None]:
def RetryCondition(error):
  error_string = str(error)
  print(error_string)

  retry_errors = [
      "RESOURCE_EXHAUSTED",
      "No content in candidate",
      # Add more error messages here as needed
  ]

  for retry_error in retry_errors:
    if retry_error in error_string:
      print("Retrying...")
      return True

  return False

#### Gemini LLM (Pro 1.0 , Pro 1.5)

In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def GeminiLLM(prompt, model = "gemini-2.0-flash", response_schema = None,
                 temperature = 1, topP = 1, topK = 32):

  # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference#supported_models

  llm_response = None
  if temperature < 0:
    temperature = 0

  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request() # required to acess access token
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
      "Content-Type" : "application/json",
      "Authorization" : "Bearer " + access_token
  }

  # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference
  url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/{model}:generateContent"

  generation_config = {
    "temperature": temperature,
    "topP": topP,
    "maxOutputTokens": 8192,
    "candidateCount": 1,
    "responseMimeType": "application/json",
  }

  # Add in the response schema for when it is provided
  if response_schema is not None:
    generation_config["responseSchema"] = response_schema

  if model == "gemini-2.0-flash":
    generation_config["topK"] = topK

  payload = {
    "contents": {
      "role": "user",
      "parts": {
          "text": prompt
      },
    },
    "generation_config": {
      **generation_config
    },
    "safety_settings": {
      "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
      "threshold": "BLOCK_LOW_AND_ABOVE"
    }
  }

  response = requests.post(url, json=payload, headers=headers)

  if response.status_code == 200:
    try:
      json_response = json.loads(response.content)
    except Exception as error:
      raise RuntimeError(f"An error occurred parsing the JSON: {error}")

    if "candidates" in json_response:
      candidates = json_response["candidates"]
      if len(candidates) > 0:
        candidate = candidates[0]
        if "content" in candidate:
          content = candidate["content"]
          if "parts" in content:
            parts = content["parts"]
            if len(parts):
              part = parts[0]
              if "text" in part:
                text = part["text"]
                llm_response = text
              else:
                raise RuntimeError("No text in part: {response.content}")
            else:
              raise RuntimeError("No parts in content: {response.content}")
          else:
            raise RuntimeError("No parts in content: {response.content}")
        else:
          raise RuntimeError("No content in candidate: {response.content}")
      else:
        raise RuntimeError("No candidates: {response.content}")
    else:
      raise RuntimeError("No candidates: {response.content}")

    # Remove some typically response characters (if asking for a JSON reply)
    llm_response = llm_response.replace("```json","")
    llm_response = llm_response.replace("```","")
    llm_response = llm_response.replace("\n","")

    return llm_response

  else:
    raise RuntimeError(f"Error with prompt:'{prompt}'  Status:'{response.status_code}' Text:'{response.text}'")

#### Gets a table schema (in json like format) for a BigQuery table

In [None]:
def GetTableSchema(project, dataset_name, table_name):
  import io
  from google.cloud import bigquery
  client = bigquery.Client()

  dataset_ref = client.dataset(dataset_name, project)
  table_ref = dataset_ref.table(table_name)
  table = client.get_table(table_ref)

  f = io.StringIO("")
  client.schema_to_json(table.schema, f)
  return f.getvalue()

#### Runs a query in BigQuery

In [None]:
def RunQuery(sql):
  import time
  from google.cloud import bigquery
  client = bigquery.Client()

  if (sql.startswith("SELECT") or sql.startswith("WITH")):
      df_result = client.query(sql).to_dataframe()
      return df_result
  else:
    job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)
    query_job = client.query(sql, job_config=job_config)

    # Check on the progress by getting the job's updated state.
    query_job = client.get_job(
        query_job.job_id, location=query_job.location
    )
    print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    while query_job.state != "DONE":
      time.sleep(2)
      query_job = client.get_job(
          query_job.job_id, location=query_job.location
          )
      print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    if query_job.error_result == None:
      return True
    else:
      raise Exception(query_job.error_result)

#### Gets a table description and column description (generated by Gemini)

```
Write me the json in OpenAPI 3.0 schema object for the below object.
Make all fields required.
{
  "project_id" : "text",
   "dataset_name" : "text",
   "table_name" : "text",
   "table_description" : "text",
   "columns" : [ "name" : "text", "description" : "text"]
}
```

### <font color='#4285f4'>Geneate a Table Description</font>

In [None]:
def get_table_description(project_id, dataset_name, table_name, metadata_info):
  table_schema = GetTableSchema(project_id, dataset_name, table_name)
  response_schema = {
    "type": "object",
    "required": [
      "project_id",
      "dataset_name",
      "table_name",
      "table_description",
      "columns"
    ],
    "properties": {
      "project_id": {
        "type": "string",
        "format": "text"
      },
      "dataset_name": {
        "type": "string",
        "format": "text"
      },
      "table_name": {
        "type": "string",
        "format": "text"
      },
      "table_description": {
        "type": "string",
        "format": "text"
      },
      "columns": {
        "type": "array",
        "items": {
          "type": "object",
          "required": [
            "name",
            "description"
          ],
          "properties": {
            "name": {
              "type": "string",
              "format": "text"
            },
            "description": {
              "type": "string",
              "format": "text"
            }
          }
        }
      }
    }
  }

  # Get some sample data to pass to Gemini (we should only sample the data if it has many rows)
  sample_data_results_json = []
  try:
    sql = f"""
        SELECT
            TOTAL_ROWS AS Cnt
        FROM
            `{project_id}.region-{bigquery_location}.INFORMATION_SCHEMA.TABLE_STORAGE_BY_PROJECT`
        WHERE
            TABLE_NAME = {table_name}
            AND TABLE_SCHEMA = {dataset_name}
    """
    results = RunQuery(sql)
    count = 0
    for index, row in results.iterrows():
        count = int(row["Cnt"])

    sample_percent = 100
    if count < 10000:
      sample_percent = 100
    else:
      sample_percent = 10

    sql = f"SELECT * FROM `{project_id}.{dataset_name}.{table_name}` TABLESAMPLE SYSTEM ({sample_percent} PERCENT) LIMIT 100"
    sample_data_results = RunQuery(sql)
    sample_data_results_json = sample_data_results.to_json(orient='records')

    print(f"Table: {table_name}: {sample_data_results_json}")
  except:
    # do nothing, we might not be able to query this table due to security access
    print(f"Table: {table_name}: Cannot view data in table")

  prompt = f"""You are a database engineer and need to create column descriptions for a BigQuery table.
  Sample data has been provided about the table.
  Use your vast knowledge to figure out that the data applies to.
  I need you to create a table description that describes the table.
  I need you to create descriptions for each column in the below table schema.
  Do not include actual values of the data in the table or column description, keep it a litte more generic.

  Metadata hint (a hint to what the table might be about):
  {metadata_info}

  <table_schema>
  {table_schema}
  </table_schame>

  <sample_table_data>
  {sample_data_results_json}
  </sample_table_data>
  """

  gemini_json = GeminiLLM(prompt, response_schema=response_schema)
  gemini_dict = json.loads(gemini_json)

  table_sql = f"ALTER TABLE `{project_id}.{dataset_name}.{table_name}`\n"
  table_description = gemini_dict['table_description'].replace("'","\'")
  table_sql += f"SET OPTIONS (description = '{table_description}');"

  column_sql = f"ALTER TABLE `{project_id}.{dataset_name}.{table_name}`\n"
  for item in gemini_dict["columns"]:
    column_description = item['description'].replace("'","\'")
    column_sql += f"ALTER COLUMN {item['name']} SET OPTIONS (description='{column_description}'),\n"

  column_sql = column_sql[:-2] + ";"

  return table_sql, column_sql

### <font color='#4285f4'>Generate the Descriptions</font>

In [None]:
dataset_name = "taxi_dataset"

table_list_sql = f"""SELECT table_name
                       FROM {dataset_name}.INFORMATION_SCHEMA.TABLES
                      WHERE table_type = 'BASE TABLE'
                      ORDER BY table_name;"""

results = RunQuery(table_list_sql)

alter_table_list = []

for index, row in results.iterrows():
  metadata_info = "The table is releated to taxi ride data as part of the New York City public dataset."
  table_name = row['table_name']
  if table_name == 'datastream_cdc_data':
    metadata_info = "The table is related to a change data capture process used by Google Cloud Datastream to seed data for CDC."

  table_sql, column_sql = get_table_description(project_id, dataset_name, table_name, metadata_info)
  alter_table_list.append(table_sql)
  alter_table_list.append(column_sql)

In [None]:
for item in alter_table_list:
  print(item)
  print()
  print()
  # Optional
  # You can run this in BigQuery as a SQL Script or automate running "RunQuery" for each item.
  # RunQuery(item)

Notes:


- If you want to update the schema for external tables you cannot run the ALTER table command you need to update the schema.  See the "BigLake-Demo" notebook.  You would need to get the schema, then add descriptions to the schema (or update them) then call updateTableSchema.  External tables do not support the ALTER TABLE command.
```
def updateTableSchema(project_id, dataset_name, table_name, new_schema):
  import io
  import google.cloud.bigquery as bigquery

  client = bigquery.Client()

  dataset_ref = client.dataset(dataset_name, project=project_id)
  table_ref = dataset_ref.table(table_name)
  table = client.get_table(table_ref)

  table.schema = new_schema
  table = client.update_table(table, ["schema"])

  print(f"Table {table_name} schema updated!")
```

