### <font color='#4285f4'>Overview</font>

**Overview**: Creates a data discovery scan of a storage account that discovers 4 tables (CSV files) and creates BigLake tables over the CSV files.

**Process Flow**:
1.  **Copy CSV files** to a separate folder within the "scan" GCS bucket.

2.  **Create a data discovery scan** for the bucket.

3.  **Specify the storage account** for the scan.

4.  **Specify BigLake table creation** and provide the necessary BigLake connection details.

5.  **Pause for a few seconds** while the scan registers (avoid starting the scan too quickly).

6.  **Start the data discovery scan.**

7.  **Wait for the scan to complete.**

8.  **Review the newly created BigQuery dataset.**

9.  **(Optional) Delete the scan.**

Notes:
* This notebook runs the scans manually. Typically, you would configure a scan to run on a schedule so that you do not have to trigger and monitor it yourself.

Cost:
* Approximate cost: Less than a dollar

Author:
* Adam Paternostro

In [None]:
# Architecture Diagram
# Displays the hosted architecture diagram image inline as this cell's output.
# NOTE: the Initialize cell later rebinds the name `Image` to PIL's Image module.
from IPython.display import Image
Image(url='https://storage.googleapis.com/data-analytics-golden-demo/colab-diagrams/BigQuery-Data-Governance-Data-Discovery.png', width=1200)

### <font color='#4285f4'>Video Walkthrough</font>

[Video](https://storage.googleapis.com/data-analytics-golden-demo/colab-videos/Data-Discovery-Scan.mp4)


In [None]:
# Embed the walkthrough video in the cell output using an HTML5 <video> element.
from IPython.display import HTML

HTML("""
<video width="800" height="600" controls>
  <source src="https://storage.googleapis.com/data-analytics-golden-demo/colab-videos/Data-Discovery-Scan.mp4" type="video/mp4">
  Your browser does not support the video tag.
</video>
""")

### <font color='#4285f4'>License</font>

```
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```

### <font color='#4285f4'>Pip installs</font>

In [None]:
# PIP Installs (if necessary)
import sys

# !{sys.executable} -m pip install REPLACE-ME

### <font color='#4285f4'>Initialize</font>

In [None]:
from PIL import Image
from IPython.display import HTML
import IPython.display
import google.auth
import requests
import json
import uuid
import base64
import os
import cv2
import random
import time
import datetime
import base64
import random

import logging
from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception

In [None]:
# Set these (run this cell to verify the output)

bigquery_location = "${bigquery_location}"
region = "${dataplex_region}"
location = "${dataplex_region}"
scan_bucket_name = "${governed_data_scan_bucket}"

# Get the current date and time
now = datetime.datetime.now()

# Format the date and time as desired
formatted_date = now.strftime("%Y-%m-%d-%H-%M")

# Get some values using gcloud
project_id = !(gcloud config get-value project)
user = !(gcloud auth list --filter=status:ACTIVE --format="value(account)")

if len(project_id) != 1:
  raise RuntimeError(f"project_id is not set: {project_id}")
project_id = project_id[0]

if len(user) != 1:
  raise RuntimeError(f"user is not set: {user}")
user = user[0]

print(f"project_id = {project_id}")
print(f"user = {user}")

### <font color='#4285f4'>Helper Methods</font>

#### restAPIHelper
Calls the Google Cloud REST API using the current user's credentials.

In [None]:
def restAPIHelper(url: str, http_verb: str, request_body: "dict | None") -> dict:
  """Calls a Google Cloud REST API using the current user's credentials.

  Args:
    url: Full URL of the REST endpoint.
    http_verb: One of "GET", "POST", "PUT", "PATCH" or "DELETE".
    request_body: JSON-serializable payload sent with POST/PUT/PATCH.
      Ignored for GET and DELETE (pass None).

  Returns:
    The response body parsed from JSON.

  Raises:
    RuntimeError: If http_verb is not supported, or the response status
      code is not 200.
  """

  import json

  import google.auth
  # The transport submodule is not loaded by "import google.auth"; import it
  # explicitly so google.auth.transport.requests.Request is always available.
  import google.auth.transport.requests
  import requests

  # Verbs that carry a JSON body vs. those that do not
  verbs_with_body = {
    "POST": requests.post,
    "PUT": requests.put,
    "PATCH": requests.patch,
  }
  verbs_without_body = {
    "GET": requests.get,
    "DELETE": requests.delete,
  }

  # Reject an unsupported verb before spending a round trip on a token refresh
  if http_verb not in verbs_with_body and http_verb not in verbs_without_body:
    raise RuntimeError(f"Unknown HTTP verb: {http_verb}")

  # Get an access token based upon the current user
  creds, _ = google.auth.default()
  creds.refresh(google.auth.transport.requests.Request())

  headers = {
    "Content-Type" : "application/json",
    "Authorization" : "Bearer " + creds.token
  }

  if http_verb in verbs_with_body:
    response = verbs_with_body[http_verb](url, json=request_body, headers=headers)
  else:
    response = verbs_without_body[http_verb](url, headers=headers)

  if response.status_code == 200:
    return json.loads(response.content)

  error = f"Error restAPIHelper -> ' Status: '{response.status_code}' Text: '{response.text}'"
  raise RuntimeError(error)

#### Helper Functions

In [None]:
def RunQuery(sql):
  """Runs a BigQuery SQL statement.

  Statements that begin with SELECT or WITH (ignoring leading whitespace and
  letter case) are run as queries and their rows are returned as a DataFrame.
  Any other statement is submitted as an interactive-priority job and polled
  every 2 seconds until it reaches the DONE state.

  Args:
    sql: The SQL text to execute.

  Returns:
    A DataFrame for SELECT/WITH statements; True for any other statement
    that finishes without an error result.

  Raises:
    Exception: If a non-query job finishes with an error result. The job's
      error_result is passed as the exception argument.
  """
  import time
  from google.cloud import bigquery
  client = bigquery.Client()

  # Normalize before classifying so that indented, triple-quoted or lowercase
  # SQL ("  select ...") is still recognized as a query that returns rows.
  if sql.lstrip().upper().startswith(("SELECT", "WITH")):
    return client.query(sql).to_dataframe()

  job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)
  query_job = client.query(sql, job_config=job_config)

  # Check on the progress by re-fetching the job's state until it is DONE.
  while True:
    query_job = client.get_job(query_job.job_id, location=query_job.location)
    print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))
    if query_job.state == "DONE":
      break
    time.sleep(2)

  if query_job.error_result is None:
    return True
  raise Exception(query_job.error_result)

