colab-enterprise/Campaign-Performance-Data-Insights.ipynb (1,198 lines of code) (raw):

{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "### <font color='#4285f4'>Overview</font>" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This process uses Google Cloud's Data Insights to automatically generate SQL queries for analyzing data within a view. It uses Gemini, Google's AI model, to examine the table metadata and produce insightful queries. These queries help uncover hidden patterns, assess data quality, and perform statistical analysis, providing valuable data exploration within BigQuery. The scanned table is then labeled so the insights appear in the BigQuery UI, and the generated SQL is saved to a BigQuery table for easy access.\n", "\n", "Process Flow:\n", "\n", "1. Create a view\n", " * a. A view makes insights easier to generate than relying on text-to-SQL to discover all the proper joins.\n", "2. Create a data profile scan to understand the statistics of the data\n", "3. Run the data profile scan\n", "4. Create a data insights scan to generate SQL statements that help you explore your data\n", "5. Run the data insights scan\n", "6. Link the data profile scan to the BigQuery user interface\n", " * a. Tag the BigQuery table with the following so the data profile shows in the BigQuery UI:\n", " * dataplex-dp-published-location\n", " * dataplex-dp-published-project\n", " * dataplex-dp-published-scan\n", "7. Link the data insights scan to the BigQuery user interface\n", " * a. Tag the BigQuery table with the following so the Insights show in the BigQuery UI:\n", " * dataplex-data-documentation-published-location\n", " * dataplex-data-documentation-published-project\n", " * dataplex-data-documentation-published-scan\n", "8. Use the REST API to get the SQL (Data Insights SQL)\n", "9. 
Save the results to BigQuery using a bulk insert\n", "\n", "Cost:\n", "* Low: Gemini, BigQuery\n", "* Medium: Remember to stop your Colab Enterprise Notebook Runtime\n", "\n", "Author: \n", "* Adam Paternostro" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Architecture Diagram\n", "from IPython.display import Image\n", "Image(url='https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Artifacts/Campaign-Performance-Data-Insights-Architecture.png', width=1200)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### <font color='#4285f4'>Video Walkthrough</font>" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "[![Video](https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Videos/adam-paternostro-video.png)](https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Videos/Campaign-Performance-Data-Insights.mp4)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from IPython.display import HTML\n", "\n", "HTML(\"\"\"\n", "<video width=\"800\" height=\"600\" controls>\n", " <source src=\"https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Videos/Campaign-Performance-Data-Insights.mp4\" type=\"video/mp4\">\n", " Your browser does not support the video tag.\n", "</video>\n", "\"\"\")" ] }, { "cell_type": "markdown", "metadata": { "id": "HMsUvoF4BP7Y" }, "source": [ "### <font color='#4285f4'>License</font>\n", "\n", "\n", "\n" ] }, { "cell_type": "markdown", "metadata": { "id": "jQgQkbOvj55d" }, "source": [ "```\n", "# Copyright 2024 Google LLC\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the 
License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License.\n", "```" ] }, { "cell_type": "markdown", "metadata": { "id": "m65vp54BUFRi" }, "source": [ "### <font color='#4285f4'>Pip installs</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "5MaWM6H5i6rX" }, "outputs": [], "source": [ "# PIP Installs\n", "import sys\n", "\n", "# https://PLACEHOLDER.com/index.html\n", "# !{sys.executable} -m pip install PLACEHOLDER" ] }, { "cell_type": "markdown", "metadata": { "id": "UmyL-Rg4Dr_f" }, "source": [ "### <font color='#4285f4'>Initialize</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "xOYsEVSXp6IP" }, "outputs": [], "source": [ "from PIL import Image\n", "from IPython.display import HTML\n", "from IPython.display import Audio\n", "from functools import reduce\n", "import IPython.display\n", "import google.auth\n", "import requests\n", "import json\n", "import uuid\n", "import base64\n", "import os\n", "import cv2\n", "import random\n", "import time\n", "import datetime\n", "import base64\n", "import random\n", "\n", "import logging\n", "from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "wMlHl3bnkFPZ" }, "outputs": [], "source": [ "# Set these (run this cell to verify the output)\n", "\n", "bigquery_location = \"${bigquery_location}\"\n", "region = \"${region}\"\n", "\n", "# Get some values using gcloud\n", "project_id = !(gcloud config get-value project)\n", "user = !(gcloud auth list --filter=status:ACTIVE --format=\"value(account)\")\n", "\n", "if len(project_id) != 1:\n", " raise RuntimeError(f\"project_id is not set: {project_id}\")\n", "project_id = project_id[0]\n", "\n", "if len(user) != 1:\n", " 
raise RuntimeError(f\"user is not set: {user}\")\n", "user = user[0]\n", "\n", "print(f\"project_id = {project_id}\")\n", "print(f\"user = {user}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "sZ6m_wGrK0YG" }, "source": [ "### <font color='#4285f4'>Helper Methods</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "JbOjdSP1kN9T" }, "source": [ "#### restAPIHelper\n", "Calls the Google Cloud REST API using the current user's credentials." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "40wlwnY4kM11" }, "outputs": [], "source": [ "def restAPIHelper(url: str, http_verb: str, request_body: dict) -> dict:\n", " \"\"\"Calls the Google Cloud REST API passing in the current user's credentials and returns the parsed JSON response\"\"\"\n", "\n", " import requests\n", " import google.auth\n", " import google.auth.transport.requests # needed for the Request() object used to refresh the token\n", " import json\n", "\n", " # Get an access token based upon the current user\n", " creds, project = google.auth.default()\n", " auth_req = google.auth.transport.requests.Request()\n", " creds.refresh(auth_req)\n", " access_token=creds.token\n", "\n", " headers = {\n", " \"Content-Type\" : \"application/json\",\n", " \"Authorization\" : \"Bearer \" + access_token\n", " }\n", "\n", " if http_verb == \"GET\":\n", " response = requests.get(url, headers=headers)\n", " elif http_verb == \"POST\":\n", " response = requests.post(url, json=request_body, headers=headers)\n", " elif http_verb == \"PUT\":\n", " response = requests.put(url, json=request_body, headers=headers)\n", " elif http_verb == \"PATCH\":\n", " response = requests.patch(url, json=request_body, headers=headers)\n", " elif http_verb == \"DELETE\":\n", " response = requests.delete(url, headers=headers)\n", " else:\n", " raise RuntimeError(f\"Unknown HTTP verb: {http_verb}\")\n", "\n", " if response.status_code == 200:\n", " return json.loads(response.content)\n", " else:\n", " error = f\"Error restAPIHelper -> Status: 
'{response.status_code}' Text: '{response.text}'\"\n", " raise RuntimeError(error)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### RetryCondition (for retrying LLM calls)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def RetryCondition(error):\n", " error_string = str(error)\n", " print(error_string)\n", "\n", " retry_errors = [\n", " \"RESOURCE_EXHAUSTED\",\n", " \"No content in candidate\",\n", " # Add more error messages here as needed\n", " ]\n", "\n", " for retry_error in retry_errors:\n", " if retry_error in error_string:\n", " print(\"Retrying...\")\n", " return True\n", "\n", " return False" ] }, { "cell_type": "markdown", "metadata": { "id": "4ndOu9GkyoBJ" }, "source": [ "#### Helper Functions" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "DdPlK0kzeCuI" }, "outputs": [], "source": [ "def RunQuery(sql):\n", " import time\n", " from google.cloud import bigquery\n", " client = bigquery.Client()\n", "\n", " if (sql.startswith(\"SELECT\") or sql.startswith(\"WITH\")):\n", " df_result = client.query(sql).to_dataframe()\n", " return df_result\n", " else:\n", " job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)\n", " query_job = client.query(sql, job_config=job_config)\n", "\n", " # Check on the progress by getting the job's updated state.\n", " query_job = client.get_job(\n", " query_job.job_id, location=query_job.location\n", " )\n", " print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n", "\n", " while query_job.state != \"DONE\":\n", " time.sleep(2)\n", " query_job = client.get_job(\n", " query_job.job_id, location=query_job.location\n", " )\n", " print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n", "\n", " if query_job.error_result is None:\n", " return True\n", " 
else:\n", " raise Exception(query_job.error_result)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "fRgDAjN9gsg9" }, "outputs": [], "source": [ "# Since our Primary keys are INTs we get the next available value\n", "def GetNextPrimaryKey(fully_qualified_table_name, field_name):\n", " import time\n", " from google.cloud import bigquery\n", " client = bigquery.Client()\n", "\n", " sql = f\"\"\"\n", " SELECT IFNULL(MAX({field_name}),0) AS result\n", " FROM `{fully_qualified_table_name}`\n", " \"\"\"\n", " # print(sql)\n", " df_result = client.query(sql).to_dataframe()\n", " # display(df_result)\n", " return df_result['result'].iloc[0] + 1" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "q-WI70Wqyssc" }, "outputs": [], "source": [ "def PrettyPrintJson(json_string):\n", " json_object = json.loads(json_string)\n", " json_formatted_str = json.dumps(json_object, indent=2)\n", " print(json_formatted_str)\n", " return json.dumps(json_object)" ] }, { "cell_type": "markdown", "metadata": { "id": "nKJr7o-cg3FE" }, "source": [ "### <font color='#4285f4'>Create BigQuery table to hold results</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "-5t-WixKg2Or" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "#DROP TABLE IF EXISTS `chocolate_ai.data_insights`;\n", "\n", "CREATE TABLE IF NOT EXISTS `chocolate_ai.data_insights`\n", "(\n", " data_insights_id INTEGER NOT NULL OPTIONS(description=\"Primary key.\"),\n", " data_insights_scan_name STRING NOT NULL OPTIONS(description=\"The name of the data insights scan.\"),\n", " data_insights_dataset_name STRING NOT NULL OPTIONS(description=\"The name of the data insights dataset.\"),\n", " data_insights_table_name STRING NOT NULL OPTIONS(description=\"The name of the data insights table.\"),\n", "\n", " data_insights_sql STRING NOT NULL OPTIONS(description=\"The generated SQL by data insights\"),\n", " data_insights_sql_description STRING NOT NULL 
OPTIONS(description=\"The generated Description by data insights\"),\n", ")\n", "CLUSTER BY data_insights_scan_name;" ] }, { "cell_type": "markdown", "metadata": { "id": "yyq-NQ2gtxQs" }, "source": [ "### <font color='#4285f4'>Create and run a Data Profile Scan</font>\n", "- You should create a data profile scan before you create insights" ] }, { "cell_type": "markdown", "metadata": { "id": "hwAEuMl1VZR5" }, "source": [ "#### createDataProfileScan" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "zrhbXGtJwqZL" }, "outputs": [], "source": [ "def createDataProfileScan(data_profile_scan_name, data_profile_display_name, data_profile_dataset_name, data_profile_data_scan_table_name):\n", " \"\"\"Tests to see if the data profile scan exists and, if not, creates it.\"\"\"\n", "\n", " # Gather existing data scans\n", " # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/list\n", "\n", " url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{region}/dataScans\"\n", "\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"createDataProfileScan (GET) json_result: {json_result}\")\n", "\n", " # Test to see if data scan exists, if so return\n", " if \"dataScans\" in json_result:\n", " for item in json_result[\"dataScans\"]:\n", " print(f\"Scan names: {item['name']}\")\n", " if item[\"name\"] == f\"projects/{project_id}/locations/{region}/dataScans/{data_profile_scan_name}\":\n", " print(f\"Data Profile Scan {data_profile_scan_name} already exists\")\n", " return f\"projects/{project_id}/locations/{region}/dataScans/{data_profile_scan_name}\"\n", "\n", " # Create a new scan\n", " # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/create\n", " print(\"Creating Data Profile Scan\")\n", "\n", " url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{region}/dataScans?dataScanId={data_profile_scan_name}\"\n", "\n", " request_body = {\n", " 
\"dataProfileSpec\": { \"samplingPercent\": 25 },\n", " \"data\": { \"resource\": f\"//bigquery.googleapis.com/projects/{project_id}/datasets/{data_profile_dataset_name}/tables/{data_profile_data_scan_table_name}\" },\n", " \"description\": data_profile_display_name,\n", " \"displayName\": data_profile_display_name\n", " }\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", "\n", " name = json_result[\"metadata\"][\"target\"]\n", " print(f\"Data Profile Scan created: {name}\")\n", "\n", " return name" ] }, { "cell_type": "markdown", "metadata": { "id": "sgehzGJMs5J6" }, "source": [ "#### runDataProfileScan" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "LiaJx9kAs5vi" }, "outputs": [], "source": [ "def runDataProfileScan(data_profile_scan_name):\n", " \"\"\"Runs the data profile scan job and monitors until it completes\"\"\"\n", "\n", " # Run the scan\n", " # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/run\n", " print(\"Running Data Profile Scan\")\n", " time.sleep(10)\n", "\n", " url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{region}/dataScans/{data_profile_scan_name}:run\"\n", "\n", " request_body = { }\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", " job_name = json_result[\"job\"][\"name\"]\n", " job_state = json_result[\"job\"][\"state\"]\n", " print(f\"Data Profile Scan Run created: {job_name} - State: {job_state}\")\n", "\n", " # Monitor the job until it completes\n", " url = f\"https://dataplex.googleapis.com/v1/{job_name}\"\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " while json_result[\"state\"] == \"STATE_UNSPECIFIED\" or json_result[\"state\"] == \"RUNNING\" or json_result[\"state\"] == \"PENDING\":\n", " print(f\"Data Profile Scan Run {job_name} - State: {json_result['state']}\")\n", " time.sleep(10)\n", " json_result = restAPIHelper(url, \"GET\", None)\n" ] }, { "cell_type": "markdown", 
"metadata": { "id": "c6ZTj8Bev2nP" }, "source": [ "#### Create the data profile scan and run the scan job" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ZgEGQdhUXgBO" }, "outputs": [], "source": [ "data_profile_scan_name = \"chocolate-insights-data-profile-scan\"\n", "data_profile_display_name = \"chocolate-insights-data-profile-scan\"\n", "data_profile_dataset_name = \"${bigquery_chocolate_ai_dataset}\"\n", "data_profile_data_scan_table_name = \"chocolate_insights\"\n", "\n", "data_profile_uri = createDataProfileScan(data_profile_scan_name, data_profile_display_name, data_profile_dataset_name, data_profile_data_scan_table_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "KrT2YDULtkVk" }, "outputs": [], "source": [ "time.sleep(10) # give it 10 seconds to be created\n", "\n", "runDataProfileScan(data_profile_scan_name)" ] }, { "cell_type": "markdown", "metadata": { "id": "OuSRYmz0t1u2" }, "source": [ "### <font color='#4285f4'>Create and run a Data Insights Scan</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "no0x5CHub1IU" }, "source": [ "#### createDataDocumentScan" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "IivdMckQS4i4" }, "outputs": [], "source": [ "def createDataDocumentScan(data_insights_scan_name, data_insights_display_name, data_insights_dataset_name, data_insights_data_scan_table_name):\n", " \"\"\"Tests to see if the data documentation scan (not a data profile or data quality scan) exists and, if not, creates it.\"\"\"\n", "\n", " \"\"\"\n", " !curl \\\n", " 'https://dataplex.googleapis.com/v1/projects/${project_id}/locations/us-central1/dataScans' \\\n", " --header \"Authorization: Bearer $(gcloud auth print-access-token)\" \\\n", " --header 'Accept: application/json' \\\n", " --compressed\n", " {\n", " \"name\": \"projects/${project_id}/locations/us-central1/dataScans/chocolate-ai-looker-sales-data-docuemnt-scan\",\n", " \"uid\": 
\"4654c390-a841-4939-bd0b-8ba2d151bd03\",\n", " \"displayName\": \"Chocolate A.I. - Looker Sales Data Documenation Scan\",\n", " \"state\": \"ACTIVE\",\n", " \"createTime\": \"2024-08-22T18:07:50.455838572Z\",\n", " \"updateTime\": \"2024-08-22T18:07:55.294013073Z\",\n", " \"data\": {\n", " \"resource\": \"//bigquery.googleapis.com/projects/${project_id}/datasets/chocolate_ai/tables/looker_sales_data\"\n", " },\n", " \"executionSpec\": {\n", " \"trigger\": {\n", " \"onDemand\": {}\n", " }\n", " },\n", " \"executionStatus\": {},\n", " \"type\": \"DATA_DOCUMENTATION\"\n", " },\n", " \"\"\"\n", "\n", " # Gather existing data scans\n", " # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/list\n", "\n", " url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{region}/dataScans\"\n", "\n", " # Gather existing data scans\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"createDataDocumentScan (GET) json_result: {json_result}\")\n", "\n", " # Test to see if data scan exists, if so return\n", " if \"dataScans\" in json_result:\n", " for item in json_result[\"dataScans\"]:\n", " print(f\"Scan names: {item['name']}\")\n", " # \"projects/${project_id}/locations/us-central1/clusters/kafka-cluster\"\n", " if item[\"name\"] == f\"projects/{project_id}/locations/{region}/dataScans/{data_insights_scan_name}\":\n", " print(f\"Data Document Scan {data_insights_scan_name} already exists\")\n", " return f\"projects/{project_id}/locations/{region}/dataScans/{data_insights_scan_name}\"\n", "\n", " \"\"\"\n", " # Create a Documentation Scan\n", " # Create\n", " #\n", " !curl --request POST \\\n", " 'https://dataplex.googleapis.com/v1/projects/${project_id}/locations/us-central1/dataScans?dataScanId=chocolate-ai-looker-sales-data-docuemnt-scan' \\\n", " --header \"Authorization: Bearer $(gcloud auth print-access-token)\" \\\n", " --header 'Accept: application/json' \\\n", " --header 'Content-Type: 
application/json' \\\n", " --data '{\"displayName\":\"Chocolate A.I. - Looker Sales Data Documenation Scan\",\"type\": \"DATA_DOCUMENTATION\", \"dataDocumentationSpec\": {}, \"data\":{\"resource\":\"//bigquery.googleapis.com/projects/${project_id}/datasets/chocolate_ai/tables/looker_sales_data\"}}' \\\n", " --compressed\n", " \"\"\"\n", "\n", " # Create a new scan\n", " # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/create\n", " print(\"Creating Document Data Scan\")\n", "\n", " url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{region}/dataScans?dataScanId={data_insights_scan_name}\"\n", "\n", " request_body = {\n", " \"displayName\": data_insights_display_name,\n", " \"type\": \"DATA_DOCUMENTATION\",\n", " \"dataDocumentationSpec\": {},\n", " \"data\":{\n", " \"resource\": f\"//bigquery.googleapis.com/projects/{project_id}/datasets/{data_insights_dataset_name}/tables/{data_insights_data_scan_table_name}\"\n", " }\n", " }\n", "\n", " \"\"\"\n", " {\n", " \"name\": \"projects/${project_id}/locations/us-central1/operations/operation-1724350067868-62049894382d1-169e9f87-1fe5b7b6\",\n", " \"metadata\": {\n", " \"@type\": \"type.googleapis.com/google.cloud.dataplex.v1.OperationMetadata\",\n", " \"createTime\": \"2024-08-22T18:07:50.460547819Z\",\n", " \"target\": \"projects/${project_id}/locations/us-central1/dataScans/chocolate-ai-looker-sales-data-docuemnt-scan\",\n", " \"verb\": \"create\",\n", " \"requestedCancellation\": false,\n", " \"apiVersion\": \"v1\"\n", " },\n", " \"done\": false\n", " }\n", " \"\"\"\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", "\n", " name = json_result[\"metadata\"][\"target\"]\n", " print(f\"Document Data Scan created: {name}\")\n", " return name" ] }, { "cell_type": "markdown", "metadata": { "id": "MVqzTz8qb7PB" }, "source": [ "#### runDataDocumentScan" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "iZDnRyphYdVL" }, 
"outputs": [], "source": [ "def runDataDocumentScan(data_insights_scan_name):\n", " \"\"\"Runs the data document scan job and monitors until it completes\"\"\"\n", "\n", " \"\"\"\n", " !curl --request POST \\\n", " 'https://dataplex.googleapis.com/v1/projects/${project_id}/locations/us-central1/dataScans/chocolate-ai-looker-sales-data-docuemnt-scan:run' \\\n", " --header \"Authorization: Bearer $(gcloud auth print-access-token)\" \\\n", " --header 'Accept: application/json' \\\n", " --header 'Content-Type: application/json' \\\n", " --data '{}' \\\n", " --compressed\n", " \"\"\"\n", "\n", " # Create a new scan\n", " # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/run\n", " print(\"Creating Document Data Scan Run\")\n", " time.sleep(10)\n", "\n", " url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{region}/dataScans/{data_insights_scan_name}:run\"\n", "\n", " request_body = { }\n", "\n", " \"\"\"\n", " {\n", " \"job\": {\n", " \"name\": \"projects/756740881369/locations/us-central1/dataScans/chocolate-ai-looker-sales-data-docuemnt-scan/jobs/27115210-eaf8-43f2-80e8-b2681daa07f0\",\n", " \"uid\": \"27115210-eaf8-43f2-80e8-b2681daa07f0\",\n", " \"state\": \"PENDING\",\n", " \"type\": \"DATA_DOCUMENTATION\",\n", " \"createTime\": \"1970-01-01T00:00:00Z\",\n", " \"dataDocumentationSpec\": {}\n", " }\n", " }\n", " \"\"\"\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", " job_name = json_result[\"job\"][\"name\"]\n", " job_state = json_result[\"job\"][\"state\"]\n", " print(f\"Document Data Scan Run created: {job_name} - State: {job_state}\")\n", "\n", " # Monitor the job until it completes\n", " \"\"\"\n", " !curl \\\n", " 'https://dataplex.googleapis.com/v1/projects/756740881369/locations/us-central1/dataScans/chocolate-ai-looker-sales-data-docuemnt-scan/jobs/27115210-eaf8-43f2-80e8-b2681daa07f0' \\\n", " --header \"Authorization: Bearer $(gcloud auth print-access-token)\" \\\n", " 
--header 'Accept: application/json' \\\n", " --compressed\n", "\n", " {\n", " \"name\": \"projects/756740881369/locations/us-central1/dataScans/chocolate-ai-looker-sales-data-docuemnt-scan/jobs/27115210-eaf8-43f2-80e8-b2681daa07f0\",\n", " \"uid\": \"27115210-eaf8-43f2-80e8-b2681daa07f0\",\n", " \"startTime\": \"2024-08-22T18:12:17.553066314Z\",\n", " \"endTime\": \"2024-08-22T18:13:06.541798260Z\",\n", " \"state\": \"SUCCEEDED\",\n", " \"type\": \"DATA_DOCUMENTATION\",\n", " \"createTime\": \"2024-08-22T18:12:17.553025391Z\"\n", " }\n", " STATE_UNSPECIFIED\tThe DataScanJob state is unspecified.\n", " RUNNING\tThe DataScanJob is running.\n", " CANCELING\tThe DataScanJob is canceling.\n", " CANCELLED\tThe DataScanJob cancellation was successful.\n", " SUCCEEDED\tThe DataScanJob completed successfully.\n", " FAILED\tThe DataScanJob is no longer running due to an error.\n", " PENDING\tThe DataScanJob has been created but not started to run yet.\n", " \"\"\"\n", "\n", " url = f\"https://dataplex.googleapis.com/v1/{job_name}\"\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " while json_result[\"state\"] == \"STATE_UNSPECIFIED\" or json_result[\"state\"] == \"RUNNING\" or json_result[\"state\"] == \"PENDING\":\n", " print(f\"Document Data Scan Run {job_name} - State: {json_result['state']}\")\n", " time.sleep(10)\n", " json_result = restAPIHelper(url, \"GET\", None)\n" ] }, { "cell_type": "markdown", "metadata": { "id": "DC6-LnNob90k" }, "source": [ "#### getDataDocumentScanSql" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "EW6fmTRdW4km" }, "outputs": [], "source": [ "def getDataDocumentScanSql(data_insights_scan_name, data_insights_dataset_name,data_insights_data_scan_table_name, data_insight_id_starting_key):\n", " \"\"\"Gets the results of the document scan. 
If there are no results, runs the job to create them.\"\"\"\n", "\n", " \"\"\"\n", " !curl \\\n", " 'https://dataplex.googleapis.com/v1/projects/${project_id}/locations/us-central1/dataScans/chocolate-ai-looker-sales-data-docuemnt-scan?view=FULL' \\\n", " --header \"Authorization: Bearer $(gcloud auth print-access-token)\" \\\n", " --header 'Accept: application/json' \\\n", " --compressed\n", "\n", " {\n", " \"name\": \"projects/${project_id}/locations/us-central1/dataScans/chocolate-ai-looker-sales-data-docuemnt-scan\",\n", " \"uid\": \"4654c390-a841-4939-bd0b-8ba2d151bd03\",\n", " \"displayName\": \"Chocolate A.I. - Looker Sales Data Documenation Scan\",\n", " \"state\": \"ACTIVE\",\n", " \"createTime\": \"2024-08-22T18:07:50.455838572Z\",\n", " \"updateTime\": \"2024-08-22T18:07:55.294013073Z\",\n", " \"data\": {\n", " \"resource\": \"//bigquery.googleapis.com/projects/${project_id}/datasets/chocolate_ai/tables/looker_sales_data\"\n", " },\n", " \"executionSpec\": {\n", " \"trigger\": {\n", " \"onDemand\": {}\n", " }\n", " },\n", " \"executionStatus\": {\n", " \"latestJobStartTime\": \"2024-08-22T18:12:17.553066314Z\",\n", " \"latestJobEndTime\": \"2024-08-22T18:13:06.541798260Z\",\n", " \"latestJobCreateTime\": \"2024-08-22T18:12:17.553025391Z\"\n", " },\n", " \"type\": \"DATA_DOCUMENTATION\",\n", " \"dataDocumentationSpec\": {},\n", " \"dataDocumentationResult\": {\n", " \"queries\": [\n", " {\n", " \"sql\": \"SELECT company_name, item_name, city_name, item_size, customer_name, sale_date, SUM(sale_price) AS total_sale_price FROM `chocolate_ai.looker_sales_data` GROUP BY company_name, item_name, city_name, item_size, customer_name, sale_date;\",\n", " \"description\": \"What is the total sale price of each item name for each company in each city for each item size for each customer name for each sale date?\"\n", " }\n", " ]\n", " }\n", " }\n", " \"\"\"\n", "\n", " # Get the scan, including its results (view=FULL), if it exists\n", " # 
https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/get\n", "\n", " url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{region}/dataScans/{data_insights_scan_name}?view=FULL\"\n", "\n", " # Get the existing scan\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"getDataDocumentScanSql (GET) json_result: {json_result}\")\n", "\n", " # If the scan already has results, collect them; otherwise run the job and read the results again\n", " data_insights_list = []\n", " if \"name\" in json_result:\n", " if \"dataDocumentationResult\" in json_result:\n", " for item in json_result[\"dataDocumentationResult\"][\"queries\"]:\n", " result_dict = {\n", " \"data_insights_id\" : data_insight_id_starting_key,\n", " \"data_insights_scan_name\" : data_insights_scan_name,\n", " \"data_insights_dataset_name\" : data_insights_dataset_name,\n", " \"data_insights_table_name\" : data_insights_data_scan_table_name,\n", " \"data_insights_sql\": item[\"sql\"],\n", " \"data_insights_sql_description\": item[\"description\"]\n", " }\n", " data_insight_id_starting_key += 1\n", " data_insights_list.append(result_dict)\n", " else:\n", " print(\"Need to run a job\")\n", " runDataDocumentScan(data_insights_scan_name)\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " if \"name\" in json_result:\n", " if \"dataDocumentationResult\" in json_result:\n", " for item in json_result[\"dataDocumentationResult\"][\"queries\"]:\n", " result_dict = {\n", " \"data_insights_id\" : data_insight_id_starting_key,\n", " \"data_insights_scan_name\" : data_insights_scan_name,\n", " \"data_insights_dataset_name\" : data_insights_dataset_name,\n", " \"data_insights_table_name\" : data_insights_data_scan_table_name,\n", " \"data_insights_sql\": item[\"sql\"],\n", " \"data_insights_sql_description\": item[\"description\"]\n", " }\n", " data_insight_id_starting_key += 1\n", " data_insights_list.append(result_dict)\n", "\n", " return data_insights_list" ] }, { "cell_type": "markdown", "metadata": { "id": 
"8Luj-kVQcBBU" }, "source": [ "#### Create the document scan, run the scan job, and return the data insights results" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ttzYqec9WL46" }, "outputs": [], "source": [ "data_insights_scan_name = \"chocolate-insights-data-documentation-scan\"\n", "data_insights_display_name = \"chocolate-insights-data-documentation-scan\" # This triggers the BigQuery UI to show the results\n", "data_insights_dataset_name = \"${bigquery_chocolate_ai_dataset}\"\n", "data_insights_data_scan_table_name = \"chocolate_insights\"\n", "\n", "data_document_uri = createDataDocumentScan(data_insights_scan_name, data_insights_display_name, data_insights_dataset_name, data_insights_data_scan_table_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "txLztYEeXxQj" }, "outputs": [], "source": [ "data_insights_table_name = \"data_insights\"\n", "data_insights_table_primary_key = \"data_insights_id\"\n", "\n", "data_insight_id_starting_key = GetNextPrimaryKey(f\"{project_id}.{data_insights_dataset_name}.{data_insights_table_name}\", data_insights_table_primary_key)\n", "\n", "data_insights_list = getDataDocumentScanSql(data_insights_scan_name, data_insights_dataset_name, data_insights_table_name, data_insight_id_starting_key)" ] }, { "cell_type": "markdown", "metadata": { "id": "Hefp2KmtuGAz" }, "source": [ "### <font color='#4285f4'>Attach the Scan/Insights to the BigQuery User Interface</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "u5XAEpZyulR0" }, "source": [ "##### updateBigQueryTableDataProfileLabels" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "399uNWYCuqDa" }, "outputs": [], "source": [ "def updateBigQueryTableDataProfileLabels(data_profile_scan_id, data_profile_dataset_name, data_profile_data_scan_table_name):\n", " \"\"\"Sets the labels on the BigQuery table so users can see the data profile in the Console.\"\"\"\n", "\n", " # Patch BigQuery\n", " # 
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/patch\n", " print(\"Patching BigQuery Data Profile Labels\")\n", "\n", " url = f\"https://bigquery.googleapis.com/bigquery/v2/projects/{project_id}/datasets/{data_profile_dataset_name}/tables/{data_profile_data_scan_table_name}\"\n", "\n", " request_body = {\n", " \"labels\" : {\n", " \"dataplex-dp-published-location\" : region,\n", " \"dataplex-dp-published-project\" : project_id,\n", " \"dataplex-dp-published-scan\" : data_profile_scan_id,\n", " }\n", " }\n", "\n", " json_result = restAPIHelper(url, \"PATCH\", request_body)\n", " print(json_result)" ] }, { "cell_type": "markdown", "metadata": { "id": "D4goQllt3a5L" }, "source": [ "##### updateBigQueryTableInsightsLabels" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "qw3LXTC-0HOx" }, "outputs": [], "source": [ "def updateBigQueryTableInsightsLabels(data_insights_scan_id, data_insights_dataset_name, data_insights_data_scan_table_name):\n", " \"\"\"Sets the labels on the BigQuery table so users can see the data insights in the Console.\"\"\"\n", "\n", "\n", " # Patch BigQuery\n", " # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/patch\n", " print(\"Patching BigQuery Data Insights Labels\")\n", "\n", " url = f\"https://bigquery.googleapis.com/bigquery/v2/projects/{project_id}/datasets/{data_insights_dataset_name}/tables/{data_insights_data_scan_table_name}\"\n", "\n", " request_body = {\n", " \"labels\" : {\n", " \"dataplex-data-documentation-published-location\" : region,\n", " \"dataplex-data-documentation-published-project\" : project_id,\n", " \"dataplex-data-documentation-published-scan\" : data_insights_scan_id,\n", " }\n", " }\n", "\n", " json_result = restAPIHelper(url, \"PATCH\", request_body)\n", " print(json_result)" ] }, { "cell_type": "markdown", "metadata": { "id": "5DOZQ0j-3fnT" }, "source": [ "##### Patch the BigQuery table so the Data Insights 
show in the UI" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Vk6hS8u4ypk4" }, "outputs": [], "source": [ "# This will tell the BigQuery UI about this data profile and document scan.\n", "# You can then see the scans in the BigQuery user interface.\n", "\n", "updateBigQueryTableDataProfileLabels(data_profile_scan_name, data_profile_dataset_name, data_profile_data_scan_table_name)\n", "\n", "updateBigQueryTableInsightsLabels(data_insights_scan_name, data_insights_dataset_name, data_insights_data_scan_table_name)\n", "\n", "print(\"You can now see the scans by clicking on each BigQuery table.\")" ] }, { "cell_type": "markdown", "metadata": { "id": "7jjlHPX_eGWP" }, "source": [ "### <font color='#4285f4'>Save the Data Insights to a BigQuery Table</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "x3kgxRsPeWbR" }, "outputs": [], "source": [ "import pandas as pd\n", "from google.cloud import bigquery\n", "\n", "bigquery_client = bigquery.Client()\n", "\n", "# Bulk insert the results into the same table that was used to compute the starting primary key\n", "table_id = f\"{project_id}.{data_insights_dataset_name}.{data_insights_table_name}\"\n", "\n", "dataframe = pd.DataFrame(\n", " pd.DataFrame(data_insights_list), # Rows returned by getDataDocumentScanSql\n", " columns=[\n", " \"data_insights_id\",\n", " \"data_insights_scan_name\",\n", " \"data_insights_dataset_name\",\n", " \"data_insights_table_name\",\n", " \"data_insights_sql\",\n", " \"data_insights_sql_description\",\n", " ],\n", ")\n", "\n", "job_config = bigquery.LoadJobConfig(\n", " schema=[\n", " bigquery.SchemaField(\"data_insights_id\", bigquery.enums.SqlTypeNames.INT64, mode=\"REQUIRED\"),\n", " bigquery.SchemaField(\"data_insights_scan_name\", bigquery.enums.SqlTypeNames.STRING, mode=\"REQUIRED\"),\n", " bigquery.SchemaField(\"data_insights_dataset_name\", bigquery.enums.SqlTypeNames.STRING, mode=\"REQUIRED\"),\n", " bigquery.SchemaField(\"data_insights_table_name\", bigquery.enums.SqlTypeNames.STRING, mode=\"REQUIRED\"),\n", "
bigquery.SchemaField(\"data_insights_sql\", bigquery.enums.SqlTypeNames.STRING, mode=\"REQUIRED\"),\n", " bigquery.SchemaField(\"data_insights_sql_description\", bigquery.enums.SqlTypeNames.STRING, mode=\"REQUIRED\"),\n", " ],\n", " write_disposition=\"WRITE_APPEND\",\n", ")\n", "\n", "job = bigquery_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)\n", "job.result() # Wait for the job to complete.\n", "\n", "table = bigquery_client.get_table(table_id) # Make an API request.\n", "print(\"Loaded {} rows and {} columns to {}\".format(table.num_rows, len(table.schema), table_id))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "3JLSv3Dvd3mA" }, "outputs": [], "source": [ "%%bigquery\n", "\n", "SELECT * FROM `chocolate_ai.data_insights` ORDER BY data_insights_id;" ] }, { "cell_type": "markdown", "metadata": { "id": "42IxhtRRrvR-" }, "source": [ "### <font color='#4285f4'>Clean Up</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "6lF2Z7skFbvf" }, "outputs": [], "source": [ "# Placeholder" ] }, { "cell_type": "markdown", "metadata": { "id": "ASQ2BPisXDA0" }, "source": [ "### <font color='#4285f4'>Reference Links</font>\n" ] }, { "cell_type": "markdown", "metadata": { "id": "2grorb1x8KFU" }, "source": [ "- [Google.com](https://www.google.com)" ] } ], "metadata": { "colab": { "collapsed_sections": [ "HMsUvoF4BP7Y", "m65vp54BUFRi", "UmyL-Rg4Dr_f", "sZ6m_wGrK0YG", "JbOjdSP1kN9T", "4ndOu9GkyoBJ", "nKJr7o-cg3FE", "yyq-NQ2gtxQs", "hwAEuMl1VZR5", "sgehzGJMs5J6", "c6ZTj8Bev2nP", "OuSRYmz0t1u2", "no0x5CHub1IU", "MVqzTz8qb7PB", "DC6-LnNob90k", "Hefp2KmtuGAz", "u5XAEpZyulR0", "D4goQllt3a5L", "5DOZQ0j-3fnT", "42IxhtRRrvR-", "ASQ2BPisXDA0" ], "name": "Campaign-Performance-Insights", "private_outputs": true, "provenance": [] }, "kernelspec": { "display_name": "Python 3", "name": "python3" }, "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 0 }