### <font color='#4285f4'>Overview</font>

**Overview**: Data insights offers an automated way to explore and understand your data. With data insights, Gemini uses metadata to generate natural language questions about your table and the queries to answer them. This helps you uncover patterns, assess data quality, and perform statistical analysis.

This notebook gathers all the tables in the raw, enriched, and curated zones and creates a data insights scan on each table. Data insights then generates working queries on each table that you can use to start your analysis.

**Process Flow:** 

1.  Select all the tables in the raw, enriched, and curated datasets.
2.  Create a list of dictionaries to hold the table details and some additional fields to hold the scan name and scan state.
3.  Set the concurrency level: up to 5 scans run at once.
4.  While not all scans have completed:
    *   Count the scans currently in the `PENDING`, `RUNNING`, `STATE_UNSPECIFIED`, or `CANCELING` state.
    *   If that count is below the concurrency level (5), start more scans.
        *   When starting a scan, save its job name and set its state to `STATE_UNSPECIFIED`.
    *   Refresh the state of every scan that is still in progress.
    *   If no scans were in progress, exit the loop.
5.  For each successful scan:
    *   Update the labels on the associated BigQuery table so the scan appears in the Google Cloud console.
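The loop in step 4 can be sketched as a standalone function that is independent of the Dataplex REST calls. This is a minimal sketch, not the notebook's implementation: `run_with_concurrency_limit`, `start_scan`, and `get_state` are hypothetical names, where `start_scan` stands in for `startDataInsightScan` (it returns a job name) and `get_state` stands in for `getStateDataInsightScan`. Unlike the notebook's loop, it keeps unstarted scans in a queue instead of searching the whole list for empty states.

```python
import time
from typing import Callable, Dict, List

# DataScanJob states treated as "still in progress" (the same set the notebook checks)
IN_PROGRESS = {"STATE_UNSPECIFIED", "PENDING", "RUNNING", "CANCELING"}


def run_with_concurrency_limit(
    scan_ids: List[str],
    start_scan: Callable[[str], str],
    get_state: Callable[[str], str],
    max_concurrent: int = 5,
    poll_seconds: float = 10.0,
) -> Dict[str, str]:
    """Start scans without exceeding max_concurrent in progress; return the final state per scan."""
    pending = list(scan_ids)      # scans not yet started
    jobs: Dict[str, str] = {}     # scan_id -> job name returned by start_scan
    states: Dict[str, str] = {}   # scan_id -> latest job state

    while True:
        running = [s for s, st in states.items() if st in IN_PROGRESS]
        # Fill any free slots with scans that have not been started yet
        while pending and len(running) < max_concurrent:
            scan_id = pending.pop(0)
            jobs[scan_id] = start_scan(scan_id)
            states[scan_id] = "STATE_UNSPECIFIED"
            running.append(scan_id)
        if not running:
            return states  # nothing left to start and nothing in progress
        time.sleep(poll_seconds)
        # Refresh the state of every in-progress job
        for scan_id in running:
            states[scan_id] = get_state(jobs[scan_id])
```

Keeping the "fill free slots" step and the "refresh states" step separate means a slot freed by a finished scan is reused on the next pass, which is the same behavior the notebook's loop aims for.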

Notes:
* Run a Data Profile scan first: data insights produces better results when a profile scan already exists for the table.
* This notebook triggers the scans manually. In practice, you would typically configure the scans to run on a schedule instead.
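For the scheduled approach mentioned in the note above, a DataScan resource can carry an `executionSpec.trigger.schedule.cron` field in place of the default on-demand trigger. The sketch below builds such a request body for `dataScans.create`. It assumes that `DATA_DOCUMENTATION` (data insights) scans accept a schedule trigger the same way other scan types do; the function name, the cron expression, and the sample project, dataset, and table names are placeholders. Verify the field names against the Dataplex REST reference before relying on it.

```python
import json


def build_scheduled_insight_scan_body(display_name, project_id, dataset, table, cron="0 2 * * *"):
    """Hypothetical helper: dataScans.create body for a data insights scan on a cron schedule."""
    return {
        "displayName": display_name,
        "type": "DATA_DOCUMENTATION",
        "dataDocumentationSpec": {},
        "data": {
            "resource": f"//bigquery.googleapis.com/projects/{project_id}/datasets/{dataset}/tables/{table}"
        },
        # Assumed: a schedule trigger replaces the on-demand trigger used in this notebook
        "executionSpec": {"trigger": {"schedule": {"cron": cron}}},
    }


print(json.dumps(build_scheduled_insight_scan_body("orders scan", "my-project", "raw", "orders"), indent=2))
```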

Cost:
* Approximate cost: Less than 1 dollar

Author:
* Adam Paternostro

In [None]:
# Architecture Diagram
from IPython.display import Image
Image(url='https://storage.googleapis.com/data-analytics-golden-demo/colab-diagrams/BigQuery-Data-Governance-Data-Insights.png', width=1200)

### <font color='#4285f4'>Video Walkthrough</font>

[Video](https://storage.googleapis.com/data-analytics-golden-demo/colab-videos/Data-Insights.mp4)


In [None]:
from IPython.display import HTML

HTML("""
<video width="800" height="600" controls>
  <source src="https://storage.googleapis.com/data-analytics-golden-demo/colab-videos/Data-Insights.mp4" type="video/mp4">
  Your browser does not support the video tag.
</video>
""")

### <font color='#4285f4'>License</font>

```
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```

### <font color='#4285f4'>Pip installs</font>

In [None]:
# NOTE: All Dataplex and BigQuery metadata calls in this notebook are made via REST APIs

# PIP Installs (if necessary)
import sys

# !{sys.executable} -m pip install REPLACE-ME

### <font color='#4285f4'>Initialize</font>

In [None]:
from PIL import Image
from IPython.display import HTML
import IPython.display
import google.auth
import requests
import json
import uuid
import base64
import os
import cv2
import random
import time
import datetime
import base64
import random

import logging
from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception

In [None]:
# Set these (run this cell to verify the output)

bigquery_location = "${bigquery_location}"
dataplex_region = "${dataplex_region}"

# Get the current date and time
now = datetime.datetime.now()

# Format the date and time as desired
formatted_date = now.strftime("%Y-%m-%d-%H-%M")

# Get some values using gcloud
project_id = os.environ["GOOGLE_CLOUD_PROJECT"]
user = !(gcloud auth list --filter=status:ACTIVE --format="value(account)")

if len(user) != 1:
  raise RuntimeError(f"user is not set: {user}")
user = user[0]

print(f"project_id = {project_id}")
print(f"user = {user}")

### <font color='#4285f4'>Helper Methods</font>

#### restAPIHelper
Calls the Google Cloud REST API using the current user's credentials.

In [None]:
def restAPIHelper(url: str, http_verb: str, request_body: dict) -> dict:
  """Calls the Google Cloud REST API passing in the current user's credentials.
  request_body may be None for GET and DELETE. Returns the parsed JSON response."""

  import requests
  import google.auth
  import google.auth.transport.requests  # needed for google.auth.transport.requests.Request
  import json

  # Get an access token based upon the current user
  creds, _ = google.auth.default()
  auth_req = google.auth.transport.requests.Request()
  creds.refresh(auth_req)
  access_token = creds.token

  headers = {
    "Content-Type" : "application/json",
    "Authorization" : "Bearer " + access_token
  }

  if http_verb == "GET":
    response = requests.get(url, headers=headers)
  elif http_verb == "POST":
    response = requests.post(url, json=request_body, headers=headers)
  elif http_verb == "PUT":
    response = requests.put(url, json=request_body, headers=headers)
  elif http_verb == "PATCH":
    response = requests.patch(url, json=request_body, headers=headers)
  elif http_verb == "DELETE":
    response = requests.delete(url, headers=headers)
  else:
    raise RuntimeError(f"Unknown HTTP verb: {http_verb}")

  if response.status_code == 200:
    return json.loads(response.content)
  else:
    error = f"Error restAPIHelper -> Status: '{response.status_code}' Text: '{response.text}'"
    raise RuntimeError(error)
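`tenacity` is imported in the Initialize cell but not used by `restAPIHelper`. The sketch below shows the same idea with only the standard library: retry a call with capped exponential backoff and jitter when the error looks transient. `call_with_retries` and `is_transient_rest_error` are hypothetical helpers; treating HTTP 429 and 5xx responses as retriable is an assumption, and the check relies on the `Status: '<code>'` text that `restAPIHelper` puts in its error message.

```python
import random
import time


def call_with_retries(fn, is_retriable, max_attempts=5, base_delay=1.0, max_delay=30.0):
    """Call fn(); on a retriable exception, back off exponentially (with jitter) and try again."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as e:
            if attempt == max_attempts or not is_retriable(e):
                raise
            # Exponential backoff capped at max_delay, with "full jitter"
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            time.sleep(random.uniform(0, delay))


def is_transient_rest_error(e):
    """Assumed policy: HTTP 429 and common 5xx codes reported by restAPIHelper are transient."""
    text = str(e)
    return any(f"Status: '{code}'" in text for code in ("429", "500", "502", "503", "504"))
```

For example, `call_with_retries(lambda: restAPIHelper(url, "GET", None), is_transient_rest_error)` retries a GET on throttling or server errors but fails immediately on a 404.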

#### RunQuery
Runs a BigQuery query.

In [None]:
def RunQuery(sql):
  """Runs a BigQuery query. SELECT/WITH queries return a DataFrame; other statements return True on success."""
  import time
  from google.cloud import bigquery
  client = bigquery.Client()

  if sql.lstrip().upper().startswith(("SELECT", "WITH")):
    df_result = client.query(sql).to_dataframe()
    return df_result
  else:
    job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)
    query_job = client.query(sql, job_config=job_config)

    # Check on the progress by getting the job's updated state.
    query_job = client.get_job(query_job.job_id, location=query_job.location)
    print(f"Job {query_job.job_id} is currently in state {query_job.state} with error result of {query_job.error_result}")

    while query_job.state != "DONE":
      time.sleep(2)
      query_job = client.get_job(query_job.job_id, location=query_job.location)
      print(f"Job {query_job.job_id} is currently in state {query_job.state} with error result of {query_job.error_result}")

    if query_job.error_result is None:
      return True
    else:
      raise Exception(query_job.error_result)

### <font color='#4285f4'>Data Insights Scan - Helper Methods</font>

#### existsDataInsightScan
- Tests to see if a Data Insights Scan exists
- Returns True/False

In [None]:
def existsDataInsightScan(project_id, dataplex_region, data_insights_scan_name):
  """Test to see if a scan exists."""

  # Gather existing data scans
  # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/list

  url = f"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans"

  # Gather existing data scans (only the first page of results is checked)
  json_result = restAPIHelper(url, "GET", None)
  print(f"existsDataInsightScan (GET) json_result: {json_result}")

  # Test to see if the data scan exists; if so, return True
  if "dataScans" in json_result:
    for item in json_result["dataScans"]:
      print(f"Scan name: {item['name']}")
      if item["name"] == f"projects/{project_id}/locations/{dataplex_region}/dataScans/{data_insights_scan_name}":
        print(f"Data Insight Scan {data_insights_scan_name} already exists")
        return True

  return False
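`existsDataInsightScan` only inspects the first page returned by `dataScans.list`. When more scans exist than fit in one page, the response includes a `nextPageToken`, which is passed back as the `pageToken` query parameter to fetch the next page. A minimal pagination sketch, with the HTTP call hidden behind a hypothetical `fetch_page(page_token)` callable so the logic can be checked without network access:

```python
from typing import Callable, Iterator, Optional


def iter_all_pages(fetch_page: Callable[[Optional[str]], dict], item_key: str = "dataScans") -> Iterator[dict]:
    """Yield every item across all pages, following nextPageToken until it is absent."""
    page_token = None
    while True:
        page = fetch_page(page_token)      # page_token is None for the first request
        yield from page.get(item_key, [])  # an empty page may omit the key entirely
        page_token = page.get("nextPageToken")
        if not page_token:
            return
```

In the notebook, `fetch_page` could call `restAPIHelper` on the list URL, appending `?pageToken=` plus the URL-encoded token (for example via `urllib.parse.quote`) whenever a token is present.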

#### createDataInsightScan
- Creates an insights scan, but does not run it
- If the scan already exists, does nothing

In [None]:
def createDataInsightScan(project_id, dataplex_region, data_insight_scan_name, data_insight_display_name, bigquery_dataset_name, bigquery_table_name):
  """Creates the data insights scan."""

  if not existsDataInsightScan(project_id, dataplex_region, data_insight_scan_name):
    # Create a new scan
    # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/create
    print("Creating Data Insight Scan")

    url = f"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans?dataScanId={data_insight_scan_name}"

    request_body = {
        "displayName": data_insight_display_name,
        "type": "DATA_DOCUMENTATION",
        "dataDocumentationSpec": {},
        "data":{
            "resource": f"//bigquery.googleapis.com/projects/{project_id}/datasets/{bigquery_dataset_name}/tables/{bigquery_table_name}"
            }
        }

    json_result = restAPIHelper(url, "POST", request_body)

    name = json_result["metadata"]["target"]
    print(f"Data Insight Scan created: {name}")
  else:
    print(f"Data Insight Scan exists: projects/{project_id}/locations/{dataplex_region}/dataScans/{data_insight_scan_name}")

#### startDataInsightScan
- Starts a data insight scan (async)
- Returns the "job name"

In [None]:
def startDataInsightScan(project_id, dataplex_region, data_insight_scan_name):
  """Starts the scan"""

  # Run the existing scan on demand
  # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/run
  print("Running Data Insight Scan")

  url = f"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans/{data_insight_scan_name}:run"

  request_body = { }

  json_result = restAPIHelper(url, "POST", request_body)
  job_name = json_result["job"]["name"]
  job_state = json_result["job"]["state"]
  print(f"Data Insight Scan run started: {job_name} - State: {job_state}")

  return job_name


#### getStateDataInsightScan
- Gets the state of a scan (to see if it is done)
- Returns the "state"

In [None]:
def getStateDataInsightScan(project_id, dataplex_region, data_insight_scan_job_name):
  """Returns the current state of a data insight scan job (for example PENDING, RUNNING, or SUCCEEDED)."""

  # The job name is the full resource name returned by startDataInsightScan
  url = f"https://dataplex.googleapis.com/v1/{data_insight_scan_job_name}"
  json_result = restAPIHelper(url, "GET", None)
  return json_result["state"]
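When you only need to wait on a single job (for example, when re-running one scan), the state returned by `getStateDataInsightScan` can be polled until it reaches a terminal state. This sketch treats `SUCCEEDED`, `FAILED`, and `CANCELLED` as terminal, matching the outcomes the notebook handles; `wait_for_job` and its timeout are hypothetical additions.

```python
import time
from typing import Callable

# Terminal DataScanJob states; anything else is treated as still in progress
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_job(get_state: Callable[[], str], poll_seconds: float = 10.0, timeout_seconds: float = 3600.0) -> str:
    """Poll get_state() until the job reaches a terminal state or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job still in state {state} after {timeout_seconds} seconds")
        time.sleep(poll_seconds)
```

Usage: `wait_for_job(lambda: getStateDataInsightScan(project_id, dataplex_region, job_name))`, where `job_name` is the value returned by `startDataInsightScan`.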


#### updateBigQueryTableDataplexLabels
- Patches the BigQuery table's labels to associate the Dataplex scan with the table, so the scan appears in the BigQuery Console UI
- Returns nothing

In [None]:
def updateBigQueryTableDataplexLabels(project_id, dataplex_region, dataplex_asset_type, dataplex_asset_scan_name, bigquery_dataset_name, bigquery_table_name):
  """Sets the labels on the BigQuery table so users can see the scans in the Console."""

  # Patch the BigQuery table labels
  # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/patch
  print("Patching BigQuery Dataplex Labels")

  url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project_id}/datasets/{bigquery_dataset_name}/tables/{bigquery_table_name}"

  request_body = {}
  if dataplex_asset_type == "DATA-PROFILE-SCAN":
    request_body = {
        "labels" : {
            "dataplex-dp-published-project"  : project_id,
            "dataplex-dp-published-location" : dataplex_region,
            "dataplex-dp-published-scan"     : dataplex_asset_scan_name,
            }
        }
  elif dataplex_asset_type == "DATA-INSIGHTS-SCAN":
    request_body = {
        "labels" : {
            "dataplex-data-documentation-published-project"  : project_id,
            "dataplex-data-documentation-published-location" : dataplex_region,
            "dataplex-data-documentation-published-scan"     : dataplex_asset_scan_name,
            }
        }
  else:
    raise Exception(f"Unknown dataplex_asset_type of {dataplex_asset_type}")

  json_result = restAPIHelper(url, "PATCH", request_body)
  print(json_result)

### <font color='#4285f4'>Run Data Insight Scan - Algorithm</font>

In [None]:
# Get all the tables we want to scan for each dataset
scans_to_perform = []
dataset_list = ["${bigquery_governed_data_raw_dataset}","${bigquery_governed_data_enriched_dataset}","${bigquery_governed_data_curated_dataset}"]

# Build one SELECT per dataset and combine them with UNION ALL
sql = " UNION ALL ".join(
  f"SELECT table_schema, table_name, table_type FROM `{dataset_name}.INFORMATION_SCHEMA.TABLES`"
  for dataset_name in dataset_list
)

result_df = RunQuery(sql)

# data_insight_scan_name is used as the data_scan_id, which must contain only lowercase letters, numbers, and/or hyphens
for index, row in result_df.iterrows():
  item = {
      "project_id": project_id,
      "dataplex_region": dataplex_region,
      "data_insight_scan_name": f"{row['table_schema']}-{row['table_name']}-insight-scan".lower().replace("_","-"),
      "data_insight_display_name": f"{row['table_name']} scan",
      "bigquery_dataset_name": row['table_schema'],
      "bigquery_table_name": row['table_name'],

      # Used by below loop for processing
      "data_insight_scan_state": "",
      "data_insight_scan_job_name": ""
  }
  scans_to_perform.append(item)

  print(f"item: {item}")
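The scan ID above is built by lowercasing and replacing underscores with hyphens, but long dataset and table names can still produce an invalid ID. The sketch below assumes the additional `dataScanId` rules from the Dataplex `dataScans.create` reference (start with a letter, end with a letter or number, at most 63 characters) and appends a short hash when truncating so that truncated IDs are unlikely to collide. `make_scan_id` is a hypothetical helper, not part of the notebook.

```python
import hashlib
import re


def make_scan_id(dataset: str, table: str, suffix: str = "insight-scan", max_len: int = 63) -> str:
    """Build a dataScanId from dataset/table names using only lowercase letters, digits, and hyphens."""
    raw = f"{dataset}-{table}-{suffix}".lower()
    scan_id = re.sub(r"[^a-z0-9-]", "-", raw)  # underscores and any other characters become hyphens
    if not scan_id[0].isalpha():
        scan_id = "t-" + scan_id               # assumed rule: IDs must start with a letter
    if len(scan_id) > max_len:
        # Truncate, then append 8 hex chars of a hash of the full name to keep IDs distinct
        digest = hashlib.sha1(raw.encode("utf-8")).hexdigest()[:8]
        scan_id = scan_id[: max_len - len(digest) - 1].rstrip("-") + "-" + digest
    return scan_id
```

For short names this produces the same value as the notebook's expression, for example `governed-data-raw-customer-insight-scan` for dataset `governed_data_raw` and table `customer`.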

In [None]:
# Run the scans (up to a certain concurrency level)
numberOfScansToRunConcurrently = 5

while True:
  # Count the number of scans that are still in progress
  concurrentScanCount = 0
  for item in scans_to_perform:
    print(f"Scan state: {item['bigquery_dataset_name']}.{item['bigquery_table_name']} -> {item['data_insight_scan_state']}")
    if item["data_insight_scan_state"] in ("PENDING", "STATE_UNSPECIFIED", "RUNNING", "CANCELING"):
      concurrentScanCount += 1

  print(f"concurrentScanCount: {concurrentScanCount}")

  # Start new scans under our concurrency count
  scansStarted = -1
  while concurrentScanCount < numberOfScansToRunConcurrently and scansStarted != 0:
    # Start new scans up to the concurrency limit
    scansStarted = 0
    for item in scans_to_perform:
      if concurrentScanCount < numberOfScansToRunConcurrently and \
         item["data_insight_scan_state"] == "":
        # start a new scan
        createDataInsightScan(item["project_id"], item["dataplex_region"],
                              item["data_insight_scan_name"], item["data_insight_display_name"],
                              item["bigquery_dataset_name"], item["bigquery_table_name"])
        started = False
        item["data_insight_scan_job_name"] = ""
        while not started:
          try:
            item["data_insight_scan_job_name"] = startDataInsightScan(item["project_id"], item["dataplex_region"], item["data_insight_scan_name"])
            item["data_insight_scan_state"] = "STATE_UNSPECIFIED"
            started = True
            scansStarted += 1
            concurrentScanCount += 1
          except Exception as e:
            # Scan creation is a long-running operation, so a newly created scan may not be runnable yet
            scan_full_name = f'projects/{item["project_id"]}/locations/{item["dataplex_region"]}/dataScans/{item["data_insight_scan_name"]}'
            message = f"Provided DataScan '{scan_full_name}' does not exist."
            if message in str(e):
              print(f"Data scan {scan_full_name} is not available to start yet. Waiting...")
              time.sleep(5)
            else:
              raise  # Re-raise any other error


  # Refresh the state of the scans that are still in progress
  for item in scans_to_perform:
    if item["data_insight_scan_state"] in ("PENDING", "STATE_UNSPECIFIED", "RUNNING", "CANCELING"):
      item["data_insight_scan_state"] = getStateDataInsightScan(item["project_id"], item["dataplex_region"], item["data_insight_scan_job_name"])

  if concurrentScanCount == 0:
    # Nothing is in progress, so exit
    break
  else:
    # Wait for processing
    print(f"concurrentScanCount: {concurrentScanCount}")
    time.sleep(10)

# Update the BigQuery labels so our scans show in the Console UI
for item in scans_to_perform:
  if item["data_insight_scan_state"] == "SUCCEEDED":
    updateBigQueryTableDataplexLabels(item["project_id"], item["dataplex_region"],
                                      "DATA-INSIGHTS-SCAN", item["data_insight_scan_name"],
                                      item["bigquery_dataset_name"], item["bigquery_table_name"])
    print(f"Scan for table {item['bigquery_dataset_name']}.{item['bigquery_table_name']} is now associated with the table in the BigQuery Console UI.")
  else:
    # Skip CANCELLED or FAILED scans
    print(f"Skipping table {item['bigquery_dataset_name']}.{item['bigquery_table_name']} (scan state: {item['data_insight_scan_state']})")
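
After the labels are updated, it can help to see at a glance which scans did not succeed. A small sketch over the `scans_to_perform` dictionaries built earlier; the `summarize_scan_states` name is hypothetical, and it relies only on the keys the notebook already sets.

```python
from collections import Counter


def summarize_scan_states(scans):
    """Count scans per final state and list the tables whose scans did not succeed."""
    counts = Counter(s["data_insight_scan_state"] for s in scans)
    not_succeeded = [
        f"{s['bigquery_dataset_name']}.{s['bigquery_table_name']} ({s['data_insight_scan_state']})"
        for s in scans
        if s["data_insight_scan_state"] != "SUCCEEDED"
    ]
    return counts, not_succeeded
```

Usage: `counts, not_succeeded = summarize_scan_states(scans_to_perform)`, then print both to see which tables need a re-run.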


### <font color='#4285f4'>Clean Up</font>

In [None]:
# Placeholder

### <font color='#4285f4'>Reference Links</font>


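- [Dataplex REST API: dataScans.list](https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/list)
- [Dataplex REST API: dataScans.create](https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/create)
- [Dataplex REST API: dataScans.run](https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/run)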