### <font color='#4285f4'>Copy Data to Storage for Scan</font>

In [None]:
# Copy our data (CSV files).  We want the files in our local bucket with local location.
# For CSV files (or any other file type), you want the folder to contain all the same schema (or table) of data
# You would not want to put people.csv in the same folders as cars.csv

source_file = "gs://data-analytics-golden-demo/cymbal-consumer-finance/ccf_csv_tables_customers.csv"
dest_file = f"gs://{scan_bucket_name}/cymbal-consumer-finance/customers/customers.csv"
print(f"Copying data from {source_file} to {dest_file}")
!gsutil cp {source_file} {dest_file}
print("Customer is complete")

source_file = "gs://data-analytics-golden-demo/cymbal-consumer-finance/ccf_csv_tables_loan_applications.csv"
dest_file = f"gs://{scan_bucket_name}/cymbal-consumer-finance/loan_applications/loan_applications.csv"
print(f"Copying data from {source_file} to {dest_file}")
!gsutil cp {source_file} {dest_file}
print("Customer is complete")

source_file = "gs://data-analytics-golden-demo/cymbal-consumer-finance/ccf_csv_tables_loan_repayments.csv"
dest_file = f"gs://{scan_bucket_name}/cymbal-consumer-finance/loan_repayments/loan_repayments.csv"
print(f"Copying data from {source_file} to {dest_file}")
!gsutil cp {source_file} {dest_file}
print("Customer is complete")

source_file = "gs://data-analytics-golden-demo/cymbal-consumer-finance/ccf_csv_tables_marketing_costs.csv"
dest_file = f"gs://{scan_bucket_name}/cymbal-consumer-finance/marketing_costs/marketing_costs.csv"
print(f"Copying data from {source_file} to {dest_file}")
!gsutil cp {source_file} {dest_file}
print("Customer is complete")

print(f"To view the files: https://console.cloud.google.com/storage/browser/{scan_bucket_name}")

### <font color='#4285f4'>Data Discovery Scan - Helper Methods</font>

#### existsDataDiscoveryScan
- Tests to see if a Data Discovery Scan exists
- Returns True/False

