data-analytics-demos/bigquery-data-governance/colab-enterprise/06-Data-Discovery-Scan.ipynb (773 lines of code) (raw):

{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "gXQ7R9BJ-L9v" }, "source": [ "### <font color='#4285f4'>Overview</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "EfMzqWL3-L9w" }, "source": [ "**Overview**: Creates a data discovery scan of a storage account that discovers 4 tables (CSV files) and creates BigLake tables over the CSV files.\n", "\n", "**Process Flow**:\n", "1. **Copy CSV files**, each to its own folder within the \"scan\" GCS bucket.\n", "\n", "2. **Create a data discovery scan** for the bucket.\n", "\n", "3. **Specify the storage account** for the scan.\n", "\n", "4. **Specify BigLake table creation** and provide the necessary BigLake connection details.\n", "\n", "5. **Pause for a few seconds** while the scan registers (avoid starting the scan too quickly).\n", "\n", "6. **Start the data discovery scan.**\n", "\n", "7. **Wait for the scan to complete.**\n", "\n", "8. **Review the newly created BigQuery dataset.**\n", "\n", "9. **(Optional) Delete the scan.**\n", "\n", "Notes:\n", "* This notebook runs the scan manually. 
Typically, you would run a scan on a schedule and not worry about processing.\n", "\n", "Cost:\n", "* Approximate cost: Less than a dollar\n", "\n", "Author:\n", "* Adam Paternostro" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "wUdnbAGL-L9w" }, "outputs": [], "source": [ "# Architecture Diagram\n", "from IPython.display import Image\n", "Image(url='https://storage.googleapis.com/data-analytics-golden-demo/colab-diagrams/BigQuery-Data-Governance-Data-Discovery.png', width=1200)" ] }, { "cell_type": "markdown", "metadata": { "id": "P6O1UluF-L9x" }, "source": [ "### <font color='#4285f4'>Video Walkthrough</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "3yqa3U5A-L9x" }, "source": [ "[Video](https://storage.googleapis.com/data-analytics-golden-demo/colab-videos/Data-Discovery-Scan.mp4)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "zO--hQKn-L9x" }, "outputs": [], "source": [ "from IPython.display import HTML\n", "\n", "HTML(\"\"\"\n", "<video width=\"800\" height=\"600\" controls>\n", " <source src=\"https://storage.googleapis.com/data-analytics-golden-demo/colab-videos/Data-Discovery-Scan.mp4\" type=\"video/mp4\">\n", " Your browser does not support the video tag.\n", "</video>\n", "\"\"\")" ] }, { "cell_type": "markdown", "metadata": { "id": "HMsUvoF4BP7Y" }, "source": [ "### <font color='#4285f4'>License</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "jQgQkbOvj55d" }, "source": [ "```\n", "# Copyright 2024 Google LLC\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 
either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License.\n", "```" ] }, { "cell_type": "markdown", "metadata": { "id": "m65vp54BUFRi" }, "source": [ "### <font color='#4285f4'>Pip installs</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "5MaWM6H5i6rX" }, "outputs": [], "source": [ "# PIP Installs (if necessary)\n", "import sys\n", "\n", "# !{sys.executable} -m pip install REPLACE-ME" ] }, { "cell_type": "markdown", "metadata": { "id": "UmyL-Rg4Dr_f" }, "source": [ "### <font color='#4285f4'>Initialize</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "xOYsEVSXp6IP" }, "outputs": [], "source": [ "from PIL import Image\n", "from IPython.display import HTML\n", "import IPython.display\n", "import google.auth\n", "import requests\n", "import json\n", "import uuid\n", "import base64\n", "import os\n", "import cv2\n", "import random\n", "import time\n", "import datetime\n", "import base64\n", "import random\n", "\n", "import logging\n", "from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "wMlHl3bnkFPZ" }, "outputs": [], "source": [ "# Set these (run this cell to verify the output)\n", "\n", "bigquery_location = \"${bigquery_location}\"\n", "region = \"${dataplex_region}\"\n", "location = \"${dataplex_region}\"\n", "scan_bucket_name = \"${governed_data_scan_bucket}\"\n", "\n", "# Get the current date and time\n", "now = datetime.datetime.now()\n", "\n", "# Format the date and time as desired\n", "formatted_date = now.strftime(\"%Y-%m-%d-%H-%M\")\n", "\n", "# Get some values using gcloud\n", "project_id = !(gcloud config get-value project)\n", "user = !(gcloud auth list --filter=status:ACTIVE --format=\"value(account)\")\n", "\n", "if len(project_id) != 1:\n", " raise RuntimeError(f\"project_id is not 
set: {project_id}\")\n", "project_id = project_id[0]\n", "\n", "if len(user) != 1:\n", " raise RuntimeError(f\"user is not set: {user}\")\n", "user = user[0]\n", "\n", "print(f\"project_id = {project_id}\")\n", "print(f\"user = {user}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "sZ6m_wGrK0YG" }, "source": [ "### <font color='#4285f4'>Helper Methods</font>" ] }, { "cell_type": "markdown", "metadata": { "id": "JbOjdSP1kN9T" }, "source": [ "#### restAPIHelper\n", "Calls the Google Cloud REST API using the current user's credentials." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "40wlwnY4kM11" }, "outputs": [], "source": [ "def restAPIHelper(url: str, http_verb: str, request_body: dict) -> dict:\n", " \"\"\"Calls the Google Cloud REST API passing in the current user's credentials\"\"\"\n", "\n", " import requests\n", " import google.auth\n", " import google.auth.transport.requests\n", " import json\n", "\n", " # Get an access token based upon the current user\n", " creds, project = google.auth.default()\n", " auth_req = google.auth.transport.requests.Request()\n", " creds.refresh(auth_req)\n", " access_token=creds.token\n", "\n", " headers = {\n", " \"Content-Type\" : \"application/json\",\n", " \"Authorization\" : \"Bearer \" + access_token\n", " }\n", "\n", " if http_verb == \"GET\":\n", " response = requests.get(url, headers=headers)\n", " elif http_verb == \"POST\":\n", " response = requests.post(url, json=request_body, headers=headers)\n", " elif http_verb == \"PUT\":\n", " response = requests.put(url, json=request_body, headers=headers)\n", " elif http_verb == \"PATCH\":\n", " response = requests.patch(url, json=request_body, headers=headers)\n", " elif http_verb == \"DELETE\":\n", " response = requests.delete(url, headers=headers)\n", " else:\n", " raise RuntimeError(f\"Unknown HTTP verb: {http_verb}\")\n", "\n", " if response.status_code == 200:\n", " return json.loads(response.content)\n", 
" else:\n", " error = f\"Error restAPIHelper -> Status: '{response.status_code}' Text: '{response.text}'\"\n", " raise RuntimeError(error)" ] }, { "cell_type": "markdown", "metadata": { "id": "bI-KJELZ1jgt" }, "source": [ "#### Helper Functions" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "pmnCwYvA1kZv" }, "outputs": [], "source": [ "def RunQuery(sql):\n", " import time\n", " from google.cloud import bigquery\n", " client = bigquery.Client()\n", "\n", " if (sql.startswith(\"SELECT\") or sql.startswith(\"WITH\")):\n", " df_result = client.query(sql).to_dataframe()\n", " return df_result\n", " else:\n", " job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)\n", " query_job = client.query(sql, job_config=job_config)\n", "\n", " # Check on the progress by getting the job's updated state.\n", " query_job = client.get_job(\n", " query_job.job_id, location=query_job.location\n", " )\n", " print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n", "\n", " while query_job.state != \"DONE\":\n", " time.sleep(2)\n", " query_job = client.get_job(\n", " query_job.job_id, location=query_job.location\n", " )\n", " print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n", "\n", " if query_job.error_result is None:\n", " return True\n", " else:\n", " raise Exception(query_job.error_result)" ] }, { "cell_type": "markdown", "metadata": { "id": "EYRHDPdVKBzd" }, "source": [ "### <font color='#4285f4'>Copy Data to Storage for Scan</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "PJsTtUbD5SsQ" }, "outputs": [], "source": [ "# Copy our data (CSV files). 
We want the files in our local bucket, in our local location.\n", "# For CSV files (or any other file type), each folder should contain only files that share one schema (one table of data)\n", "# You would not want to put people.csv in the same folder as cars.csv\n", "\n", "source_file = \"gs://data-analytics-golden-demo/cymbal-consumer-finance/ccf_csv_tables_customers.csv\"\n", "dest_file = f\"gs://{scan_bucket_name}/cymbal-consumer-finance/customers/customers.csv\"\n", "print(f\"Copying data from {source_file} to {dest_file}\")\n", "!gsutil cp {source_file} {dest_file}\n", "print(\"Customers copy is complete\")\n", "\n", "source_file = \"gs://data-analytics-golden-demo/cymbal-consumer-finance/ccf_csv_tables_loan_applications.csv\"\n", "dest_file = f\"gs://{scan_bucket_name}/cymbal-consumer-finance/loan_applications/loan_applications.csv\"\n", "print(f\"Copying data from {source_file} to {dest_file}\")\n", "!gsutil cp {source_file} {dest_file}\n", "print(\"Loan applications copy is complete\")\n", "\n", "source_file = \"gs://data-analytics-golden-demo/cymbal-consumer-finance/ccf_csv_tables_loan_repayments.csv\"\n", "dest_file = f\"gs://{scan_bucket_name}/cymbal-consumer-finance/loan_repayments/loan_repayments.csv\"\n", "print(f\"Copying data from {source_file} to {dest_file}\")\n", "!gsutil cp {source_file} {dest_file}\n", "print(\"Loan repayments copy is complete\")\n", "\n", "source_file = \"gs://data-analytics-golden-demo/cymbal-consumer-finance/ccf_csv_tables_marketing_costs.csv\"\n", "dest_file = f\"gs://{scan_bucket_name}/cymbal-consumer-finance/marketing_costs/marketing_costs.csv\"\n", "print(f\"Copying data from {source_file} to {dest_file}\")\n", "!gsutil cp {source_file} {dest_file}\n", "print(\"Marketing costs copy is complete\")\n", "\n", "print(f\"To view the files: https://console.cloud.google.com/storage/browser/{scan_bucket_name}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "PUPlSXZRFOlR" }, "source": [ "### <font color='#4285f4'>Data Discovery Scan - Helper Methods</font>" ] }, { 
"cell_type": "markdown", "metadata": { "id": "9dK1HHsmFKRw" }, "source": [ "#### existsDataDiscoveryScan\n", "- Tests to see if a Data Discovery Scan exists\n", "- Returns True/False" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "U4A_7n4SEPNO" }, "outputs": [], "source": [ "def existsDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_name):\n", " \"\"\"Test to see if a scan exists.\"\"\"\n", "\n", " # Gather existing data scans\n", " # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/list\n", "\n", " url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans\"\n", "\n", " # Gather existing data scans\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " print(f\"existsDataDiscoveryScan (GET) json_result: {json_result}\")\n", "\n", " # Test to see if data scan exists, if so return\n", " if \"dataScans\" in json_result:\n", " for item in json_result[\"dataScans\"]:\n", " print(f\"Scan names: {item['name']}\")\n", " if item[\"name\"] == f\"projects/{project_id}/locations/{dataplex_region}/dataScans/{data_discovery_scan_name}\":\n", " print(f\"Data Discovery Scan {data_discovery_scan_name} already exists\")\n", " return True\n", "\n", " return False" ] }, { "cell_type": "markdown", "metadata": { "id": "UjAdTIgGFEdv" }, "source": [ "#### createDataDiscoveryScan\n", "- Creates a discovery scan, but does not run it\n", "- If the scan exists, then does nothing" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "5YDEwqaQAKBx" }, "outputs": [], "source": [ "def createDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_name, data_discovery_display_name, resource, biglake_connection_name):\n", " \"\"\"Creates the data discovery scan.\"\"\"\n", "\n", " if not existsDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_name):\n", " # Create a new scan\n", " # 
https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/create\n", " print(\"Creating Data Discovery Scan\")\n", "\n", " url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans?dataScanId={data_discovery_scan_name}\"\n", "\n", " request_body = {\n", " \"displayName\": data_discovery_display_name,\n", " \"type\": \"DATA_DISCOVERY\",\n", " \"data\":{\n", " \"resource\": resource\n", " },\n", " \"dataDiscoverySpec\": {\n", " \"storageConfig\": {\n", " \"csvOptions\":\n", " {\n", " \"delimiter\":\",\",\n", " \"headerRows\":1\n", " }\n", " # \"includePatterns\": includePatterns # We are just doing one for the demo\n", " },\n", " \"bigqueryPublishingConfig\": {\n", " \"connection\": biglake_connection_name,\n", " \"tableType\": \"BIGLAKE\"\n", " }\n", " }\n", " }\n", "\n", " json_result = restAPIHelper(url, \"POST\", request_body)\n", "\n", " name = json_result[\"metadata\"][\"target\"]\n", " print(f\"Data Discovery Scan created: {name}\")\n", " else:\n", " print(f\"Data Discovery Scan exists: projects/{project_id}/locations/{dataplex_region}/dataScans/{data_discovery_scan_name}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "W7J_T8H3FVq2" }, "source": [ "#### startDataDiscoveryScan\n", "- Starts a data discovery scan (async)\n", "- Returns the \"job name\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "OU4WhqdnFcUq" }, "outputs": [], "source": [ "def startDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_name):\n", " \"\"\"Starts the scan\"\"\"\n", "\n", " # Run an existing scan\n", " # https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/run\n", " print(\"Running Data Discovery Scan\")\n", "\n", " url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans/{data_discovery_scan_name}:run\"\n", "\n", " request_body = { }\n", "\n", " json_result = restAPIHelper(url, 
\"POST\", request_body)\n", " job_name = json_result[\"job\"][\"name\"]\n", " job_state = json_result[\"job\"][\"state\"]\n", " print(f\"Data Discovery Scan run created: {job_name} - State: {job_state}\")\n", "\n", " return job_name\n" ] }, { "cell_type": "markdown", "metadata": { "id": "t4LEftacFizp" }, "source": [ "#### getStateDataDiscoveryScan\n", "- Gets the state of a scan (to see if it is done)\n", "- Returns the \"state\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "kpwLtMFZFjua" }, "outputs": [], "source": [ "def getStateDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_job_name):\n", " \"\"\"Gets the status for the scan job\"\"\"\n", "\n", " # Gets the \"state\" of a scan\n", " url = f\"https://dataplex.googleapis.com/v1/{data_discovery_scan_job_name}\"\n", " json_result = restAPIHelper(url, \"GET\", None)\n", " return json_result[\"state\"]" ] }, { "cell_type": "markdown", "metadata": { "id": "Cn8ltL6-rxqi" }, "source": [ "#### deleteDataDiscoveryScan\n", "- Deletes the scan if it exists\n", "- Returns nothing" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "MzNiSJ2fryEK" }, "outputs": [], "source": [ "def deleteDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_name):\n", " \"\"\"Deletes the scan\"\"\"\n", "\n", " if existsDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_name):\n", " # Deletes a scan\n", " url = f\"https://dataplex.googleapis.com/v1/projects/{project_id}/locations/{dataplex_region}/dataScans/{data_discovery_scan_name}\"\n", " json_result = restAPIHelper(url, \"DELETE\", None)\n", " print(f\"Scan {data_discovery_scan_name} deleted.\")\n", " else:\n", " print(f\"Scan {data_discovery_scan_name} does not exist to delete.\")" ] }, { "cell_type": "markdown", "metadata": { "id": "PwQHYZ-aFv7j" }, "source": [ "### <font color='#4285f4'>Run Data Discovery Scan (for a bucket)</font>" ] }, { "cell_type": "markdown", "metadata": { "id": 
"MKIRbIHHt1Dm" }, "source": [ "- Creates a new scan\n", "- Starts the scan (after a delay)\n", "- Monitors the scan's progress" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "jS8dLcDnBy44" }, "outputs": [], "source": [ "dataplex_region = location\n", "data_discovery_scan_name = \"data-discovery-scan-01\"\n", "data_discovery_display_name = \"Data Discovery Scan (01)\"\n", "\n", "# We will use the dataplex region here (not US multi-region)\n", "resource = f\"//storage.googleapis.com/projects/{project_id}/buckets/{scan_bucket_name}\"\n", "biglake_connection_name = f\"projects/{project_id}/locations/{region}/connections/biglake-connection-dataplex\"\n", "#includePatterns = [ f\"gs://{scan_bucket_name}/table-1-etc.../*.csv\" ]\n", "\n", "print(f\"Creating scan of bucket(s): {scan_bucket_name}\")\n", "print()\n", "\n", "createDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_name, data_discovery_display_name,\n", " resource, biglake_connection_name)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "-RT_3r91p2ve" }, "outputs": [], "source": [ "# It can take a few seconds for the scan to register\n", "time.sleep(20)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "fqzWbIz1f0Yj" }, "outputs": [], "source": [ "# Start the scan\n", "data_discovery_scan_job_name = startDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_name)\n", "print(f\"data_discovery_scan_job_name: {data_discovery_scan_job_name}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Vb1D74b_gE-W" }, "outputs": [], "source": [ "# Monitor the scan\n", "data_discovery_scan_state = getStateDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_job_name)\n", "print(f\"data_discovery_scan_state: {data_discovery_scan_state}\")\n", "\n", "while data_discovery_scan_state == \"PENDING\" or \\\n", " data_discovery_scan_state == \"STATE_UNSPECIFIED\" or \\\n", " 
data_discovery_scan_state == \"RUNNING\" or \\\n", " data_discovery_scan_state == \"CANCELING\":\n", " time.sleep(10)\n", " data_discovery_scan_state = getStateDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_job_name)\n", " print(f\"data_discovery_scan_state: {data_discovery_scan_state}\")\n", "\n", "print(f\"Discovery Scan finished with state: {data_discovery_scan_state}. If it succeeded, you should see a new BigQuery dataset.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(f\"You can view/delete scans here: https://console.cloud.google.com/bigquery/governance/catalog-management/cloud-storage-discovery?project={project_id}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "42IxhtRRrvR-" }, "source": [ "### <font color='#4285f4'>Clean Up</font>" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "6lF2Z7skFbvf" }, "outputs": [], "source": [ "# Delete the scan\n", "\n", "user_input = input(f\"Do you want to delete your scan {data_discovery_scan_name} (Y/n)?\")\n", "if user_input == \"Y\":\n", " print(\"This will not delete the dataset created by the scan.\")\n", " deleteDataDiscoveryScan(project_id, dataplex_region, data_discovery_scan_name)" ] }, { "cell_type": "markdown", "metadata": { "id": "ASQ2BPisXDA0" }, "source": [ "### <font color='#4285f4'>Reference Links</font>\n" ] }, { "cell_type": "markdown", "metadata": { "id": "rTY6xJdZ3ul8" }, "source": [ "- [REPLACE-ME](https://REPLACE-ME)" ] } ], "metadata": { "colab": { "collapsed_sections": [ "gXQ7R9BJ-L9v", "P6O1UluF-L9x", "HMsUvoF4BP7Y", "m65vp54BUFRi", "sZ6m_wGrK0YG", "JbOjdSP1kN9T", "bI-KJELZ1jgt", "PUPlSXZRFOlR", "9dK1HHsmFKRw", "UjAdTIgGFEdv", "W7J_T8H3FVq2", "t4LEftacFizp", "42IxhtRRrvR-", "ASQ2BPisXDA0" ], "name": "Template", "private_outputs": true, "provenance": [] }, "kernelspec": { "display_name": "Python 3", "name": "python3" }, "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 0 }