cloud-composer/dags/run-all-dags.py:

####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################

# Author:  Adam Paternostro
# Summary: Deploys Public Preview features that require allowlisting.
#          This is not part of Step 3 since items in this DAG will fail to deploy if your
#          project does not have a particular feature enabled.

# [START dag]
from datetime import datetime, timedelta
import sys
import os
import logging
import airflow
from airflow.utils import trigger_rule
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

project_id      = os.environ['ENV_PROJECT_ID']
bigquery_region = os.environ['ENV_BIGQUERY_REGION']

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': None,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'dagrun_timeout': timedelta(minutes=60),
}

# SQL that seeds the Datastream CDC data via a stored procedure
# (renamed from sp_datastream_cdc_data so it does not shadow the task variable below)
sp_datastream_cdc_data_sql = "CALL `{}.taxi_dataset.sp_create_datastream_cdc_data`();".format(project_id)

step_01_trigger_dag_id = "step-01-taxi-data-download"
step_02_trigger_dag_id = "step-02-taxi-data-processing"

# Quick copy (comment this out to actually process the data from source)
step_01_trigger_dag_id = "step-01-taxi-data-download-quick-copy"
step_02_trigger_dag_id = "step-02-taxi-data-processing-quick-copy"

with airflow.DAG('run-all-dags',
                 default_args=default_args,
                 start_date=datetime(2021, 1, 1),
                 # Add the Composer "Data" directory which will hold the SQL scripts for deployment
                 template_searchpath=['/home/airflow/gcs/data'],
                 # Not scheduled, trigger only
                 schedule_interval='@once') as dag:

    step_01_taxi_data_download = TriggerDagRunOperator(
        task_id="step_01_taxi_data_download",
        trigger_dag_id=step_01_trigger_dag_id,
        wait_for_completion=True
    )

    step_02_taxi_data_processing = TriggerDagRunOperator(
        task_id="step_02_taxi_data_processing",
        trigger_dag_id=step_02_trigger_dag_id,
        wait_for_completion=True
    )

    step_03_hydrate_tables = TriggerDagRunOperator(
        task_id="step_03_hydrate_tables",
        trigger_dag_id="step-03-hydrate-tables",
        wait_for_completion=True
    )

    # Start the streaming job so we have some seed data
    # This job will be stopped after 4 hours (by the stop job)
    sample_dataflow_start_streaming_job = TriggerDagRunOperator(
        task_id="sample_dataflow_start_streaming_job",
        trigger_dag_id="sample-dataflow-start-streaming-job",
        wait_for_completion=True
    )

    # Download 250+ images for the object table
    sample_rideshare_download_images = TriggerDagRunOperator(
        task_id="sample_rideshare_download_images",
        trigger_dag_id="sample-rideshare-download-images",
        wait_for_completion=True
    )

    # NO LONGER USED, please use the Cloud Run website
    # Deploy website to App Engine
    #sample_rideshare_website = TriggerDagRunOperator(
    #    task_id="sample_rideshare_website",
    #    trigger_dag_id="sample-rideshare-website",
    #    wait_for_completion=True
    #)

    # Run all stored procedures in the raw, enriched and curated zones
    sample_rideshare_hydrate_data = TriggerDagRunOperator(
        task_id="sample_rideshare_hydrate_data",
        trigger_dag_id="sample-rideshare-hydrate-data",
        wait_for_completion=True
    )

    # Download object table seed data and ML models
    sample_seed_unstructured_data = TriggerDagRunOperator(
        task_id="sample_seed_unstructured_data",
        trigger_dag_id="sample-seed-unstructured-data",
        wait_for_completion=True
    )

    sp_datastream_cdc_data = BigQueryInsertJobOperator(
        task_id="sp_datastream_cdc_data",
        location=bigquery_region,
        configuration={
            "query": {
                "query": sp_datastream_cdc_data_sql,
                "useLegacySql": False,
            }
        })

    sample_rideshare_llm_hydrate_data = TriggerDagRunOperator(
        task_id="sample_rideshare_llm_hydrate_data",
        trigger_dag_id="sample-rideshare-llm-hydrate-data",
        wait_for_completion=True
    )

    sample_dataplex_dataprofile_ridehshare_llm = TriggerDagRunOperator(
        task_id="sample_dataplex_dataprofile_ridehshare_llm",
        trigger_dag_id="sample-dataplex-dataprofile-ridehshare-llm",
        wait_for_completion=True
    )

    sample_dataplex_dataprofile_taxi = TriggerDagRunOperator(
        task_id="sample_dataplex_dataprofile_taxi",
        trigger_dag_id="sample-dataplex-dataprofile-taxi",
        wait_for_completion=True
    )

    sample_dataplex_dataprofile_thelook = TriggerDagRunOperator(
        task_id="sample_dataplex_dataprofile_thelook",
        trigger_dag_id="sample-dataplex-dataprofile-thelook",
        wait_for_completion=True
    )

    sample_dataplex_dataprofile_rideshare_lakehouse = TriggerDagRunOperator(
        task_id="sample_dataplex_dataprofile_rideshare_lakehouse",
        trigger_dag_id="sample-dataplex-dataprofile-rideshare-lakehouse",
        wait_for_completion=True
    )

    # DAG Graph
    step_01_taxi_data_download >> [step_02_taxi_data_processing,
                                   sample_seed_unstructured_data,
                                   sample_rideshare_download_images]
    step_02_taxi_data_processing >> [step_03_hydrate_tables]
    step_03_hydrate_tables >> [sample_dataplex_dataprofile_taxi,
                               sample_dataplex_dataprofile_thelook,
                               sample_dataflow_start_streaming_job,
                               sp_datastream_cdc_data,
                               sample_rideshare_hydrate_data]
    sample_rideshare_hydrate_data >> sample_dataplex_dataprofile_rideshare_lakehouse
    sample_rideshare_llm_hydrate_data >> sample_dataplex_dataprofile_ridehshare_llm

# [END dag]
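
Because the module reads ENV_PROJECT_ID and ENV_BIGQUERY_REGION at import time, a quick way to confirm the file parses and that the task graph is wired as intended is to load it through Airflow's DagBag before deploying it to Composer. The snippet below is a minimal local sketch, not part of the repo: the environment variable values and the "dags" folder path are assumptions, and it requires apache-airflow plus the Google provider package installed locally.

    # smoke_test_run_all_dags.py (illustrative sketch)
    import os

    # Placeholder values; the DAG module reads these at import time.
    os.environ.setdefault("ENV_PROJECT_ID", "my-sample-project")
    os.environ.setdefault("ENV_BIGQUERY_REGION", "us")

    from airflow.models import DagBag

    # Parse every DAG file in the local "dags" folder and fail on import errors.
    dagbag = DagBag(dag_folder="dags", include_examples=False)
    assert not dagbag.import_errors, dagbag.import_errors

    # Print each task and its downstream tasks to eyeball the dependency graph.
    dag = dagbag.get_dag("run-all-dags")
    for task in dag.tasks:
        print(task.task_id, "->", sorted(task.downstream_task_ids))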