In [None]:
def existsDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_name):
  """Tests whether a data scan with the given id exists.

  Lists the data scans in the project/region and compares each scan's
  resource name to the expected one. Every page of the list response is
  checked, so a scan that is not on the first page is still found.

  Args:
    project_id: Google Cloud project id.
    dataplex_region: Region used in the Dataplex resource path.
    data_discovery_scan_name: The data scan id to look for.

  Returns:
    True if the scan exists, otherwise False.
  """
  from urllib.parse import quote

  # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/list
  list_url = f"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans"
  target_name = f"projects/{project_id}/locations/{dataplex_region}/dataScans/{data_discovery_scan_name}"

  page_token = None
  while True:
    # The first request has no token; later requests continue from the previous page
    url = list_url if page_token is None else f"{list_url}?pageToken={quote(page_token, safe='')}"

    json_result = restAPIHelper(url, "GET", None)
    print(f"existsDataDiscoveryScan (GET) json_result: {json_result}")

    # "dataScans" is absent from the response when there are no scans to report
    for item in json_result.get("dataScans", []):
      print(f"Scan names: {item['name']}")
      if item["name"] == target_name:
        print(f"Data Discovery Scan {data_discovery_scan_name} already exists")
        return True

    page_token = json_result.get("nextPageToken")
    if not page_token:
      return False

#### createDataDiscoveryScan
- Creates a discovery scan, but does not run it
- If the scan exists, then does nothing

In [None]:
def createDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_name, data_discovery_display_name, resource, biglake_connection_name):
  """Creates a DATA_DISCOVERY scan unless one with the same id already exists.

  The scan is only created here; it is not started.

  Args:
    project_id: Google Cloud project id.
    dataplex_region: Region used in the Dataplex resource path.
    data_discovery_scan_name: Id for the new data scan.
    data_discovery_display_name: Display name for the scan.
    resource: Resource name of the data to scan.
    biglake_connection_name: Connection used when publishing BigLake tables.
  """

  # Nothing to do when the scan is already there
  if existsDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_name):
    print(f"Data Discovery Scan exists: projects/{project_id}/locations/{dataplex_region}/dataScans/{data_discovery_scan_name}")
    return

  # Create a new scan
  # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/create
  print("Creating Data Discovery Scan")

  create_url = f"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans?dataScanId={data_discovery_scan_name}"

  # CSV files are comma delimited with a single header row
  storage_config = {
    "csvOptions": {
      "delimiter": ",",
      "headerRows": 1
    }
    # "includePatterns": includePatterns  # We are just doing one for the demo
  }

  # Publish the discovered tables as BigLake tables through the given connection
  publishing_config = {
    "connection": biglake_connection_name,
    "tableType": "BIGLAKE"
  }

  scan_definition = {
    "displayName": data_discovery_display_name,
    "type": "DATA_DISCOVERY",
    "data": {"resource": resource},
    "dataDiscoverySpec": {
      "storageConfig": storage_config,
      "bigqueryPublishingConfig": publishing_config
    }
  }

  create_response = restAPIHelper(create_url, "POST", scan_definition)

  created_target = create_response["metadata"]["target"]
  print(f"Data Discovery Scan created: {created_target}")

#### startDataDiscoveryScan
- Starts a data discovery scan (async)
- Returns the "job name"

In [None]:
def startDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_name):
  """Starts a run of an existing data scan and returns the job's name.

  Args:
    project_id: Google Cloud project id.
    dataplex_region: Region used in the Dataplex resource path.
    data_discovery_scan_name: Id of the data scan to run.

  Returns:
    The "name" of the job reported in the run response.
  """

  # Run an existing scan
  # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/run
  print("Running Data Discovery Scan")

  run_url = f"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans/{data_discovery_scan_name}:run"

  # The run request is sent with an empty JSON body
  run_response = restAPIHelper(run_url, "POST", {})

  job = run_response["job"]
  job_name = job["name"]
  job_state = job["state"]
  print(f"Document Data Scan Run created: {job_name} - State: {job_state}")

  return job_name


#### getStateDataDiscoveryScan
- Gets the state of a scan (to see if it is done)
- Returns the "state"

