### <font color='#4285f4'>Overview</font>

**Overview**: Demonstrate the ability to create data quality rules on tables within BigQuery. This demo will retrieve suggested data quality rules based on data profile scans. The rules will then be used to create a data quality scan for each table.

Also, custom rules will be applied to the “curated” order_detail and sales tables.


**Process Flow**:

1. Select all the tables in the raw, enriched and curated datasets.

2. Gather the data profile scan for each respective table.

3. Gather the recommended data quality rules for each table.

4. Create a data quality scan on the table using the suggested rules. 
    - Create custom rules for the curated order_detail and sales tables.

5. Patch the BigQuery table labels so the scan appears in the BigQuery user interface.

6. Apply the custom rules to the curated order_detail and sales tables to see how custom rules can be added to a scan.

Notes:
* This notebook runs the scans manually. In practice, you would typically configure each scan to run on a schedule rather than trigger it yourself.

Cost:
* Approximate cost: Less than a dollar

Authors:
* Sandeep Manocha, Adam Paternostro

In [None]:
# Architecture Diagram
from IPython.display import Image
Image(url='https://storage.googleapis.com/data-analytics-golden-demo/colab-diagrams/BigQuery-Data-Governance-Data-Quality.png', width=1200)

### <font color='#4285f4'>Video Walkthrough</font>

[Video](https://storage.googleapis.com/data-analytics-golden-demo/colab-videos/Data-Quality.mp4)


In [None]:
from IPython.display import HTML

HTML("""
<video width="800" height="600" controls>
  <source src="https://storage.googleapis.com/data-analytics-golden-demo/colab-videos/Data-Quality.mp4" type="video/mp4">
  Your browser does not support the video tag.
</video>
""")

### <font color='#4285f4'>License</font>

```
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```

### <font color='#4285f4'>Pip installs</font>

In [None]:
# NOTE: All calls in this notebook are done via REST APIs

# PIP Installs (if necessary)
import sys

# !{sys.executable} -m pip install REPLACE-ME

### <font color='#4285f4'>Initialize</font>

In [None]:
from PIL import Image
from IPython.display import HTML
import IPython.display
import google.auth
import requests
import json
import uuid
import base64
import os
import cv2
import random
import time
import datetime
import pandas as pd
import logging
from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception

In [None]:
# Set these (run this cell to verify the output)

bigquery_location = "${bigquery_location}"
dataplex_region = "${dataplex_region}"

# Get the current date and time
now = datetime.datetime.now()

# Format the date and time as desired
formatted_date = now.strftime("%Y-%m-%d-%H-%M")

# Get some values using gcloud
project_id = os.environ["GOOGLE_CLOUD_PROJECT"]
user = !(gcloud auth list --filter=status:ACTIVE --format="value(account)")

if len(user) != 1:
  raise RuntimeError(f"user is not set: {user}")
user = user[0]

print(f"project_id = {project_id}")
print(f"user = {user}")

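if 'altostrat.com' in user:
  notification_email = f"{user.split('@')[1].split('.')[0]}@google.com"
else:
  # Otherwise send notifications to the signed-in user (avoids an undefined variable below)
  notification_email = user

print(f"notification_email: {notification_email}")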
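
The email mapping above can be isolated into a small function so it is easy to check. This is a minimal sketch, not the notebook's own code: the function name and the sample addresses are hypothetical, and the fallback to the signed-in user for other domains is an assumption.

```python
def derive_notification_email(user: str) -> str:
    """Return the address that scan notifications are sent to (illustrative sketch)."""
    _, _, domain = user.partition("@")
    if "altostrat.com" in domain:
        # Same mapping as the cell above: the first label of the domain becomes the alias
        return f"{domain.split('.')[0]}@google.com"
    # Assumption: any other account simply receives its own notifications
    return user


# Hypothetical sample accounts
print(derive_notification_email("admin@jdoe.altostrat.com"))
print(derive_notification_email("someone@example.com"))
```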

### <font color='#4285f4'>Helper Methods</font>

#### restAPIHelper
Calls the Google Cloud REST API using the current user's credentials.

In [None]:
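def restAPIHelper(url: str, http_verb: str, request_body: dict) -> dict:
  """Calls the Google Cloud REST API, passing in the current user's credentials.
  request_body is a dict (or None) and the parsed JSON response is returned."""

  import requests
  import google.auth
  import google.auth.transport.requests  # the submodule must be imported explicitly
  import json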

  # Get an access token based upon the current user
  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request()
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
    "Content-Type" : "application/json",
    "Authorization" : "Bearer " + access_token
  }

  # Required by some API calls
  if project_id is not None:
    headers["x-goog-user-project"] = project_id


  if http_verb == "GET":
    response = requests.get(url, headers=headers)
  elif http_verb == "POST":
    response = requests.post(url, json=request_body, headers=headers)
  elif http_verb == "PUT":
    response = requests.put(url, json=request_body, headers=headers)
  elif http_verb == "PATCH":
    response = requests.patch(url, json=request_body, headers=headers)
  elif http_verb == "DELETE":
    response = requests.delete(url, headers=headers)
  else:
    raise RuntimeError(f"Unknown HTTP verb: {http_verb}")

  if response.status_code == 200:
    return json.loads(response.content)
  else:
    error = f"Error restAPIHelper -> Status: '{response.status_code}' Text: '{response.text}'"
    raise RuntimeError(error)
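
Because `restAPIHelper` raises a `RuntimeError` for any non-200 response, a caller may want to retry transient failures. The notebook imports `tenacity` for this purpose but does not use it; the following is a standard-library sketch of exponential backoff instead. The function name `call_with_backoff` and its parameters are hypothetical, and a real version would likely retry only on selected status codes rather than on every `RuntimeError`.

```python
import time


def call_with_backoff(func, max_attempts=5, base_delay=1.0, max_delay=30.0, sleep=time.sleep):
    """Call func() until it succeeds or max_attempts is reached (illustrative sketch)."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except RuntimeError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Delay doubles on each attempt: 1s, 2s, 4s, ... capped at max_delay
            sleep(min(max_delay, base_delay * 2 ** (attempt - 1)))


# Possible usage with the helper above (not executed here):
# result = call_with_backoff(lambda: restAPIHelper(url, "GET", None))
```

The `sleep` parameter is injectable so the retry schedule can be tested without waiting.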

#### RunQuery
Runs a BigQuery query.

In [None]:
def RunQuery(sql):
  import time
  from google.cloud import bigquery
  client = bigquery.Client()

  if (sql.startswith("SELECT") or sql.startswith("WITH")):
      df_result = client.query(sql).to_dataframe()
      return df_result
  else:
    job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)
    query_job = client.query(sql, job_config=job_config)

    # Check on the progress by getting the job's updated state.
    query_job = client.get_job(
        query_job.job_id, location=query_job.location
    )
    print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    while query_job.state != "DONE":
      time.sleep(2)
      query_job = client.get_job(
          query_job.job_id, location=query_job.location
          )
      print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    if query_job.error_result is None:
      return True
    else:
      raise Exception(query_job.error_result)

### <font color='#4285f4'>Data Quality Scan - Helper Methods</font>

#### existsDataQualityScan
- Tests to see if a Data Quality Scan exists
- Returns True/False

In [None]:
def existsDataQualityScan(project_id, dataplex_region, data_quality_scan_name):
  """Tests whether the data quality scan exists."""

  # Gather existing data scans
  # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/list

  url = f"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans"

  # Gather existing data scans
  json_result = restAPIHelper(url, "GET", None)
  #print(f"existsDataQualityScan (GET) json_result: {json_result}")

  # Test to see if data scan exists, if so return
  if "dataScans" in json_result:
    for item in json_result["dataScans"]:
      # print(f"Scan names: {item['name']}")
      if item["name"] == f"projects/{project_id}/locations/{dataplex_region}/dataScans/{data_quality_scan_name}":
        print(f"Data Quality Scan {data_quality_scan_name} already exists")
        return True

  return False
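
The check above reads a single response from the `dataScans` list call. Google Cloud list APIs generally return results in pages with a `nextPageToken`, so a project with many scans may need several requests. The following sketch shows one way to walk all pages; `iterate_data_scans` and the `fetch_page` callback are hypothetical names, and the callback is assumed to return the parsed JSON for one page.

```python
def iterate_data_scans(fetch_page):
    """Yield every scan across all pages (illustrative sketch).

    fetch_page(page_token) is assumed to return one parsed list response,
    where page_token is None for the first page.
    """
    page_token = None
    while True:
        page = fetch_page(page_token)
        yield from page.get("dataScans", [])
        page_token = page.get("nextPageToken")
        if not page_token:
            break  # no further pages


# Possible wiring with the helper above (not executed here):
# def fetch_page(token):
#     suffix = f"?pageToken={token}" if token else ""
#     return restAPIHelper(url + suffix, "GET", None)
```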

#### createDataQualityScan
- Creates a scan, but does not run it
- If the scan exists, then does nothing

In [None]:
def createDataQualityScan(project_id, dataplex_region, data_quality_scan_name, data_quality_display_name, data_quality_description, bigquery_dataset_name, bigquery_table_name, data_quality_spec):
  """Creates the data quality scan."""
  scan_name_fqdn = None
  if existsDataQualityScan(project_id, dataplex_region, data_quality_scan_name) == False:
    # Create a new scan
    # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/create
    print("Creating Data Quality Scan")

    url = f"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans?dataScanId={data_quality_scan_name}"

    request_body = {
        "dataQualitySpec": data_quality_spec,
        "data": { "resource": f"//bigquery.googleapis.com/projects/{project_id}/datasets/{bigquery_dataset_name}/tables/{bigquery_table_name}" },
        "description": data_quality_description,
        "displayName": data_quality_display_name
        }

    json_result = restAPIHelper(url, "POST", request_body)

    scan_name_fqdn = json_result["metadata"]["target"]
    print(f"Data Quality Scan created: {scan_name_fqdn}")
  else:
    print(f"Data Quality Scan exists: projects/{project_id}/locations/{dataplex_region}/dataScans/{data_quality_scan_name}")

  return scan_name_fqdn

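#### updateDataQualityScan
- Updates the rules (dataQualitySpec) of an existing scan
- If the scan does not exist, then does nothing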

In [None]:
def updateDataQualityScan(project_id, dataplex_region, data_quality_scan_name, data_quality_display_name, data_quality_description, bigquery_dataset_name, bigquery_table_name, data_quality_spec):
  """Updates the dataQualitySpec of an existing data quality scan."""
  scan_name_fqdn = None
  if existsDataQualityScan(project_id, dataplex_region, data_quality_scan_name) == True:
    # Update the existing scan (only the fields named in updateMask are changed)
    # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/patch
    print("Updating Data Quality Scan")

    url = f"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans/{data_quality_scan_name}?updateMask=dataQualitySpec"

    print(url)

    request_body = {
        "dataQualitySpec": data_quality_spec,
        "data": { "resource": f"//bigquery.googleapis.com/projects/{project_id}/datasets/{bigquery_dataset_name}/tables/{bigquery_table_name}" },
        "description": data_quality_description,
        "displayName": data_quality_display_name
        }
    print("Submitting REST PATCH")
    print(request_body)
    json_result = restAPIHelper(url, "PATCH", request_body)
    print("Completed REST PATCH")

    scan_name_fqdn = json_result["metadata"]["target"]
    print(f"Data Quality Scan updated: {scan_name_fqdn}")
  else:
    print(f"Data Quality Scan does not exist: projects/{project_id}/locations/{dataplex_region}/dataScans/{data_quality_scan_name}")

  return scan_name_fqdn

#### startDataQualityScan
- Starts a data quality scan (async)
- Returns the "job name"

In [None]:
def startDataQualityScan(project_id, dataplex_region, data_quality_scan_name):
  """Starts the data quality scan job (async) and returns the job name"""

  # Run the scan
  # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/run
  print("Running Data Quality Scan")

  url = f"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans/{data_quality_scan_name}:run"


  request_body = { }

  json_result = restAPIHelper(url, "POST", request_body)
  job_name = json_result["job"]["name"]
  job_state = json_result["job"]["state"]
  print(f"Data Quality Scan Run created: {job_name} - State: {job_state}")

  return job_name


#### getStateDataQualityScan
- Gets the state of a scan (to see if it is done)
- Returns the "state"

In [None]:
def getStateDataQualityScan(project_id, dataplex_region, data_quality_scan_job_name):
  """Gets the current state of a data quality scan job"""

  # Gets the "state" of a scan job
  # Active (not finished) states: STATE_UNSPECIFIED, PENDING, RUNNING, CANCELING
  url = f"https://dataplex.googleapis.com/v1/{data_quality_scan_job_name}"
  json_result = restAPIHelper(url, "GET", None)
  return json_result["state"]
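
Since the helper returns a single state reading, a caller has to poll until the job leaves its active states. A minimal sketch of such a loop follows, using the active states handled later in this notebook; `wait_for_scan`, its parameters, and the polling limits are hypothetical choices.

```python
import time

# States treated as "still processing" by the run loop later in this notebook
ACTIVE_STATES = {"STATE_UNSPECIFIED", "PENDING", "RUNNING", "CANCELING"}


def wait_for_scan(get_state, poll_seconds=10.0, max_polls=60, sleep=time.sleep):
    """Poll get_state() until it returns a state outside ACTIVE_STATES (illustrative sketch)."""
    for _ in range(max_polls):
        state = get_state()
        if state not in ACTIVE_STATES:
            return state  # e.g. SUCCEEDED, FAILED or CANCELLED
        sleep(poll_seconds)
    raise TimeoutError(f"Scan still active after {max_polls} polls")


# Possible usage with the helper above (not executed here):
# final_state = wait_for_scan(
#     lambda: getStateDataQualityScan(project_id, dataplex_region, job_name))
```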


#### getDataQualityScanConfiguration
- Gets the scan configuration

In [None]:
def getDataQualityScanConfiguration(project_id, dataplex_region, data_quality_scan_fqdn):
  """Get the Data Quality Configuration as JSON"""

  # Gets the full scan configuration (view=FULL includes the rules)
  url = f"https://dataplex.googleapis.com/v1/{data_quality_scan_fqdn}?view=FULL"
  json_result = restAPIHelper(url, "GET", None)
  return json_result

#### listScans
- Lists all the scans (Data Profile & Data Quality)

In [None]:
def listScans(project_id, dataplex_region):
  """Lists the data scans (all types) in the project and region"""

  # Lists the data scans
  url = f"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans"
  json_result = restAPIHelper(url, "GET", None)
  return json_result

In [None]:
def getScan(project_id, dataplex_region, scan_name):
  """Gets a single data scan by name"""

  # Gets the scan resource
  url = f"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans/{scan_name}"
  json_result = restAPIHelper(url, "GET", None)
  return json_result

#### flatten_dict_values
- Flattens the dictionaries in a column into separate columns of a Pandas DataFrame

In [None]:
def flatten_dict_values(df, column_name):
    """
    Flattens dictionary values in a specified DataFrame column into individual columns.

    Args:
        df: The Pandas DataFrame.
        column_name: The name of the column containing dictionaries.

    Returns:
        A new Pandas DataFrame with the flattened columns, or the original DataFrame if
        the specified column does not exist or doesn't contain dictionaries.
    """
    if column_name not in df.columns:
        print(f"Column '{column_name}' not found in DataFrame.")
        return df  # Return original DataFrame

    # Check if column contains dictionaries
    if not all(isinstance(x, dict) if pd.notna(x) else True for x in df[column_name]): #check if all values in column are dictionaries
        print(f"Column '{column_name}' does not contain dictionaries.")
        return df  # Return original DataFrame

    # Flatten the dictionaries using pd.json_normalize (nested keys become dotted column names)
    flattened_data = pd.json_normalize(df[column_name])

    # Concatenate the flattened data with the original DataFrame
    df = df.drop(columns=column_name, errors='ignore') #drop original column
    df = pd.concat([df, flattened_data], axis=1)

    return df

#### getDataQualityScanRecommendations
- Gets the scan Recommendations from Profiles

In [None]:
def getDataQualityScanRecommendationsByProfile(project_id, dataplex_region, data_profile_scan_name):
  """Gets scan recommendations based on table profile"""

  # Gets scan recommendations based on table profile
  url = f"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans/{data_profile_scan_name}:generateDataQualityRules"
  json_result = restAPIHelper(url, "POST", None)
  return json_result

In [None]:
def getDataQualityScanRecommendationsByProfileJob(project_id, dataplex_region, data_profile_scan_name, data_profile_scan_job_name):
  """Gets scan recommendations based on table profile"""

  # Gets scan recommendations based on table profile
  url = f"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans/{data_profile_scan_name}/jobs/{data_profile_scan_job_name}:generateDataQualityRules"
  json_result = restAPIHelper(url, "POST", None)
  return json_result

#### updateBigQueryTableDataplexLabels
- Patches the BigQuery table labels to associate a Dataplex scan with the table so you see it in the UI
- Returns nothing

In [None]:
def updateBigQueryTableDataplexLabels(project_id, dataplex_region, dataplex_asset_type, dataplex_asset_scan_name, bigquery_dataset_name, bigquery_table_name):
  """Sets the labels on the BigQuery table so users can see the data profile in the Console."""

  # Patch BigQuery
  # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/patch
  print("Patching BigQuery Dataplex Labels")

  url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project_id}/datasets/{bigquery_dataset_name}/tables/{bigquery_table_name}"

  request_body = {}
  if dataplex_asset_type == "DATA-PROFILE-SCAN":
    request_body = {
        "labels" : {
            "dataplex-dp-published-project"  : project_id,
            "dataplex-dp-published-location" : dataplex_region,
            "dataplex-dp-published-scan"     : dataplex_asset_scan_name,
            }
        }
  elif dataplex_asset_type == "DATA-INSIGHTS-SCAN":
     request_body = {
        "labels" : {
            "dataplex-data-documentation-project"  : project_id,
            "dataplex-data-documentation-location" : dataplex_region,
            "dataplex-data-documentation-scan"     : dataplex_asset_scan_name,
            }
        }
  elif dataplex_asset_type == "DATA-QUALITY-SCAN":
     request_body = {
        "labels" : {

            "dataplex-dq-published-project"  : project_id,
            "dataplex-dq-published-location" : dataplex_region,
            "dataplex-dq-published-scan"     : dataplex_asset_scan_name,
            }
        }
  else:
    raise Exception(f"Unknown dataplex_asset_type of {dataplex_asset_type}")

  json_result = restAPIHelper(url, "PATCH", request_body)
  print(json_result)
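
The three branches above differ only in the label prefix, so the request body can also be built from a lookup table. This is a sketch of that design choice using the label keys from the function above; `LABEL_PREFIXES` and `build_dataplex_labels` are hypothetical names, and the sample project, region, and scan values are placeholders.

```python
# Label prefixes taken from the three branches of updateBigQueryTableDataplexLabels
LABEL_PREFIXES = {
    "DATA-PROFILE-SCAN": "dataplex-dp-published",
    "DATA-INSIGHTS-SCAN": "dataplex-data-documentation",
    "DATA-QUALITY-SCAN": "dataplex-dq-published",
}


def build_dataplex_labels(asset_type, project_id, region, scan_name):
    """Build the tables.patch body for one Dataplex asset type (illustrative sketch)."""
    if asset_type not in LABEL_PREFIXES:
        raise ValueError(f"Unknown dataplex_asset_type of {asset_type}")
    prefix = LABEL_PREFIXES[asset_type]
    return {
        "labels": {
            f"{prefix}-project": project_id,
            f"{prefix}-location": region,
            f"{prefix}-scan": scan_name,
        }
    }


# Placeholder values for illustration
print(build_dataplex_labels("DATA-QUALITY-SCAN", "my-project", "us-central1", "my-scan"))
```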

#### deleteDataQualityScanConfiguration
- Deletes a DataScan resource.

In [None]:
def deleteDataQualityScanConfiguration(project_id, dataplex_region, data_quality_scan_name):
  """Deletes the data quality scan (DataScan resource)"""

  # Deletes the DataScan resource
  url = f"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans/{data_quality_scan_name}"
  json_result = restAPIHelper(url, "DELETE", None)
  return json_result

### <font color='#4285f4'>BQ Helper Methods</font>

#### isTableExists
- Tests to see if a BigQuery table exists (project_id, dataset_name, table_name)
- Returns True/False

In [None]:
def isTableExists(project_id, dataset_name, table_name):
  """Returns True if the BigQuery table exists, otherwise False."""
  from google.cloud import bigquery
  from google.api_core.exceptions import NotFound

  client = bigquery.Client()
  try:
    # get_table raises NotFound when the table (or its dataset) does not exist
    client.get_table(f"{project_id}.{dataset_name}.{table_name}")
    return True
  except NotFound:
    # Other errors (for example, permission problems) are allowed to propagate
    return False

### <font color='#4285f4'>Create Dataset to hold Data Quality Results</font>

In [None]:
# Create a new dataset to hold the data quality scan results (keep separate from source tables)

governed_data_scan_dataset_name = "governed_data_scan_quality_results"

sql = f"""CREATE SCHEMA IF NOT EXISTS `{project_id}.{governed_data_scan_dataset_name}` OPTIONS(location="{bigquery_location}")"""

RunQuery(sql)

dq_results_table_name = "data_quality_metrics"

### <font color='#4285f4'>Run Data Quality Scans - Algorithm</font>

#### Create a custom SQL rule (to show an example)

In [None]:
# Create some custom rules for the order_detail and sales tables in the curated dataset

def custom_sql_rule(project_id, dataset_name, table_name):
  # We will create some rules that fail for demonstration purposes
  # The columns here are common between the two tables, you would typically have rules per table, but here we are re-using.

  sql = ""
  if table_name == "sales":
    sql = f"SELECT customer_id FROM `{project_id}.{dataset_name}.{table_name}` WHERE customer_id NOT IN (SELECT customer_id FROM `{project_id}.{dataset_name}.customer` WHERE customer_id > 5)"
  else:
    # order_detail
    sql = f"SELECT product_id FROM `{project_id}.{dataset_name}.{table_name}` WHERE product_id NOT IN (SELECT product_id FROM `{project_id}.{dataset_name}.product` WHERE product_id > 5)"

  rules = [
      ####
      {
      "dimension": "VALIDITY",
      "threshold": 1,
      "name": "sale-price-check",
      "description": "Sale Price Check",
      "rowConditionExpectation": {
                      "sqlExpression": f"price >= (SELECT MAX(price) - 5 FROM `{project_id}.{dataset_name}.{table_name}`)"
          }
      },
      ####
      { "sqlAssertion": {"sqlStatement": sql},
        "dimension": "CONSISTENCY",
        "name": "orphan-customers",
        "description": "Customers sales without a customer record."
      },
      ####
      {
      "dimension": "ACCURACY",
      "name": "zero-sold-quantity",
      "description": "Products sold with 0 quantity",
      "tableConditionExpectation": {
                      "sqlExpression": f"(SELECT COUNT(*) CNT FROM `{project_id}.{dataset_name}.{table_name}` WHERE quantity = 0) > 0"
          }
      },
      ####
      {"nonNullExpectation": {},
      "column": "quantity",
      "dimension": "COMPLETENESS",
      "threshold": 1
      },
      ####
      {"rangeExpectation":
          { "minValue": "0",
            "maxValue": "1000",
            "strictMinEnabled": True
          },
      "column": "price",
      "ignoreNull": True,
      "dimension": "VALIDITY",
      "threshold": 1,
      "name": "price-range-lt-1000",
      "description": "Price should be less than $1000"
      }
  ]
  return rules
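
Before sending hand-written rules to Dataplex, it can help to sanity check their shape locally. The sketch below checks only what can be inferred from the rules in this notebook: each rule carries exactly one rule-type key and a `dimension`, and the column-level types used here name a `column`. The set of rule types is limited to those appearing above (the API defines others), and `validate_rules` is a hypothetical helper, not part of any library.

```python
# Rule-type keys that appear in custom_sql_rule above (the API defines more)
RULE_TYPE_KEYS = {
    "rowConditionExpectation",
    "sqlAssertion",
    "tableConditionExpectation",
    "nonNullExpectation",
    "rangeExpectation",
}
# Assumption: these types are applied to a single column and need "column"
COLUMN_LEVEL_KEYS = {"nonNullExpectation", "rangeExpectation"}


def validate_rules(rules):
    """Return a list of human-readable problems; empty means no problems found."""
    problems = []
    for index, rule in enumerate(rules):
        types = RULE_TYPE_KEYS.intersection(rule)
        if len(types) != 1:
            problems.append(f"rule {index}: expected exactly one rule type, found {sorted(types)}")
        if "dimension" not in rule:
            problems.append(f"rule {index}: missing 'dimension'")
        if types & COLUMN_LEVEL_KEYS and "column" not in rule:
            problems.append(f"rule {index}: column-level rule without 'column'")
    return problems


sample = [{"nonNullExpectation": {}, "column": "quantity", "dimension": "COMPLETENESS", "threshold": 1}]
print(validate_rules(sample))
```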

#### Get all the tables we want to scan for each dataset

In [None]:
# Get all the tables we want to scan for each dataset
scans_to_perform = []
dataset_list = ["${bigquery_governed_data_raw_dataset}","${bigquery_governed_data_enriched_dataset}","${bigquery_governed_data_curated_dataset}"]

# Build one INFORMATION_SCHEMA query per dataset and combine them with UNION ALL.
# (str.rstrip removes a set of characters, not a suffix, so join is used instead.)
sql = " UNION ALL ".join(
    f"SELECT table_schema, table_name, table_type FROM `{dataset_name}.INFORMATION_SCHEMA.TABLES`"
    for dataset_name in dataset_list
)

result_df = RunQuery(sql)
table_list = []

# data_profile_scan_name: "Field data_scan_id must contain only lowercase letters, numbers, and/or hyphens"
for index, row in result_df.iterrows():
  item = {
      "project_id": project_id,
      "dataplex_region": dataplex_region,
      "data_profile_scan_name": f"{row['table_schema']}-{row['table_name']}-profile-scan".lower().replace("_","-"),
      "data_profile_display_name": f"{row['table_schema']}-{row['table_name']} profile scan",
      "data_quality_scan_name": f"{row['table_schema']}-{row['table_name']}-quality-scan".lower().replace("_","-"),
      "data_quality_display_name": f"{row['table_schema']}.{row['table_name']} AutoDQ scan",
      "data_quality_description":  f"{row['table_schema']}.{row['table_name']} AutoDQ scan based upon recommended rules",
      "bigquery_dataset_name": row['table_schema'],
      "bigquery_table_name": row['table_name'],

      # Used by below loop for processing
      "data_quality_scan_state": "",
      "data_quality_scan_job_name": ""
  }
  scans_to_perform.append(item)

  print(f"item: {item}")
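
The scan names above are made API-safe by lowercasing and replacing underscores. If dataset or table names could contain other characters, a slightly more general normalizer might be used. This is a sketch under that assumption: `to_scan_id` is a hypothetical helper, and it does not check any length limit the API may impose.

```python
import re


def to_scan_id(dataset: str, table: str, suffix: str) -> str:
    """Build an ID using only lowercase letters, numbers and hyphens (illustrative sketch)."""
    raw = f"{dataset}-{table}-{suffix}".lower()
    # Replace every run of disallowed characters with a single hyphen
    return re.sub(r"[^a-z0-9]+", "-", raw).strip("-")


# Hypothetical dataset and table names
print(to_scan_id("Governed_Data_Curated", "order_detail", "quality-scan"))
```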

#### Search the DataScans list (profile, insights and data quality scans) for a specific scan

In [None]:
def search_data_scans(data, name=None, state=None, type=None):
    """Searches the dataScans list for entries matching the given criteria.

    Args:
        data: The dictionary containing the dataScans list.
        name: The name to search for (optional).
        state: The state to search for (optional).
        type: The type to search for (optional).

    Returns:
        A list of dictionaries that match the search criteria.  Returns an empty list if no matches are found.
    """

    data_scans = data.get("dataScans", [])  # Get the dataScans list, or empty list if missing
    results = []

    for scan in data_scans:  # Loop ONLY over the dataScans list
        if (name is None or name in scan.get("name", "")) and \
           (state is None or scan.get("state") == state) and \
           (type is None or scan.get("type") == type):
            results.append(scan)

    return results
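
Note that `search_data_scans` matches `name` as a substring, so a scan whose name extends another scan's name would also match. Where an exact lookup is wanted, a variant along these lines could be used; `find_scan_by_exact_name` and the sample scan names are hypothetical.

```python
def find_scan_by_exact_name(data, full_name, scan_type=None):
    """Return the scan whose name equals full_name, or None (illustrative sketch)."""
    for scan in data.get("dataScans", []):
        if scan.get("name") == full_name and (scan_type is None or scan.get("type") == scan_type):
            return scan
    return None


# Hypothetical list response with two similarly named scans
sample = {
    "dataScans": [
        {"name": "p/orders-profile-scan-v2", "type": "DATA_PROFILE"},
        {"name": "p/orders-profile-scan", "type": "DATA_PROFILE"},
    ]
}
print(find_scan_by_exact_name(sample, "p/orders-profile-scan"))
```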

#### Create a list of scans we will potentially run

In [None]:
# For each table
# 1. Check to see if a data profile scan exists (if not set the status to SKIP)
# 2. Get the last data profile scan
# 3. Get the recommended rules
# 4. Append custom rules (for the curated order_detail and sales tables)

allDataScans = listScans(project_id, dataplex_region)
for item in scans_to_perform:
  # Check to see if a data profile scan exists (if not set the status to SKIP)
  name = f"projects/{project_id}/locations/{dataplex_region}/dataScans/{item['data_profile_scan_name']}"
  #print(f"Attempting to Match: {name}")
  profile_scan = search_data_scans(allDataScans, name, "ACTIVE", "DATA_PROFILE")
  if profile_scan != []:
    print(f"MATCH: {profile_scan[0]}")
    try:
      recommended_rules = getDataQualityScanRecommendationsByProfile(project_id, dataplex_region, item['data_profile_scan_name'])
      item["data_quality_recommended_rules"] = recommended_rules
      if item["bigquery_dataset_name"] == "${bigquery_governed_data_curated_dataset}" and \
         (item["bigquery_table_name"] == "order_detail" or item["bigquery_table_name"] == "sales"):
         print(f"Creating custom rules for {item['bigquery_dataset_name']}.{item['bigquery_table_name']}")
         custom_rules = custom_sql_rule(project_id, item["bigquery_dataset_name"], item["bigquery_table_name"])
         # Add the custom rules to the recommended rules
         recommended_rules["rule"].extend(custom_rules)
         item["data_quality_recommended_rules"] = recommended_rules
         print(f"data_quality_recommended_rules: {item['data_quality_recommended_rules']}")
      #print(f"recommended_rules: {recommended_rules}")
      print(f"Scan to be created for: {item['data_quality_scan_name']}")
    except Exception as e:
      print(f"** No recommended rules for {item['data_profile_scan_name']}: {e} **")
      item["data_quality_scan_state"]    = "SKIP-RECOMMENDED-RULES"
      item["data_quality_scan_job_name"] = "SKIP-RECOMMENDED-RULES"

  else:
    print(f"** No data profile scan for {item['data_profile_scan_name']} **")
    item["data_quality_scan_state"]    = "SKIP-NO-PROFILE-SCAN"
    item["data_quality_scan_job_name"] = "SKIP-NO-PROFILE-SCAN"

#### Standard notification and export of data quality results for all tables

In [None]:
samplingPercent = 100
postScanActions = {
    "notificationReport": {
    "recipients": {
        "emails": [
            notification_email
        ]
    },
    "scoreThresholdTrigger": {
        "scoreThreshold": 100
    },
    "jobFailureTrigger": {},
    "jobEndTrigger": {}
    },
    "bigqueryExport":
        {"resultsTable": f"//bigquery.googleapis.com/projects/{project_id}/datasets/{governed_data_scan_dataset_name}/tables/{dq_results_table_name}"}
    }

#### Run the data quality scans (up to 5 concurrently) and update the BigQuery Table

In [None]:
# Run the scans (up to a certain concurrency level)
numberOfScansToRunConcurrently = 5

while True:
  # Count the number of scans that are running
  concurrentScanCount = 0
  for item in scans_to_perform:
    if item["data_quality_scan_state"] == "PENDING" or \
       item["data_quality_scan_state"] == "STATE_UNSPECIFIED" or \
       item["data_quality_scan_state"] == "RUNNING" or \
       item["data_quality_scan_state"] == "CANCELING":
       # Update our count
       print(f"Concurrent Scan Count: {item['bigquery_dataset_name']}.{item['bigquery_table_name']} -> {item['data_quality_scan_state']}")
       concurrentScanCount += 1
    else:
       print(f"Concurrent Scan Count: {item['bigquery_dataset_name']}.{item['bigquery_table_name']} -> {item['data_quality_scan_state']}")


  print(f"concurrentScanCount: {concurrentScanCount}")

  # Start new scans under our concurrency count
  scansStarted = -1
  while concurrentScanCount < numberOfScansToRunConcurrently and scansStarted != 0:
    # Start new scans up to the concurrency limit
    scansStarted = 0
    for item in scans_to_perform:
      if concurrentScanCount < numberOfScansToRunConcurrently and \
         item["data_quality_scan_state"] == "":
        # start a new scan
        dataQualitySpec = {}
        dataQualitySpec["samplingPercent"] = samplingPercent
        dataQualitySpec["postScanActions"] = postScanActions
        dataQualitySpec["rules"] = item["data_quality_recommended_rules"]["rule"]

        # Create the data quality scan (if not exists)
        createDataQualityScan(project_id, dataplex_region, item["data_quality_scan_name"],
                              item["data_quality_display_name"], item["data_quality_description"],
                              item["bigquery_dataset_name"], item["bigquery_table_name"],
                              dataQualitySpec)

        started = False
        item["data_quality_scan_job_name"] = ""
        while started == False:
          try:
            item["data_quality_scan_job_name"] = startDataQualityScan(item["project_id"], item["dataplex_region"], item["data_quality_scan_name"])
            item["data_quality_scan_state"] = "STATE_UNSPECIFIED"
            started = True
            scansStarted += 1
            concurrentScanCount += 1
          except Exception as e:
            scan_full_name = f'projects/{item["project_id"]}/locations/{item["dataplex_region"]}/dataScans/{item["data_quality_scan_name"]}'
            message = f"Provided DataScan '{scan_full_name}' does not exist."
            print(message)
            if message in str(e):
              print(f"Data scan is not available to start.  Waiting...")
              time.sleep(5)
            else:
              raise e  # Re-raise the exception for other errors


  # Update the status for the scans that are processing
  for item in scans_to_perform:
    if item["data_quality_scan_state"] == "PENDING" or \
       item["data_quality_scan_state"] == "STATE_UNSPECIFIED" or \
       item["data_quality_scan_state"] == "RUNNING" or \
       item["data_quality_scan_state"] == "CANCELING":
       # Get the latest state
       item["data_quality_scan_state"] = getStateDataQualityScan(item["project_id"], item["dataplex_region"], item["data_quality_scan_job_name"])

  if concurrentScanCount == 0:
    # nothing processing- exit
    break
  else:
    # wait for processing
    print(f"concurrentScanCount: {concurrentScanCount}")
    time.sleep(10)

# Update the BigQuery labels so our scans show in the Console UI
for item in scans_to_perform:
  if item["data_quality_scan_state"] == "SUCCEEDED":
    # Only label tables whose scan succeeded (skips CANCELLED, FAILED and SKIP-* states)
    updateBigQueryTableDataplexLabels(item["project_id"], item["dataplex_region"],
                                    "DATA-QUALITY-SCAN", item["data_quality_scan_name"],
                                    item["bigquery_dataset_name"], item["bigquery_table_name"])

    print(f"Data quality scan for table {item['bigquery_dataset_name']}.{item['bigquery_table_name']} is now associated with the BigQuery Console UI.")


### <font color='#4285f4'>Clean Up</font>

In [None]:
# Placeholder (you would need to un-patch the BigQuery tables and delete the scans)
print(f"You can delete scans here: https://console.cloud.google.com/dataplex/govern/quality?project={project_id}")
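
To un-patch a table, the data quality labels set earlier need to be removed. As far as the BigQuery label documentation describes it, a label is deleted by patching the table with that label's value set to null; the sketch below only builds such a request body and should be verified before use. `build_label_removal_body` is a hypothetical helper and the label keys are those used for `DATA-QUALITY-SCAN` above.

```python
import json

# Label keys written for DATA-QUALITY-SCAN earlier in this notebook
DQ_LABEL_KEYS = (
    "dataplex-dq-published-project",
    "dataplex-dq-published-location",
    "dataplex-dq-published-scan",
)


def build_label_removal_body(keys=DQ_LABEL_KEYS):
    """Build a tables.patch body that sets each label to null (illustrative sketch)."""
    return {"labels": {key: None for key in keys}}


# None is serialized as JSON null
print(json.dumps(build_label_removal_body(), indent=2))

# Possible usage with the helpers above (not executed here):
# restAPIHelper(table_url, "PATCH", build_label_removal_body())
# deleteDataQualityScanConfiguration(project_id, dataplex_region, scan_name)
```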

### <font color='#4285f4'>Reference Links</font>


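- [Dataplex REST API: dataScans.list](https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/list)
- [Dataplex REST API: dataScans.create](https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/create)
- [Dataplex REST API: dataScans.patch](https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/patch)
- [Dataplex REST API: dataScans.run](https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/run)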