deprecated-code/dags/sample-dataplex-run-data-quality.py (71 lines of code) (raw):

####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################

# Author: Adam Paternostro
# Summary: Runs a data quality check against BigQuery

# [START dag]
from google.cloud import storage
from datetime import datetime, timedelta
import requests
import sys
import os
import logging
import airflow
from airflow.operators import bash_operator
from airflow.utils import trigger_rule
from airflow.operators.python_operator import PythonOperator
import google.auth
import google.auth.transport.requests
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
import json

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': None,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'dagrun_timeout': timedelta(minutes=60),
}

# Configuration is injected through Composer environment variables.
# (Fix: the original read ENV_TAXI_DATASET_ID twice; the duplicate assignment
# has been removed.)
project_id = os.environ['ENV_PROJECT_ID']
taxi_dataset_id = os.environ['ENV_TAXI_DATASET_ID']
processed_bucket_name = os.environ['ENV_PROCESSED_BUCKET']
yaml_path = "gs://" + processed_bucket_name + "/dataplex/dataplex_data_quality_taxi.yaml"
bigquery_region = os.environ['ENV_BIGQUERY_REGION']
thelook_dataset_id = os.environ['ENV_THELOOK_DATASET_ID']
vpc_subnet_name = os.environ['ENV_DATAPROC_SERVERLESS_SUBNET_NAME']
dataplex_region = os.environ['ENV_DATAPLEX_REGION']
service_account_to_run_dataplex = "dataproc-service-account@" + project_id + ".iam.gserviceaccount.com"
random_extension = os.environ['ENV_RANDOM_EXTENSION']
taxi_dataplex_lake_name = "taxi-data-lake-" + random_extension
data_quality_dataset_id = "dataplex_data_quality"

# NOTE: This is case sensitive for some reason
bigquery_region = bigquery_region.upper()

# Parameters templated into the bash deployment script below.
# (Fix: the original dict literal listed the "thelook_dataset" key twice with
# the same value; the dead duplicate entry has been removed.)
params_list = {
    "project_id": project_id,
    "taxi_dataset": taxi_dataset_id,
    "thelook_dataset": thelook_dataset_id,
    "yaml_path": yaml_path,
    "bigquery_region": bigquery_region,
    "taxi_dataplex_lake_name": taxi_dataplex_lake_name,
    "vpc_subnet_name": vpc_subnet_name,
    "dataplex_region": dataplex_region,
    "service_account_to_run_dataplex": service_account_to_run_dataplex,
    "random_extension": random_extension,
}

# Create the dataset to hold the data quality results
# NOTE: This has to be in the same region as the BigQuery dataset we are performing our data quality checks
with airflow.DAG('sample-dataplex-run-data-quality',
                 default_args=default_args,
                 start_date=datetime(2021, 1, 1),
                 # Add the Composer "Data" directory which will hold the SQL/Bash scripts for deployment
                 template_searchpath=['/home/airflow/gcs/data'],
                 # Not scheduled, trigger only
                 schedule_interval=None) as dag:

    # NOTE: The service account of the Composer worker node must have access to run these commands

    # Create the dataset for holding dataplex data quality results
    create_data_quality_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id="create_dataset",
        location=bigquery_region,
        project_id=project_id,
        dataset_id=data_quality_dataset_id,
        exists_ok=True,
    )

    # Create a data quality task
    dataplex_data_quality = bash_operator.BashOperator(
        task_id='dataplex_data_quality',
        bash_command='bash_deploy_dataplex_data_quality.sh',
        params=params_list,
        dag=dag,
    )

    # DAG Graph
    create_data_quality_dataset >> dataplex_data_quality

# [END dag]