cloud-composer/dags/run-all-dags.py (104 lines of code) (raw):
####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################
# Author: Adam Paternostro
# Summary: Deploys Public Previiew features that require allowlisting
# This is not part of Step 3 since items in this DAG will fail to deploy if your
# project does not have a particular feature enabled.
# [START dag]
from datetime import datetime, timedelta
import sys
import os
import logging
import airflow
from airflow.utils import trigger_rule
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
project_id = os.environ['ENV_PROJECT_ID']
bigquery_region = os.environ['ENV_BIGQUERY_REGION']
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': None,
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
'dagrun_timeout' : timedelta(minutes=60),
}
sp_datastream_cdc_data="CALL `{}.taxi_dataset.sp_create_datastream_cdc_data`();".format(project_id)
step_01_trigger_dag_id = "step-01-taxi-data-download"
step_02_trigger_dag_id = "step-02-taxi-data-processing"
# Quick copy (comment this out to actually process the data from source)
step_01_trigger_dag_id = "step-01-taxi-data-download-quick-copy"
step_02_trigger_dag_id = "step-02-taxi-data-processing-quick-copy"
with airflow.DAG('run-all-dags',
default_args=default_args,
start_date=datetime(2021, 1, 1),
# Add the Composer "Data" directory which will hold the SQL scripts for deployment
template_searchpath=['/home/airflow/gcs/data'],
# Not scheduled, trigger only
schedule_interval='@once') as dag:
step_01_taxi_data_download = TriggerDagRunOperator(
task_id="step_01_taxi_data_download",
trigger_dag_id=step_01_trigger_dag_id,
wait_for_completion=True
)
step_02_taxi_data_processing = TriggerDagRunOperator(
task_id="step_02_taxi_data_processing",
trigger_dag_id=step_02_trigger_dag_id,
wait_for_completion=True
)
step_03_hydrate_tables = TriggerDagRunOperator(
task_id="step_03_hydrate_tables",
trigger_dag_id="step-03-hydrate-tables",
wait_for_completion=True
)
# Start the streaming job so we have some seed data
# This job will be stopped after 4 hours (by the stop job)
sample_dataflow_start_streaming_job = TriggerDagRunOperator(
task_id="sample_dataflow_start_streaming_job",
trigger_dag_id="sample-dataflow-start-streaming-job",
wait_for_completion=True
)
# Download 250+ images for the object table
sample_rideshare_download_images = TriggerDagRunOperator(
task_id="sample_rideshare_download_images",
trigger_dag_id="sample-rideshare-download-images",
wait_for_completion=True
)
# NO LONGER USED, please use the Cloud Run website
# Deploy website to App Engine
#sample_rideshare_website = TriggerDagRunOperator(
# task_id="sample_rideshare_website",
# trigger_dag_id="sample-rideshare-website",
# wait_for_completion=True
#)
# Run all stored procedures in the raw, enriched and curated zone
sample_rideshare_hydrate_data = TriggerDagRunOperator(
task_id="sample_rideshare_hydrate_data",
trigger_dag_id="sample-rideshare-hydrate-data",
wait_for_completion=True
)
# Download object table seed data and ML models
sample_seed_unstructured_data = TriggerDagRunOperator(
task_id="sample_seed_unstructured_data",
trigger_dag_id="sample-seed-unstructured-data",
wait_for_completion=True
)
sp_datastream_cdc_data = BigQueryInsertJobOperator(
task_id="sp_datastream_cdc_data",
location=bigquery_region,
configuration={
"query": {
"query": sp_datastream_cdc_data,
"useLegacySql": False,
}
})
sample_rideshare_llm_hydrate_data = TriggerDagRunOperator(
task_id="sample_rideshare_llm_hydrate_data",
trigger_dag_id="sample-rideshare-llm-hydrate-data",
wait_for_completion=True
)
sample_dataplex_dataprofile_ridehshare_llm = TriggerDagRunOperator(
task_id="sample_dataplex_dataprofile_ridehshare_llm",
trigger_dag_id="sample-dataplex-dataprofile-ridehshare-llm",
wait_for_completion=True
)
sample_dataplex_dataprofile_taxi = TriggerDagRunOperator(
task_id="sample_dataplex_dataprofile_taxi",
trigger_dag_id="sample-dataplex-dataprofile-taxi",
wait_for_completion=True
)
sample_dataplex_dataprofile_thelook = TriggerDagRunOperator(
task_id="sample_dataplex_dataprofile_thelook",
trigger_dag_id="sample-dataplex-dataprofile-thelook",
wait_for_completion=True
)
sample_dataplex_dataprofile_rideshare_lakehouse = TriggerDagRunOperator(
task_id="sample_dataplex_dataprofile_rideshare_lakehouse",
trigger_dag_id="sample-dataplex-dataprofile-rideshare-lakehouse",
wait_for_completion=True
)
# DAG Graph
step_01_taxi_data_download >> [step_02_taxi_data_processing, sample_seed_unstructured_data, sample_rideshare_download_images]
step_02_taxi_data_processing >> [step_03_hydrate_tables]
step_03_hydrate_tables >> [sample_dataplex_dataprofile_taxi, sample_dataplex_dataprofile_thelook, sample_dataflow_start_streaming_job, sp_datastream_cdc_data, sample_rideshare_hydrate_data]
sample_rideshare_hydrate_data >> sample_dataplex_dataprofile_rideshare_lakehouse
sample_rideshare_llm_hydrate_data >> sample_dataplex_dataprofile_ridehshare_llm
# [END dag]