In [None]:
def getStateDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_job_name):
  """Returns the current "state" of a data scan job.

  Args:
    project_id: Not used by this function; the job name is placed in the URL as-is.
    dataplex_region: Not used by this function.
    data_discovery_scan_job_name: Job name appended directly after the API
      version in the URL (e.g. the value returned by startDataDiscoveryScan).

  Returns:
    The value of the "state" field in the job response.
  """

  # Fetch the job and report only its state
  job_url = f"https://dataplex.googleapis.com/v1/{data_discovery_scan_job_name}"
  job_details = restAPIHelper(job_url, "GET", None)
  return job_details["state"]

#### deleteDataDiscoveryScan
- Deletes the scan if it exists
- Returns nothing

In [None]:
def deleteDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_name):
  """Deletes a data scan if it exists; otherwise only prints a message.

  Args:
    project_id: Google Cloud project id.
    dataplex_region: Region used in the Dataplex resource path.
    data_discovery_scan_name: Id of the data scan to delete.
  """

  # Guard: nothing to delete
  if not existsDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_name):
    print(f"Scan {data_discovery_scan_name} does not exist to delete.")
    return

  # Deletes a scan (the response body is not needed)
  delete_url = f"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans/{data_discovery_scan_name}"
  restAPIHelper(delete_url, "DELETE", None)
  print(f"Scan {data_discovery_scan_name} deleted.")

### <font color='#4285f4'>Run Data Discovery Scan (for a bucket)</font>

- Creates a new scan
- Starts the scan (after a delay)
- Monitors the scan's progress

In [None]:
# Identity of the scan
data_discovery_scan_name = "data-discovery-scan-01"
data_discovery_display_name = "Data Discovery Scan (01)"

# We will use the dataplex region here (not US multi-region)
dataplex_region = location
biglake_connection_name = f"projects/{project_id}/locations/{region}/connections/biglake-connection-dataplex"

# The whole bucket is the scan target
resource = f"//storage.googleapis.com/projects/{project_id}/buckets/{scan_bucket_name}"
#includePatterns = [ f"gs://{scan_bucket_name}/table-1-etc.../*.csv" ]

# Trailing newline leaves a blank line after the message
print(f"Creating scan of bucket(s): {scan_bucket_name}\n")

createDataDiscoveryScan(
  project_id=project_id,
  dataplex_region=dataplex_region,
  data_discovery_scan_name=data_discovery_scan_name,
  data_discovery_display_name=data_discovery_display_name,
  resource=resource,
  biglake_connection_name=biglake_connection_name,
)


In [None]:
# It can take a few seconds for the scan to register
# Fixed 20-second pause so the scan is not started too quickly after it is created.
time.sleep(20)

In [None]:
# Start the scan
# The returned job name is kept so the monitoring cell can poll this specific run.
data_discovery_scan_job_name = startDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_name)
print(f"data_discovery_scan_job_name: {data_discovery_scan_job_name}")

In [None]:
# Monitor the scan
# Poll every 10 seconds while the job is in one of these non-final states.
in_progress_states = {"PENDING", "STATE_UNSPECIFIED", "RUNNING", "CANCELING"}

data_discovery_scan_state = getStateDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_job_name)
print(f"data_discovery_scan_state: {data_discovery_scan_state}")

while data_discovery_scan_state in in_progress_states:
  time.sleep(10)
  data_discovery_scan_state = getStateDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_job_name)
  print(f"data_discovery_scan_state: {data_discovery_scan_state}")

# Only report success when the job actually succeeded; a failed or cancelled
# job would not have produced the dataset.
if data_discovery_scan_state == "SUCCEEDED":
  print("Discovery Scan complete.  You should see a new BigQuery dataset.")
else:
  raise RuntimeError(f"Discovery Scan did not succeed. Final state: {data_discovery_scan_state}")

In [None]:
# Print a Cloud Console link, scoped to this project, where the discovery scans can be managed.
print(f"You can view/deletes scans here: https://console.cloud.google.com/bigquery/governance/catalog-management/cloud-storage-discovery?project={project_id}")

### <font color='#4285f4'>Clean Up</font>

In [None]:
# Delete the scan

user_input = input(f"Do you want to delete your scan {data_discovery_scan_name} (Y/n)?")
# Accept "Y", "y", "yes" (any case, surrounding whitespace ignored).
# An empty answer is deliberately NOT treated as yes, so nothing is deleted by accident.
if user_input.strip().lower() in ("y", "yes"):
  print("This will not delete the dataset created by the scan.")
  deleteDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_name)

### <font color='#4285f4'>Reference Links</font>


- [REPLACE-ME](https://REPLACE-ME)