datascan/airflow/dp_delete_runSync_dag.py (44 lines of code) (raw):

# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Airflow dag to create, run and evaluate data profile scan""" import pendulum from airflow import DAG from google.cloud import dataplex_v1 from airflow.providers.google.cloud.operators.dataplex import DataplexGetDataProfileScanOperator from airflow.providers.google.cloud.operators.dataplex import DataplexDeleteDataProfileScanOperator from datetime import timedelta, datetime """replace project id and region with user project and the region respectively""" PROJECT_ID = "test-project" REGION = "us-central1" DATA_SCAN_ID = "airflow-data-profile-scan" """replace EXPORT_TABLE for your setup""" # Table where datascan job results should be exported to. EXPORT_TABLE = "//bigquery.googleapis.com/projects/test-project/datasets/test_dataset/tables/results_table" EXAMPLE_DATA_SCAN = dataplex_v1.DataScan() EXAMPLE_DATA_SCAN.data.resource = ( f"//bigquery.googleapis.com/projects/bigquery-public-data/datasets/austin_bikeshare/tables/bikeshare_stations" ) EXAMPLE_DATA_SCAN.data_profile_spec = {} default_args = { 'start_date': pendulum.today('UTC').add(days=0), 'retries': 1, 'retry_delay': timedelta(minutes=5) } dag = DAG( 'delete_runSync', default_args=default_args, description='Test custom dataplex data profile scan get_data_scan-> delete flow', schedule='10 10 * * *', start_date=datetime.now(), max_active_runs=2, catchup=False, dagrun_timeout=timedelta(minutes=20) ) get_data_scan = DataplexGetDataProfileScanOperator( task_id="get_data_scan", project_id=PROJECT_ID, region=REGION, dag=dag, data_scan_id=DATA_SCAN_ID) delete_data_scan = DataplexDeleteDataProfileScanOperator( task_id="delete_data_scan", project_id=PROJECT_ID, region=REGION, dag=dag, data_scan_id=DATA_SCAN_ID ) get_data_scan >> delete_data_scan