data-analytics-demos/bigquery-data-governance/colab-enterprise/05-Data-Quality.ipynb (1,391 lines of code) (raw):

{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "8rRxIQAxABNK" }, "source": [ "### <font color='#4285f4'>Overview</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "IauvRjuUABNL" }, "source": [ "**Overview**: Demonstrate the ability to create data quality rules on tables within BigQuery. This demo will retrieve suggested data quality rules based on data profile scans. The rules will then be used to create a data quality scan for each table.\n", "\n", "Also, custom rules will be applied to the “curated” order_detail and sales tables.\n", "\n", "\n", "**Process Flow**:\n", "\n", "1. Select all the tables in the raw, enriched and curated datasets.\n", "\n", "2. Gather the data profile scan for each respective table.\n", "\n", "3. Gather the recommended data quality rules for each table.\n", "\n", "4. Create a data quality scan on the table using the suggested rules. \n", " - Create custom rules for the curated invoice detail and sales tables.\n", "\n", "5. Update the BigQuery user interface (patch the labels) so we see the scan in the BigQuery user interface.\n", "\n", "6. Create a custom rule for the curated invoice detail table to see how custom rules can be applied to a table.\n", "\n", "Notes:\n", "* This notebook runs the scans manually. Typically, you should schedule a scan on a schedule and not worry about processing.\n", "\n", "Cost:\n", "* Approximate cost: Less than a dollar\n", "\n", "Authors:\n", "* Sandeep Manocha, Adam Paternostro" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Uynoj7r5ABNL" }, "outputs": [], "source": [ "# Architecture Diagram\n", "from IPython.display import Image\n", "Image(url='https://storage.googleapis.com/data-analytics-golden-demo/colab-diagrams/BigQuery-Data-Governance-Data-Quality.png', width=1200)" ] }, { "cell_type": "markdown", "metadata": { "id": "S7zKuqTpABNL" }, "source": [ "### <font color='#4285f4'>Video Walkthrough</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "8qRI86K6ABNL" }, "source": [ "[Video](https://storage.googleapis.com/data-analytics-golden-demo/colab-videos/Data-Quality.mp4)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Fc0k4fw4ABNL" }, "outputs": [], "source": [ "from IPython.display import HTML\n", "\n", "HTML(\"\"\"\n", "<video width=\"800\" height=\"600\" controls>\n", " <source src=\"https://storage.googleapis.com/data-analytics-golden-demo/colab-videos/Data-Quality.mp4\" type=\"video/mp4\">\n", " Your browser does not support the video tag.\n", "</video>\n", "\"\"\")" ] }, { "cell_type": "markdown", "metadata": { "id": "HMsUvoF4BP7Y" }, "source": [ "### <font color='#4285f4'>License</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "jQgQkbOvj55d" }, "source": [ "```\n", "# Copyright 2024 Google LLC\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License.\n", "```" ] }, { "cell_type": "markdown", "metadata": { "id": "m65vp54BUFRi" }, "source": [ "### <font color='#4285f4'>Pip 
, { "cell_type": "markdown", "metadata": { "id": "m65vp54BUFRi" }, "source": [ "### <font color='#4285f4'>Pip installs</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "5MaWM6H5i6rX" }, "outputs": [], "source": [ "# NOTE: All calls in this notebook are done via REST APIs\n", "\n", "# PIP Installs (if necessary)\n", "import sys\n", "\n", "# !{sys.executable} -m pip install REPLACE-ME" ] }, { "cell_type": "markdown", "metadata": { "id": "UmyL-Rg4Dr_f" }, "source": [ "### <font color='#4285f4'>Initialize</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "xOYsEVSXp6IP" }, "outputs": [], "source": [ "from PIL import Image\n", "from IPython.display import HTML\n", "import IPython.display\n", "import google.auth\n", "import requests\n", "import json\n", "import uuid\n", "import base64\n", "import os\n", "import cv2\n", "import random\n", "import time\n", "import datetime\n", "import pandas as pd\n", "import logging\n", "from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "wMlHl3bnkFPZ" }, "outputs": [], "source": [ "# Set these (run this cell to verify the output)\n", "\n", "bigquery_location = \"${bigquery_location}\"\n", "dataplex_region = \"${dataplex_region}\"\n", "\n", "# Get the current date and time\n", "now = datetime.datetime.now()\n", "\n", "# Format the date and time as desired\n", "formatted_date = now.strftime(\"%Y-%m-%d-%H-%M\")\n", "\n", "# Get some values using gcloud\n", "project_id = os.environ[\"GOOGLE_CLOUD_PROJECT\"]\n", "user = !(gcloud auth list --filter=status:ACTIVE --format=\"value(account)\")\n", "\n", "if len(user) != 1:\n", "  raise RuntimeError(f\"user is not set: {user}\")\n", "user = user[0]\n", "\n", "print(f\"project_id = {project_id}\")\n", "print(f\"user = {user}\")\n", "\n", "if 'altostrat.com' in user:\n", "  notification_email = f\"{user.split('@')[1].split('.')[0]}@google.com\"\n", "else:\n", "  # Fall back to the active account for scan notifications\n", "  notification_email = user\n", "\n", "print(f\"notification_email: {notification_email}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "sZ6m_wGrK0YG" }, "source": [ "### <font color='#4285f4'>Helper Methods</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "JbOjdSP1kN9T" }, "source": [ "#### restAPIHelper\n", "Calls the Google Cloud REST API using the current user's credentials." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "40wlwnY4kM11" }, "outputs": [], "source": [ "def restAPIHelper(url: str, http_verb: str, request_body: dict) -> dict:\n", "  \"\"\"Calls the Google Cloud REST API passing in the current user's credentials\"\"\"\n", "\n", "  import requests\n", "  import google.auth\n", "  import google.auth.transport.requests\n", "  import json\n", "\n", "  # Get an access token based upon the current user\n", "  creds, project = google.auth.default()\n", "  auth_req = google.auth.transport.requests.Request()\n", "  creds.refresh(auth_req)\n", "  access_token = creds.token\n", "\n", "  headers = {\n", "    \"Content-Type\" : \"application/json\",\n", "    \"Authorization\" : \"Bearer \" + access_token\n", "  }\n", "\n", "  # Required by some API calls\n", "  if project_id is not None:\n", "    headers[\"x-goog-user-project\"] = project_id\n", "\n", "  if http_verb == \"GET\":\n", "    response = requests.get(url, headers=headers)\n", "  elif http_verb == \"POST\":\n", "    response = requests.post(url, json=request_body, headers=headers)\n", "  elif http_verb == \"PUT\":\n", "    response = requests.put(url, json=request_body, headers=headers)\n", "  elif http_verb == \"PATCH\":\n", "    response = requests.patch(url, json=request_body, headers=headers)\n", "  elif http_verb == \"DELETE\":\n", "    response = requests.delete(url, headers=headers)\n", "  else:\n", "    raise RuntimeError(f\"Unknown HTTP verb: {http_verb}\")\n", "\n", "  if response.status_code == 200:\n", "    return json.loads(response.content)\n", "  else:\n", "    error = f\"Error restAPIHelper -> ' Status: '{response.status_code}' Text: '{response.text}'\"\n", "    raise RuntimeError(error)" ] }
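, { "cell_type": "markdown", "metadata": {}, "source": [ "A minimal usage sketch (the URL below is just an illustration; `restAPIHelper` is called this way throughout the notebook):\n", "\n", "```python\n", "# List the Dataplex data scans in one region\n", "url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans\"\n", "scans = restAPIHelper(url, \"GET\", None)\n", "print(scans.get(\"dataScans\", []))\n", "```" ] }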
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "40wlwnY4kM11" }, "outputs": [], "source": [ "def restAPIHelper(url: str, http_verb: str, request_body: str) -> str:\n", " \"\"\"Calls the Google Cloud REST API passing in the current users credentials\"\"\"\n", "\n", " import requests\n", " import google.auth\n", " import json\n", "\n", " # Get an access token based upon the current user\n", " creds, project = google.auth.default()\n", " auth_req = google.auth.transport.requests.Request()\n", " creds.refresh(auth_req)\n", " access_token=creds.token\n", "\n", " headers = {\n", " \"Content-Type\" : \"application/json\",\n", " \"Authorization\" : \"Bearer \" + access_token\n", " }\n", "\n", " # Required by some API calls\n", " if project_id != None:\n", " headers[\"x-goog-user-project\"] = project_id\n", "\n", "\n", " if http_verb == \"GET\":\n", " response = requests.get(url, headers=headers)\n", " elif http_verb == \"POST\":\n", " response = requests.post(url, json=request_body, headers=headers)\n", " elif http_verb == \"PUT\":\n", " response = requests.put(url, json=request_body, headers=headers)\n", " elif http_verb == \"PATCH\":\n", " response = requests.patch(url, json=request_body, headers=headers)\n", " elif http_verb == \"DELETE\":\n", " response = requests.delete(url, headers=headers)\n", " else:\n", " raise RuntimeError(f\"Unknown HTTP verb: {http_verb}\")\n", "\n", " if response.status_code == 200:\n", " return json.loads(response.content)\n", " #image_data = json.loads(response.content)[\"predictions\"][0][\"bytesBase64Encoded\"]\n", " else:\n", " error = f\"Error restAPIHelper -> ' Status: '{response.status_code}' Text: '{response.text}'\"\n", " raise RuntimeError(error)" ] }, { "cell_type": "markdown", "metadata": { "id": "ILcRKf-zgsP5" }, "source": [ "#### RunQuery\n", "Runs a BigQuery query." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "HCo0Mr6zgsyn" }, "outputs": [], "source": [ "def RunQuery(sql):\n", " import time\n", " from google.cloud import bigquery\n", " client = bigquery.Client()\n", "\n", " if (sql.startswith(\"SELECT\") or sql.startswith(\"WITH\")):\n", " df_result = client.query(sql).to_dataframe()\n", " return df_result\n", " else:\n", " job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)\n", " query_job = client.query(sql, job_config=job_config)\n", "\n", " # Check on the progress by getting the job's updated state.\n", " query_job = client.get_job(\n", " query_job.job_id, location=query_job.location\n", " )\n", " print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n", "\n", " while query_job.state != \"DONE\":\n", " time.sleep(2)\n", " query_job = client.get_job(\n", " query_job.job_id, location=query_job.location\n", " )\n", " print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n", "\n", " if query_job.error_result == None:\n", " return True\n", " else:\n", " raise Exception(query_job.error_result)" ] }, { "cell_type": "markdown", "metadata": { "id": "ZQYL-OEvAkZ7" }, "source": [ "### <font color='#4285f4'>Data Quality Scan - Helper Methods</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "BVXA0swqCbLn" }, "source": [ "#### existsDataQualityScan\n", "- Tests to see if a Data Quality Scan exists\n", "- Returns True/False" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "XEqmygbrCaYn" }, "outputs": [], "source": [ "def existsDataQualityScan(project_id, dataplex_region, data_quality_scan_name):\n", " \"\"\"Creates the data Quality scan.\"\"\"\n", "\n", " # Gather existing data scans\n", " # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/list\n", "\n", " url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans\"\n", "\n", " # Gather existing data scans\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " #print(f\"existsDataQualityScan (GET) json_result: {json_result}\")\n", "\n", " # Test to see if data scan exists, if so return\n", " if \"dataScans\" in json_result:\n", " for item in json_result[\"dataScans\"]:\n", " # print(f\"Scan names: {item['name']}\")\n", " if item[\"name\"] == f\"projects/{project_id}/locations/{dataplex_region}/dataScans/{data_quality_scan_name}\":\n", " print(f\"Data Document Scan {data_quality_scan_name} already exists\")\n", " return True\n", "\n", " return False" ] }, { "cell_type": "markdown", "metadata": { "id": "D-k_P6NwAvT5" }, "source": [ "#### createDataQualityScan\n", "- Creates a scan, but does not run it\n", "- If the scan exists, the does nothing" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "GCfNKo1TAkBE" }, "outputs": [], "source": [ "def createDataQualityScan(project_id, dataplex_region, data_quality_scan_name, data_quality_display_name, data_quality_description, bigquery_dataset_name, bigquery_table_name, data_quality_spec):\n", " \"\"\"Creates the data quality scan.\"\"\"\n", " scan_name_fqdn = None\n", " if existsDataQualityScan(project_id, dataplex_region, data_quality_scan_name) == False:\n", " # Create a new scan\n", " # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/create\n", " print(\"Creating Data 
Quality Scan\")\n", "\n", " url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans?dataScanId={data_quality_scan_name}\"\n", "\n", " request_body = {\n", " \"dataQualitySpec\": data_quality_spec,\n", " \"data\": { \"resource\": f\"//bigquery.googleapis.com/projects/{project_id}/datasets/{bigquery_dataset_name}/tables/{bigquery_table_name}\" },\n", " \"description\": data_quality_description,\n", " \"displayName\": data_quality_display_name\n", " }\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", "\n", " scan_name_fqdn = json_result[\"metadata\"][\"target\"]\n", " print(f\"Data Quality Scan created: {scan_name_fqdn}\")\n", " else:\n", " print(f\"Data Quality Scan exists: projects/{project_id}/locations/{dataplex_region}/dataScans/{data_quality_scan_name}\")\n", "\n", " return scan_name_fqdn" ] }, { "cell_type": "markdown", "metadata": { "id": "g0nB-_E4DdDq" }, "source": [ "#### updateDataQualityScan" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "f0OJRx1pDkEZ" }, "outputs": [], "source": [ "def updateDataQualityScan(project_id, dataplex_region, data_quality_scan_name, data_quality_display_name, data_quality_description, bigquery_dataset_name, bigquery_table_name, data_quality_spec):\n", " \"\"\"Creates the data quality scan.\"\"\"\n", " scan_name_fqdn = None\n", " if existsDataQualityScan(project_id, dataplex_region, data_quality_scan_name) == True:\n", " # Create a new scan\n", " # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/patch\n", " print(\"Updating Data Quality Scan\")\n", "\n", " url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans/{data_quality_scan_name}?updateMask=dataQualitySpec\"\n", "\n", " print(url)\n", "\n", " request_body = {\n", " \"dataQualitySpec\": data_quality_spec,\n", " \"data\": { \"resource\": f\"//bigquery.googleapis.com/projects/{project_id}/datasets/{bigquery_dataset_name}/tables/{bigquery_table_name}\" },\n", " \"description\": data_quality_description,\n", " \"displayName\": data_quality_display_name\n", " }\n", " print(\"Submitting REST PATCH\")\n", " print(request_body)\n", " json_result = restAPIHelper(url, \"PATCH\", request_body)\n", " print(\"Completed REST PATCH\")\n", "\n", " scan_name_fqdn = json_result[\"metadata\"][\"target\"]\n", " print(f\"Data Quality Scan created: {scan_name_fqdn}\")\n", " else:\n", " print(f\"Data Quality Scan Not Exists: projects/{project_id}/locations/{dataplex_region}/dataScans/{data_quality_scan_name}\")\n", "\n", " return scan_name_fqdn" ] }, { "cell_type": "markdown", "metadata": { "id": "q7LBwnD6A2pM" }, "source": [ "#### startDataQualityScan\n", "- Starts a data quality scan (async)\n", "- Returns the \"job name\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "C7Xdx8bKAj-a" }, "outputs": [], "source": [ "def startDataQualityScan(project_id, dataplex_region, data_quality_scan_name):\n", " \"\"\"Runs the data profile scan job and monitors until it completes\"\"\"\n", "\n", " # Create a new scan\n", " # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/run\n", " print(\"Running Data Quality Scan\")\n", "\n", " url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans/{data_quality_scan_name}:run\"\n", "\n", "\n", " request_body = { }\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", " job_name = 
json_result[\"job\"][\"name\"]\n", " job_state = json_result[\"job\"][\"state\"]\n", " print(f\"Document Data Scan Run created: {job_name} - State: {job_state}\")\n", "\n", " return job_name\n" ] }, { "cell_type": "markdown", "metadata": { "id": "X8db9qIHBOO8" }, "source": [ "#### getStateDataQualityScan\n", "- Gets the state of a scan (to see if it is done)\n", "- Returns the \"state\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "JhHrg0lcAj7r" }, "outputs": [], "source": [ "def getStateDataQualityScan(project_id, dataplex_region, data_quality_scan_job_name):\n", " \"\"\"Runs the data quality scan job and monitors until it completes\"\"\"\n", "\n", " # Gets the \"state\" of a scan\n", " url = f\"https://dataplex.googleapis.com/v1/{data_quality_scan_job_name}\"\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " return json_result[\"state\"]\n", " #== \"STATE_UNSPECIFIED\" or json_result[\"state\"] == \"RUNNING\" or json_result[\"state\"] == \"PENDING\":\n" ] }, { "cell_type": "markdown", "metadata": { "id": "Wv9dt4NRsdRk" }, "source": [ "#### getDataQualityScanConfiguration\n", "- Gets the scan configuration" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "4w_ySRifscqo" }, "outputs": [], "source": [ "def getDataQualityScanConfiguration(project_id, dataplex_region, data_quality_scan_fqdn):\n", " \"\"\"Get the Data Quality Configuration as JSON\"\"\"\n", "\n", " # Gets the \"state\" of a scan\n", " url = f\"https://dataplex.googleapis.com/v1/{data_quality_scan_fqdn}?view=FULL\"\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " return json_result" ] }, { "cell_type": "markdown", "metadata": { "id": "6cWEZXtvGAZr" }, "source": [ "#### listScans\n", "- List all the Scan (Data Profile & Data Quality)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "152CdU2a078x" }, "outputs": [], "source": [ "def listScans(project_id, dataplex_region):\n", " \"\"\"Get the Data Quality Configuration as JSON\"\"\"\n", "\n", " # Gets the \"state\" of a scan\n", " url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans\"\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " return json_result" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "TQeAKcjtpZUL" }, "outputs": [], "source": [ "def getScan(project_id, dataplex_region, scan_name):\n", " \"\"\"Get the Data Quality Configuration as JSON\"\"\"\n", "\n", " # Gets the \"state\" of a scan\n", " url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans/{scan_name}\"\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " return json_result" ] }, { "cell_type": "markdown", "metadata": { "id": "z7tolzxVGH3a" }, "source": [ "#### flatten_dict_values\n", "- Faltten the dictionaries to columns in Pandas Data Frame" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "dpgCCXJ-VyQo" }, "outputs": [], "source": [ "def flatten_dict_values(df, column_name):\n", " \"\"\"\n", " Flattens dictionary values in a specified DataFrame column into individual columns.\n", "\n", " Args:\n", " df: The Pandas DataFrame.\n", " column_name: The name of the column containing dictionaries.\n", "\n", " Returns:\n", " A new Pandas DataFrame with the flattened columns, or the original DataFrame if\n", " the specified column does not exist or doesn't contain dictionaries.\n", " \"\"\"\n", " if column_name not in df.columns:\n", " print(f\"Column 
, { "cell_type": "markdown", "metadata": { "id": "z7tolzxVGH3a" }, "source": [ "#### flatten_dict_values\n", "- Flattens the dictionaries to columns in a Pandas DataFrame" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "dpgCCXJ-VyQo" }, "outputs": [], "source": [ "def flatten_dict_values(df, column_name):\n", "  \"\"\"\n", "  Flattens dictionary values in a specified DataFrame column into individual columns.\n", "\n", "  Args:\n", "    df: The Pandas DataFrame.\n", "    column_name: The name of the column containing dictionaries.\n", "\n", "  Returns:\n", "    A new Pandas DataFrame with the flattened columns, or the original DataFrame if\n", "    the specified column does not exist or doesn't contain dictionaries.\n", "  \"\"\"\n", "  if column_name not in df.columns:\n", "    print(f\"Column '{column_name}' not found in DataFrame.\")\n", "    return df # Return original DataFrame\n", "\n", "  # Check if all values in the column are dictionaries (NaN values are allowed)\n", "  if not all(isinstance(x, dict) if pd.notna(x) else True for x in df[column_name]):\n", "    print(f\"Column '{column_name}' does not contain dictionaries.\")\n", "    return df # Return original DataFrame\n", "\n", "  # Efficiently flatten the dictionaries using pd.json_normalize\n", "  flattened_data = pd.json_normalize(df[column_name])\n", "\n", "  # Concatenate the flattened data with the original DataFrame\n", "  df = df.drop(columns=column_name, errors='ignore') # drop original column\n", "  df = pd.concat([df, flattened_data], axis=1)\n", "\n", "  return df" ] }, { "cell_type": "markdown", "metadata": { "id": "QBrZFIK90j2P" }, "source": [ "#### getDataQualityScanRecommendations\n", "- Gets the recommended data quality rules from profile scans" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "cW8mHOGB02fg" }, "outputs": [], "source": [ "def getDataQualityScanRecommendationsByProfile(project_id, dataplex_region, data_profile_scan_name):\n", "  \"\"\"Gets scan recommendations based on the table profile\"\"\"\n", "\n", "  # Gets scan recommendations based on the table profile\n", "  url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans/{data_profile_scan_name}:generateDataQualityRules\"\n", "  json_result = restAPIHelper(url, \"POST\", None)\n", "  return json_result" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "5Oh37bHHyB35" }, "outputs": [], "source": [ "def getDataQualityScanRecommendationsByProfileJob(project_id, dataplex_region, data_profile_scan_name, data_profile_scan_job_name):\n", "  \"\"\"Gets scan recommendations based on a specific profile scan job\"\"\"\n", "\n", "  # Gets scan recommendations based on a specific profile scan job\n", "  url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans/{data_profile_scan_name}/jobs/{data_profile_scan_job_name}:generateDataQualityRules\"\n", "  json_result = restAPIHelper(url, \"POST\", None)\n", "  return json_result" ] }, { "cell_type": "markdown", "metadata": { "id": "Kd51jAhcZBUm" }, "source": [ "#### updateBigQueryTableDataplexLabels\n", "- Patches the BigQuery table so that we associate a Dataplex scan with the BigQuery table and it shows in the UI\n", "- Returns nothing" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "q0D8cTWCZBpM" }, "outputs": [], "source": [ "def updateBigQueryTableDataplexLabels(project_id, dataplex_region, dataplex_asset_type, dataplex_asset_scan_name, bigquery_dataset_name, bigquery_table_name):\n", "  \"\"\"Sets the labels on the BigQuery table so users can see the data profile in the Console.\"\"\"\n", "\n", "  # Patch BigQuery\n", "  # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/patch\n", "  print(\"Patching BigQuery Dataplex Labels\")\n", "\n", "  url = f\"https://bigquery.googleapis.com/bigquery/v2/projects/{project_id}/datasets/{bigquery_dataset_name}/tables/{bigquery_table_name}\"\n", "\n", "  request_body = {}\n", "  if dataplex_asset_type == \"DATA-PROFILE-SCAN\":\n", "    request_body = {\n", "      \"labels\" : {\n", "        \"dataplex-dp-published-project\" : project_id,\n", "        \"dataplex-dp-published-location\" : dataplex_region,\n", "        \"dataplex-dp-published-scan\" : dataplex_asset_scan_name,\n", "      }\n", "    }\n", "  elif dataplex_asset_type == \"DATA-INSIGHTS-SCAN\":\n", "    request_body = {\n", "      \"labels\" : {\n", "        \"dataplex-data-documentation-project\" : project_id,\n", "        \"dataplex-data-documentation-location\" : dataplex_region,\n", "        \"dataplex-data-documentation-scan\" : dataplex_asset_scan_name,\n", "      }\n", "    }\n", "  elif dataplex_asset_type == \"DATA-QUALITY-SCAN\":\n", "    request_body = {\n", "      \"labels\" : {\n", "        \"dataplex-dq-published-project\" : project_id,\n", "        \"dataplex-dq-published-location\" : dataplex_region,\n", "        \"dataplex-dq-published-scan\" : dataplex_asset_scan_name,\n", "      }\n", "    }\n", "  else:\n", "    raise Exception(f\"Unknown dataplex_asset_type of {dataplex_asset_type}\")\n", "\n", "  json_result = restAPIHelper(url, \"PATCH\", request_body)\n", "  print(json_result)" ] }
\"DATA-INSIGHTS-SCAN\":\n", " request_body = {\n", " \"labels\" : {\n", " \"dataplex-data-documentation-project\" : project_id,\n", " \"dataplex-data-documentation-location\" : dataplex_region,\n", " \"dataplex-data-documentation-scan\" : dataplex_asset_scan_name,\n", " }\n", " }\n", " elif dataplex_asset_type == \"DATA-QUALITY-SCAN\":\n", " request_body = {\n", " \"labels\" : {\n", "\n", " \"dataplex-dq-published-project\" : project_id,\n", " \"dataplex-dq-published-location\" : dataplex_region,\n", " \"dataplex-dq-published-scan\" : dataplex_asset_scan_name,\n", " }\n", " }\n", " else:\n", " raise Exception(f\"Unknown dataplex_asset_type of {dataplex_asset_type}\")\n", "\n", " json_result = restAPIHelper(url, \"PATCH\", request_body)\n", " print(json_result)" ] }, { "cell_type": "markdown", "metadata": { "id": "y4wCmmHs2Oz2" }, "source": [ "#### deleteDataQualityScanConfiguration\n", "- Deletes a DataScan resource." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "EFl-tJoK2VjB" }, "outputs": [], "source": [ "def deleteDataQualityScanConfiguration(project_id, dataplex_region, data_quality_scan_job_name):\n", " \"\"\"Gets scan recommendations based on table profile\"\"\"\n", "\n", " # Gets scan recommendations based on table profile\n", " url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans/{data_quality_scan_name}\"\n", " json_result = restAPIHelper(url, \"DELETE\", None)\n", " return json_result" ] }, { "cell_type": "markdown", "metadata": { "id": "ZUD3PeH1IpBb" }, "source": [ "### <font color='#4285f4'>BQ Helper Methods</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "2mWH5PwCIyHq" }, "source": [ "#### isTableExists\n", "\n", "project_id, dataset_name, table_name" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "DR6lAQkJIpWK" }, "outputs": [], "source": [ "def isTableExists(project_id, dataset_name, table_name):\n", " import io\n", " import google.cloud.bigquery as bigquery\n", " import json\n", "\n", " try:\n", " client = bigquery.Client()\n", " dataset_ref = client.dataset(dataset_name, project=project_id)\n", " table_ref = dataset_ref.table(table_name)\n", " table = client.get_table(table_ref)\n", " return True\n", " except Exception as e:\n", " # print(f\"Error reading table schema: {e}\")\n", " # exc_type, exc_value, exc_traceback = sys.exc_info()\n", " # line_number = exc_traceback.tb_lineno\n", " # print(f\"An error occurred on line {line_number}: {e}\")\n", " return False" ] }, { "cell_type": "markdown", "metadata": { "id": "tT7BfQPGLuCa" }, "source": [ "#### RunQuery" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "_S9rxR5iLvxh" }, "outputs": [], "source": [ "def RunQuery(sql):\n", " import time\n", " from google.cloud import bigquery\n", " client = bigquery.Client()\n", "\n", " if (sql.startswith(\"SELECT\") or sql.startswith(\"WITH\")):\n", " df_result = client.query(sql).to_dataframe()\n", " return df_result\n", " else:\n", " job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)\n", " query_job = client.query(sql, job_config=job_config)\n", "\n", " # Check on the progress by getting the job's updated state.\n", " query_job = client.get_job(\n", " query_job.job_id, location=query_job.location\n", " )\n", " print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n", "\n", " while query_job.state != \"DONE\":\n", " time.sleep(2)\n", " 
, { "cell_type": "markdown", "metadata": { "id": "JWj6TlwpLej7" }, "source": [ "### <font color='#4285f4'>Create Dataset to hold Data Quality Results</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "eAS1h1hKLSj2" }, "outputs": [], "source": [ "# Create a new dataset to hold the data quality scan results (kept separate from the source tables)\n", "\n", "governed_data_scan_dataset_name = \"governed_data_scan_quality_results\"\n", "\n", "sql = f\"\"\"CREATE SCHEMA IF NOT EXISTS `{project_id}.{governed_data_scan_dataset_name}` OPTIONS(location=\"{bigquery_location}\")\"\"\"\n", "\n", "RunQuery(sql)\n", "\n", "dq_results_table_name = \"data_quality_metrics\"" ] }, { "cell_type": "markdown", "metadata": { "id": "QCFEfpdmEydA" }, "source": [ "### <font color='#4285f4'>Run Data Quality Scans - Algorithm</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "gZE_lzcr28UE" }, "source": [ "#### Create a custom SQL rule (to show an example)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "41fxuZ5gbXrd" }, "outputs": [], "source": [ "# Create some custom rules for the tables order_detail and sales in the curated dataset\n", "\n", "def custom_sql_rule(project_id, dataset_name, table_name):\n", "  # We will create some rules that fail for demonstration purposes\n", "  # The columns here are common between the two tables. You would typically have rules per table, but here we are re-using them.\n", "\n", "  sql = \"\"\n", "  if table_name == \"sales\":\n", "    sql = f\"SELECT customer_id FROM `{project_id}.{dataset_name}.{table_name}` WHERE customer_id NOT IN (SELECT customer_id FROM `{project_id}.{dataset_name}.customer` WHERE customer_id > 5)\"\n", "  else:\n", "    # order_detail\n", "    sql = f\"SELECT product_id FROM `{project_id}.{dataset_name}.{table_name}` WHERE product_id NOT IN (SELECT product_id FROM `{project_id}.{dataset_name}.product` WHERE product_id > 5)\"\n", "\n", "  rules = [\n", "    ####\n", "    {\n", "      \"dimension\": \"VALIDITY\",\n", "      \"threshold\": 1,\n", "      \"name\": \"sale-price-check\",\n", "      \"description\": \"Sale Price Check\",\n", "      \"rowConditionExpectation\": {\n", "        \"sqlExpression\": f\"price >= (SELECT MAX(price) - 5 FROM `{project_id}.{dataset_name}.{table_name}`)\"\n", "      }\n", "    },\n", "    ####\n", "    {\n", "      \"sqlAssertion\": {\"sqlStatement\": sql},\n", "      \"dimension\": \"CONSISTENCY\",\n", "      \"name\": \"orphan-customers\",\n", "      \"description\": \"Customer sales without a customer record.\"\n", "    },\n", "    ####\n", "    {\n", "      \"dimension\": \"ACCURACY\",\n", "      \"name\": \"zero-sold-quantity\",\n", "      \"description\": \"Products sold with 0 quantity\",\n", "      \"tableConditionExpectation\": {\n", "        \"sqlExpression\": f\"(SELECT COUNT(*) CNT FROM `{project_id}.{dataset_name}.{table_name}` WHERE quantity = 0) > 0\"\n", "      }\n", "    },\n", "    ####\n", "    {\n", "      \"nonNullExpectation\": {},\n", "      \"column\": \"quantity\",\n", "      \"dimension\": \"COMPLETENESS\",\n", "      \"threshold\": 1\n", "    },\n", "    ####\n", "    {\n", "      \"rangeExpectation\": {\n", "        \"minValue\": \"0\",\n", "        \"maxValue\": \"1000\",\n", "        \"strictMinEnabled\": True\n", "      },\n", "      \"column\": \"price\",\n", "      \"ignoreNull\": True,\n", "      \"dimension\": \"VALIDITY\",\n", "      \"threshold\": 1,\n", "      \"name\": \"price-range-lt-1000\",\n", "      \"description\": \"Price should be less than $1000\"\n", "    }\n", "  ]\n", "  return rules" ] }
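, { "cell_type": "markdown", "metadata": {}, "source": [ "Each rule above follows the Dataplex data quality rule shape: exactly one expectation (for example `rowConditionExpectation`, `tableConditionExpectation`, `sqlAssertion`, `nonNullExpectation` or `rangeExpectation`) plus a dimension, an optional passing-ratio threshold and an optional name/description. A sketch of the smallest possible rule (hypothetical column name):\n", "\n", "```python\n", "{\n", "  \"nonNullExpectation\": {},    # the expectation to evaluate\n", "  \"column\": \"customer_id\",     # hypothetical column\n", "  \"dimension\": \"COMPLETENESS\",\n", "  \"threshold\": 1               # 100% of rows must pass\n", "}\n", "```" ] }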
\"VALIDITY\",\n", " \"threshold\": 1,\n", " \"name\": \"price-range-lt-1000\",\n", " \"description\": \"Price should be less than $1000\"\n", " }\n", " ]\n", " return rules" ] }, { "cell_type": "markdown", "metadata": { "id": "jF_G-orA3CVJ" }, "source": [ "#### Get all the tables we want to scan for each dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "vvbhOrV2oMSX" }, "outputs": [], "source": [ "# Get all the tables we want to scan for each dataset\n", "scans_to_perform = []\n", "dataset_list = [\"${bigquery_governed_data_raw_dataset}\",\"${bigquery_governed_data_enriched_dataset}\",\"${bigquery_governed_data_curated_dataset}\"]\n", "\n", "sql = \"\"\n", "for dataset_name in dataset_list:\n", " sql += f\"SELECT table_schema, table_name, table_type from `{dataset_name}.INFORMATION_SCHEMA.TABLES` UNION ALL \"\n", "\n", "# Remove training union all\n", "sql = sql.rstrip(\" UNION ALL \")\n", "\n", "result_df = RunQuery(sql)\n", "table_list = []\n", "\n", "# data_profile_scan_name: \"Field data_scan_id must contain only lowercase letters, numbers, and/or hyphens\n", "for index, row in result_df.iterrows():\n", " item = {\n", " \"project_id\": project_id,\n", " \"dataplex_region\": dataplex_region,\n", " \"data_profile_scan_name\": f\"{row['table_schema']}-{row['table_name']}-profile-scan\".lower().replace(\"_\",\"-\"),\n", " \"data_profile_display_name\": f\"{row['table_schema']}-{row['table_name']} profile scan\",\n", " \"data_quality_scan_name\": f\"{row['table_schema']}-{row['table_name']}-quality-scan\".lower().replace(\"_\",\"-\"),\n", " \"data_quality_display_name\": f\"{row['table_schema']}.{row['table_name']} AutoDQ scan\",\n", " \"data_quality_description\": f\"{row['table_schema']}.{row['table_name']} AutoDQ scan based upon recommended rules\",\n", " \"bigquery_dataset_name\": row['table_schema'],\n", " \"bigquery_table_name\": row['table_name'],\n", "\n", " # Used by below loop for processing\n", " \"data_quality_scan_state\": \"\",\n", " \"data_quality_scan_job_name\": \"\"\n", " }\n", " scans_to_perform.append(item)\n", "\n", " print(f\"item: {item}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "1SYywPMB3Gvo" }, "source": [ "#### Searches all the DataScans which contain profile scan, insights scans and data quality scans for a specific scan" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "5d14Gjcfq0Cz" }, "outputs": [], "source": [ "def search_data_scans(data, name=None, state=None, type=None):\n", " \"\"\"Searches the dataScans list for entries matching the given criteria. More efficient version.\n", "\n", " Args:\n", " data: The dictionary containing the dataScans list.\n", " name: The name to search for (optional).\n", " state: The state to search for (optional).\n", " type: The type to search for (optional).\n", "\n", " Returns:\n", " A list of dictionaries that match the search criteria. 
, { "cell_type": "markdown", "metadata": { "id": "hxDu3yfc3O8q" }, "source": [ "#### Create a list of scans we will potentially run" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "VIpjJRYeoMPS" }, "outputs": [], "source": [ "# For each table\n", "# 1. Check to see if a data profile scan exists (if not set the status to SKIP)\n", "# 2. Get the last data profile scan\n", "# 3. Get the recommended rules\n", "# 4. Append custom rules (for the curated order_detail and sales tables)\n", "\n", "allDataScans = listScans(project_id, dataplex_region)\n", "for item in scans_to_perform:\n", "  # Check to see if a data profile scan exists (if not set the status to SKIP)\n", "  name = f\"projects/{project_id}/locations/{dataplex_region}/dataScans/{item['data_profile_scan_name']}\"\n", "  #print(f\"Attempting to Match: {name}\")\n", "  profile_scan = search_data_scans(allDataScans, name, \"ACTIVE\", \"DATA_PROFILE\")\n", "  if profile_scan != []:\n", "    print(f\"MATCH: {profile_scan[0]}\")\n", "    try:\n", "      recommended_rules = getDataQualityScanRecommendationsByProfile(project_id, dataplex_region, item['data_profile_scan_name'])\n", "      item[\"data_quality_recommended_rules\"] = recommended_rules\n", "      if item[\"bigquery_dataset_name\"] == \"${bigquery_governed_data_curated_dataset}\" and \\\n", "         (item[\"bigquery_table_name\"] == \"order_detail\" or item[\"bigquery_table_name\"] == \"sales\"):\n", "        print(f\"Creating custom rules for {item['bigquery_dataset_name']}.{item['bigquery_table_name']}\")\n", "        custom_rules = custom_sql_rule(project_id, item[\"bigquery_dataset_name\"], item[\"bigquery_table_name\"])\n", "        # Add the custom rules to the recommended rules\n", "        recommended_rules[\"rule\"].extend(custom_rules)\n", "        item[\"data_quality_recommended_rules\"] = recommended_rules\n", "        print(f\"data_quality_recommended_rules: {item['data_quality_recommended_rules']}\")\n", "      #print(f\"recommended_rules: {recommended_rules}\")\n", "      print(f\"Scan to be created for: {item['data_quality_scan_name']}\")\n", "    except Exception as e:\n", "      print(f\"** No recommended rules for {item['data_profile_scan_name']} **\")\n", "      item[\"data_quality_scan_state\"] = \"SKIP-RECOMMENDED-RULES\"\n", "      item[\"data_quality_scan_job_name\"] = \"SKIP-RECOMMENDED-RULES\"\n", "\n", "  else:\n", "    print(f\"** No data profile scan for {item['data_profile_scan_name']} **\")\n", "    item[\"data_quality_scan_state\"] = \"SKIP-NO-PROFILE-SCAN\"\n", "    item[\"data_quality_scan_job_name\"] = \"SKIP-NO-PROFILE-SCAN\"" ] }, { "cell_type": "markdown", "metadata": { "id": "Y2E0WaUM3YIS" }, "source": [ "#### Standard notification and export of data quality results for all tables" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "CiYMDYU3oMGa" }, "outputs": [], "source": [ "samplingPercent = 100\n", "postScanActions = {\n", "  \"notificationReport\": {\n", "    \"recipients\": {\n", "      \"emails\": [\n", "        notification_email\n", "      ]\n", "    },\n", "    \"scoreThresholdTrigger\": {\n", "      \"scoreThreshold\": 100\n", "    },\n", "    \"jobFailureTrigger\": {},\n", "    \"jobEndTrigger\": {}\n", "  },\n", "  \"bigqueryExport\":\n", "    {\"resultsTable\": f\"//bigquery.googleapis.com/projects/{project_id}/datasets/{governed_data_scan_dataset_name}/tables/{dq_results_table_name}\"}\n", "}" ] }
\"jobFailureTrigger\": {},\n", " \"jobEndTrigger\": {}\n", " },\n", " \"bigqueryExport\":\n", " {\"resultsTable\": f\"//bigquery.googleapis.com/projects/{project_id}/datasets/{governed_data_scan_dataset_name}/tables/{dq_results_table_name}\"}\n", " }" ] }, { "cell_type": "markdown", "metadata": { "id": "WtZi8LiUyoaO" }, "source": [ "#### Run the data quality scans (up to 5 concurrently) and update the BigQuery Table" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "w5irScBJoL2A" }, "outputs": [], "source": [ "# Run the scans (up to a certain concurrency level)\n", "numberOfScansToRunConcurrently = 5\n", "\n", "while True:\n", " # Count the number of scans that are running\n", " concurrentScanCount = 0\n", " for item in scans_to_perform:\n", " if item[\"data_quality_scan_state\"] == \"PENDING\" or \\\n", " item[\"data_quality_scan_state\"] == \"STATE_UNSPECIFIED\" or \\\n", " item[\"data_quality_scan_state\"] == \"RUNNING\" or \\\n", " item[\"data_quality_scan_state\"] == \"CANCELING\":\n", " # Update our count\n", " print(f\"Concurrent Scan Count: {item['bigquery_dataset_name']}.{item['bigquery_table_name']} -> {item['data_quality_scan_state']}\")\n", " concurrentScanCount += 1\n", " else:\n", " print(f\"Concurrent Scan Count: {item['bigquery_dataset_name']}.{item['bigquery_table_name']} -> {item['data_quality_scan_state']}\")\n", "\n", "\n", " print(f\"concurrentScanCount: {concurrentScanCount}\")\n", "\n", " # Start new scans under our concurrency count\n", " scansStarted = -1\n", " while concurrentScanCount < numberOfScansToRunConcurrently and scansStarted != 0:\n", " # Start new scans up to the concurrency limit\n", " scansStarted = 0\n", " for item in scans_to_perform:\n", " if concurrentScanCount < numberOfScansToRunConcurrently and \\\n", " item[\"data_quality_scan_state\"] == \"\":\n", " # start a new scan\n", " dataQualitySpec = {}\n", " dataQualitySpec[\"samplingPercent\"] = samplingPercent\n", " dataQualitySpec[\"postScanActions\"] = postScanActions\n", " dataQualitySpec[\"rules\"] = item[\"data_quality_recommended_rules\"][\"rule\"]\n", "\n", " # Create the data quality scan (if not exists)\n", " createDataQualityScan(project_id, dataplex_region, item[\"data_quality_scan_name\"],\n", " item[\"data_quality_display_name\"], item[\"data_quality_description\"],\n", " item[\"bigquery_dataset_name\"], item[\"bigquery_table_name\"],\n", " dataQualitySpec)\n", "\n", " started = False\n", " item[\"data_quality_scan_job_name\"] = \"\"\n", " while started == False:\n", " try:\n", " item[\"data_quality_scan_job_name\"] = startDataQualityScan(item[\"project_id\"], item[\"dataplex_region\"], item[\"data_quality_scan_name\"])\n", " item[\"data_quality_scan_state\"] = \"STATE_UNSPECIFIED\"\n", " started = True\n", " scansStarted += 1\n", " concurrentScanCount += 1\n", " except Exception as e:\n", " scan_full_name = f'projects/{item[\"project_id\"]}/locations/{item[\"dataplex_region\"]}/dataScans/{item[\"data_quality_scan_name\"]}'\n", " message = f\"Provided DataScan '{scan_full_name}' does not exist.\"\n", " print(message)\n", " if message in str(e):\n", " print(f\"Data scan is not available to start. 
Waiting...\")\n", " time.sleep(5)\n", " else:\n", " raise e # Re-raise the exception for other errors\n", "\n", "\n", " # Update the status for the scans that are processing\n", " for item in scans_to_perform:\n", " if item[\"data_quality_scan_state\"] == \"PENDING\" or \\\n", " item[\"data_quality_scan_state\"] == \"STATE_UNSPECIFIED\" or \\\n", " item[\"data_quality_scan_state\"] == \"RUNNING\" or \\\n", " item[\"data_quality_scan_state\"] == \"CANCELING\":\n", " # Get the latest state\n", " item[\"data_quality_scan_state\"] = getStateDataQualityScan(item[\"project_id\"], item[\"dataplex_region\"], item[\"data_quality_scan_job_name\"])\n", "\n", " if concurrentScanCount == 0:\n", " # nothing processing- exit\n", " break\n", " else:\n", " # wait for processing\n", " print(f\"concurrentScanCount: {concurrentScanCount}\")\n", " time.sleep(10)\n", "\n", "# Update the BigQuery labels so our scans show in the Console UI\n", "for item in scans_to_perform:\n", " if item[\"data_quality_scan_state\"] == \"SUCCEEDED\":\n", " # skip CANCELLED or FAILED states\n", " updateBigQueryTableDataplexLabels(item[\"project_id\"], item[\"dataplex_region\"],\n", " \"DATA-QUALITY-SCAN\", item[\"data_quality_scan_name\"],\n", " item[\"bigquery_dataset_name\"], item[\"bigquery_table_name\"])\n", "\n", " print(f\"Associated data quality scan for table {item['bigquery_dataset_name']}.{item['bigquery_table_name']} associated with BigQuery Console UI.\")\n" ] }, { "cell_type": "markdown", "metadata": { "id": "42IxhtRRrvR-" }, "source": [ "### <font color='#4285f4'>Clean Up</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "6lF2Z7skFbvf" }, "outputs": [], "source": [ "# Placeholder (you would need to un-patch the BigQuery tables and delete the scans)\n", "print(f\"You can delete scans here: https://console.cloud.google.com/dataplex/govern/quality?project={project_id}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "ASQ2BPisXDA0" }, "source": [ "### <font color='#4285f4'>Reference Links</font>\n" ] }, { "cell_type": "markdown", "metadata": { "id": "rTY6xJdZ3ul8" }, "source": [ "- [REPLACE-ME](https://REPLACE-ME)" ] } ], "metadata": { "colab": { "collapsed_sections": [ "8rRxIQAxABNK", "S7zKuqTpABNL", "HMsUvoF4BP7Y", "m65vp54BUFRi", "UmyL-Rg4Dr_f", "sZ6m_wGrK0YG", "JbOjdSP1kN9T", "ILcRKf-zgsP5", "BVXA0swqCbLn", "g0nB-_E4DdDq", "Wv9dt4NRsdRk", "6cWEZXtvGAZr", "z7tolzxVGH3a", "y4wCmmHs2Oz2", "ZUD3PeH1IpBb", "42IxhtRRrvR-", "ASQ2BPisXDA0" ], "name": "02-Data-Quality", "private_outputs": true, "provenance": [] }, "kernelspec": { "display_name": "Python 3", "name": "python3" }, "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 0 }