data-analytics-demos/bigquery-data-governance/colab-enterprise/Template.ipynb (715 lines of code) (raw):

{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "### <font color='#4285f4'>Overview</font>" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Overview: REPLACE-ME\n", "\n", "Process Flow:\n", "1. REPLACE-ME\n", "2. REPLACE-ME\n", "\n", "Notes:\n", "* REPLACE-ME\n", "\n", "Cost:\n", "* Approximate cost: REPLACE-ME\n", "\n", "Author: \n", "* REPLACE-ME: Your name or just refer to GitHub" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Architecture Diagram\n", "from IPython.display import Image\n", "Image(url='https://storage.googleapis.com/?.png', width=1200)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### <font color='#4285f4'>Video Walkthrough</font>" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "[Video](https://storage.googleapis.com/REPLACE-ME.mp4)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from IPython.display import HTML\n", "\n", "HTML(\"\"\"\n", "<video width=\"800\" height=\"600\" controls>\n", " <source src=\"https://storage.googleapis.com/REPLACE-ME.mp4\" type=\"video/mp4\">\n", " Your browser does not support the video tag.\n", "</video>\n", "\"\"\")" ] }, { "cell_type": "markdown", "metadata": { "id": "HMsUvoF4BP7Y" }, "source": [ "### <font color='#4285f4'>License</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "jQgQkbOvj55d" }, "source": [ "```\n", "# Copyright 2024 Google LLC\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License.\n", "```" ] }, { "cell_type": "markdown", "metadata": { "id": "m65vp54BUFRi" }, "source": [ "### <font color='#4285f4'>Pip installs</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "5MaWM6H5i6rX" }, "outputs": [], "source": [ "# PIP Installs (if necessary)\n", "import sys\n", "\n", "# !{sys.executable} -m pip install REPLACE-ME" ] }, { "cell_type": "markdown", "metadata": { "id": "UmyL-Rg4Dr_f" }, "source": [ "### <font color='#4285f4'>Initialize</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "xOYsEVSXp6IP" }, "outputs": [], "source": [ "from PIL import Image\n", "from IPython.display import HTML\n", "import IPython.display\n", "import google.auth\n", "import requests\n", "import json\n", "import uuid\n", "import base64\n", "import os\n", "import cv2\n", "import random\n", "import time\n", "import datetime\n", "import base64\n", "import random\n", "\n", "import logging\n", "from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "wMlHl3bnkFPZ" }, "outputs": [], "source": [ "# Set these (run this cell to verify the output)\n", "\n", "bigquery_location = \"${bigquery_location}\"\n", "region = \"${region}\"\n", "location = \"${location}\"\n", "\n", "# Get the current date and time\n", "now = datetime.datetime.now()\n", "\n", "# Format the date and time as desired\n", "formatted_date = now.strftime(\"%Y-%m-%d-%H-%M\")\n", "\n", "# Get some values using gcloud\n", "project_id = !(gcloud config get-value project)\n", "user = !(gcloud auth list --filter=status:ACTIVE --format=\"value(account)\")\n", "\n", "if len(project_id) != 1:\n", " raise RuntimeError(f\"project_id is not set: {project_id}\")\n", "project_id = project_id[0]\n", "\n", "if len(user) != 1:\n", " raise RuntimeError(f\"user is not set: {user}\")\n", "user = user[0]\n", "\n", "print(f\"project_id = {project_id}\")\n", "print(f\"user = {user}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "sZ6m_wGrK0YG" }, "source": [ "### <font color='#4285f4'>Helper Methods</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "JbOjdSP1kN9T" }, "source": [ "#### restAPIHelper\n", "Calls the Google Cloud REST API using the current users credentials." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "40wlwnY4kM11" }, "outputs": [], "source": [ "def restAPIHelper(url: str, http_verb: str, request_body: str) -> str:\n", " \"\"\"Calls the Google Cloud REST API passing in the current users credentials\"\"\"\n", "\n", " import requests\n", " import google.auth\n", " import json\n", "\n", " # Get an access token based upon the current user\n", " creds, project = google.auth.default()\n", " auth_req = google.auth.transport.requests.Request()\n", " creds.refresh(auth_req)\n", " access_token=creds.token\n", "\n", " headers = {\n", " \"Content-Type\" : \"application/json\",\n", " \"Authorization\" : \"Bearer \" + access_token\n", " }\n", "\n", " if http_verb == \"GET\":\n", " response = requests.get(url, headers=headers)\n", " elif http_verb == \"POST\":\n", " response = requests.post(url, json=request_body, headers=headers)\n", " elif http_verb == \"PUT\":\n", " response = requests.put(url, json=request_body, headers=headers)\n", " elif http_verb == \"PATCH\":\n", " response = requests.patch(url, json=request_body, headers=headers)\n", " elif http_verb == \"DELETE\":\n", " response = requests.delete(url, headers=headers)\n", " else:\n", " raise RuntimeError(f\"Unknown HTTP verb: {http_verb}\")\n", "\n", " if response.status_code == 200:\n", " return json.loads(response.content)\n", " #image_data = json.loads(response.content)[\"predictions\"][0][\"bytesBase64Encoded\"]\n", " else:\n", " error = f\"Error restAPIHelper -> ' Status: '{response.status_code}' Text: '{response.text}'\"\n", " raise RuntimeError(error)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### RetryCondition (for retrying LLM calls)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def RetryCondition(error):\n", " error_string = str(error)\n", " print(error_string)\n", "\n", " retry_errors = [\n", " \"RESOURCE_EXHAUSTED\",\n", " \"No content in candidate\",\n", " # Add more error messages here as needed\n", " ]\n", "\n", " for retry_error in retry_errors:\n", " if retry_error in error_string:\n", " print(\"Retrying...\")\n", " return True\n", "\n", " return False" ] }, { "cell_type": "markdown", "metadata": { "id": "vOFTk6sj1YIV" }, "source": [ "#### Gemini LLM (Pro 1.0 , Pro 1.5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "xHit3Hh-1ZAW" }, "outputs": [], "source": [ "@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n", "def GeminiLLM(prompt, model = \"gemini-2.0-flash\", response_schema = None,\n", " temperature = 1, topP = 1, topK = 32):\n", "\n", " # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference#supported_models\n", "\n", " llm_response = None\n", " if temperature < 0:\n", " temperature = 0\n", "\n", " creds, project = google.auth.default()\n", " auth_req = google.auth.transport.requests.Request() # required to acess access token\n", " creds.refresh(auth_req)\n", " access_token=creds.token\n", "\n", " headers = {\n", " \"Content-Type\" : \"application/json\",\n", " \"Authorization\" : \"Bearer \" + access_token\n", " }\n", "\n", " # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference\n", " url = f\"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/{model}:generateContent\"\n", "\n", " generation_config = {\n", " \"temperature\": temperature,\n", " \"topP\": topP,\n", " \"maxOutputTokens\": 8192,\n", " \"candidateCount\": 1,\n", " \"responseMimeType\": \"application/json\",\n", " }\n", "\n", " # Add inthe response schema for when it is provided\n", " if response_schema is not None:\n", " generation_config[\"responseSchema\"] = response_schema\n", "\n", " if model == \"gemini-2.0-flash\":\n", " generation_config[\"topK\"] = topK\n", "\n", " payload = {\n", " \"contents\": {\n", " \"role\": \"user\",\n", " \"parts\": {\n", " \"text\": prompt\n", " },\n", " },\n", " \"generation_config\": {\n", " **generation_config\n", " },\n", " \"safety_settings\": {\n", " \"category\": \"HARM_CATEGORY_SEXUALLY_EXPLICIT\",\n", " \"threshold\": \"BLOCK_LOW_AND_ABOVE\"\n", " }\n", " }\n", "\n", " response = requests.post(url, json=payload, headers=headers)\n", "\n", " if response.status_code == 200:\n", " try:\n", " json_response = json.loads(response.content)\n", " except Exception as error:\n", " raise RuntimeError(f\"An error occurred parsing the JSON: {error}\")\n", "\n", " if \"candidates\" in json_response:\n", " candidates = json_response[\"candidates\"]\n", " if len(candidates) > 0:\n", " candidate = candidates[0]\n", " if \"content\" in candidate:\n", " content = candidate[\"content\"]\n", " if \"parts\" in content:\n", " parts = content[\"parts\"]\n", " if len(parts):\n", " part = parts[0]\n", " if \"text\" in part:\n", " text = part[\"text\"]\n", " llm_response = text\n", " else:\n", " raise RuntimeError(\"No text in part: {response.content}\")\n", " else:\n", " raise RuntimeError(\"No parts in content: {response.content}\")\n", " else:\n", " raise RuntimeError(\"No parts in content: {response.content}\")\n", " else:\n", " raise RuntimeError(\"No content in candidate: {response.content}\")\n", " else:\n", " raise RuntimeError(\"No candidates: {response.content}\")\n", " else:\n", " raise RuntimeError(\"No candidates: {response.content}\")\n", "\n", " # Remove some typically response characters (if asking for a JSON reply)\n", " llm_response = llm_response.replace(\"```json\",\"\")\n", " llm_response = llm_response.replace(\"```\",\"\")\n", " llm_response = llm_response.replace(\"\\n\",\"\")\n", "\n", " return llm_response\n", "\n", " else:\n", " raise RuntimeError(f\"Error with prompt:'{prompt}' Status:'{response.status_code}' Text:'{response.text}'\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "rWXSCd5VCPjf" }, "outputs": [], "source": [ "@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n", "def GeminiLLM_VerifyImage(prompt, imageBase64, model = \"gemini-2.0-flash\", response_schema = None,\n", " temperature = 1, topP = 1, topK = 32):\n", "\n", " # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference#supported_models\n", "\n", " llm_response = None\n", " if temperature < 0:\n", " temperature = 0\n", "\n", " creds, project = google.auth.default()\n", " auth_req = google.auth.transport.requests.Request() # required to acess access token\n", " creds.refresh(auth_req)\n", " access_token=creds.token\n", "\n", " headers = {\n", " \"Content-Type\" : \"application/json\",\n", " \"Authorization\" : \"Bearer \" + access_token\n", " }\n", "\n", " # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference\n", " url = f\"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/{model}:generateContent\"\n", "\n", " generation_config = {\n", " \"temperature\": temperature,\n", " \"topP\": topP,\n", " \"maxOutputTokens\": 8192,\n", " \"candidateCount\": 1,\n", " \"responseMimeType\": \"application/json\",\n", " }\n", "\n", " # Add inthe response schema for when it is provided\n", " if response_schema is not None:\n", " generation_config[\"responseSchema\"] = response_schema\n", "\n", " if model == \"gemini-2.0-flash\":\n", " generation_config[\"topK\"] = topK\n", "\n", " payload = {\n", " \"contents\": {\n", " \"role\": \"user\",\n", " \"parts\": [\n", " { \"text\": prompt },\n", " { \"inlineData\": { \"mimeType\": \"image/png\", \"data\": f\"{imageBase64}\" } }\n", " ]\n", " },\n", " \"generation_config\": {\n", " **generation_config\n", " },\n", " \"safety_settings\": {\n", " \"category\": \"HARM_CATEGORY_SEXUALLY_EXPLICIT\",\n", " \"threshold\": \"BLOCK_LOW_AND_ABOVE\"\n", " }\n", " }\n", "\n", " response = requests.post(url, json=payload, headers=headers)\n", "\n", " if response.status_code == 200:\n", " try:\n", " json_response = json.loads(response.content)\n", " except Exception as error:\n", " raise RuntimeError(f\"An error occurred parsing the JSON: {error}\")\n", "\n", " if \"candidates\" in json_response:\n", " candidates = json_response[\"candidates\"]\n", " if len(candidates) > 0:\n", " candidate = candidates[0]\n", " if \"content\" in candidate:\n", " content = candidate[\"content\"]\n", " if \"parts\" in content:\n", " parts = content[\"parts\"]\n", " if len(parts):\n", " part = parts[0]\n", " if \"text\" in part:\n", " text = part[\"text\"]\n", " llm_response = text\n", " else:\n", " raise RuntimeError(\"No text in part: {response.content}\")\n", " else:\n", " raise RuntimeError(\"No parts in content: {response.content}\")\n", " else:\n", " raise RuntimeError(\"No parts in content: {response.content}\")\n", " else:\n", " raise RuntimeError(\"No content in candidate: {response.content}\")\n", " else:\n", " raise RuntimeError(\"No candidates: {response.content}\")\n", " else:\n", " raise RuntimeError(\"No candidates: {response.content}\")\n", "\n", " # Remove some typically response characters (if asking for a JSON reply)\n", " llm_response = llm_response.replace(\"```json\",\"\")\n", " llm_response = llm_response.replace(\"```\",\"\")\n", " llm_response = llm_response.replace(\"\\n\",\"\")\n", "\n", " return llm_response\n", "\n", " else:\n", " raise RuntimeError(f\"Error with prompt:'{prompt}' Status:'{response.status_code}' Text:'{response.text}'\")" ] }, { "cell_type": "markdown", "metadata": { "id": "bI-KJELZ1jgt" }, "source": [ "#### Helper Functions" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "pmnCwYvA1kZv" }, "outputs": [], "source": [ "def RunQuery(sql):\n", " import time\n", " from google.cloud import bigquery\n", " client = bigquery.Client()\n", "\n", " if (sql.startswith(\"SELECT\") or sql.startswith(\"WITH\")):\n", " df_result = client.query(sql).to_dataframe()\n", " return df_result\n", " else:\n", " job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)\n", " query_job = client.query(sql, job_config=job_config)\n", "\n", " # Check on the progress by getting the job's updated state.\n", " query_job = client.get_job(\n", " query_job.job_id, location=query_job.location\n", " )\n", " print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n", "\n", " while query_job.state != \"DONE\":\n", " time.sleep(2)\n", " query_job = client.get_job(\n", " query_job.job_id, location=query_job.location\n", " )\n", " print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n", "\n", " if query_job.error_result == None:\n", " return True\n", " else:\n", " raise Exception(query_job.error_result)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "VNAmwvAf1knl" }, "outputs": [], "source": [ "def PrettyPrintJson(json_string):\n", " json_object = json.loads(json_string)\n", " json_formatted_str = json.dumps(json_object, indent=2)\n", " return json_formatted_str" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "IBH7sIg3XlGv" }, "outputs": [], "source": [ "def GetNextPrimaryKey(fully_qualified_table_name, field_name):\n", " from google.cloud import bigquery\n", " client = bigquery.Client()\n", " sql = f\"\"\"\n", " SELECT IFNULL(MAX({field_name}),0) AS result\n", " FROM `{fully_qualified_table_name}`\n", " \"\"\"\n", " # print(sql)\n", " df_result = client.query(sql).to_dataframe()\n", " # display(df_result)\n", " return df_result['result'].iloc[0] + 1" ] }, { "cell_type": "markdown", "metadata": { "id": "EYRHDPdVKBzd" }, "source": [ "### <font color='#4285f4'>Create BigQuery Tables</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "PJsTtUbD5SsQ" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "# REPLACE-ME" ] }, { "cell_type": "markdown", "metadata": { "id": "c51M89g0Ejmz" }, "source": [ "### <font color='#4285f4'>MAIN CODE - REPLACE-ME</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "U4A_7n4SEPNO" }, "outputs": [], "source": [ "# REPLACE-ME" ] }, { "cell_type": "markdown", "metadata": { "id": "42IxhtRRrvR-" }, "source": [ "### <font color='#4285f4'>Clean Up</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "6lF2Z7skFbvf" }, "outputs": [], "source": [ "# Placeholder" ] }, { "cell_type": "markdown", "metadata": { "id": "ASQ2BPisXDA0" }, "source": [ "### <font color='#4285f4'>Reference Links</font>\n" ] }, { "cell_type": "markdown", "metadata": { "id": "rTY6xJdZ3ul8" }, "source": [ "- [REPLACE-ME](https://REPLACE-ME)" ] } ], "metadata": { "colab": { "collapsed_sections": [ "HMsUvoF4BP7Y", "m65vp54BUFRi", "UmyL-Rg4Dr_f", "JbOjdSP1kN9T", "EYRHDPdVKBzd", "HNZanAdJEaPq", "oMl8zBaOEfy8", "xPM7R0UdU7Q4", "c51M89g0Ejmz", "SLK7IX7QruT3", "2olP2ZnglJlr", "ASQ2BPisXDA0" ], "name": "Campaign-Assets-Hyper-Personalized-Email", "private_outputs": true, "provenance": [], "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "name": "python3" }, "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 0 }