cloud-composer/dags/sample-rideshare-hydrate-data.py (160 lines of code) (raw):

####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################

# Author:  Adam Paternostro
# Summary: Runs all the stored procedures necessary to hydrate the raw, enriched and curated zone of
#          the Rideshare Analytics Lakehouse

# [START dag]
from google.cloud import storage
from datetime import datetime, timedelta
import requests
import sys
import os
import logging
import airflow
from airflow.utils import trigger_rule
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Task-level defaults applied to every operator in the DAG.
# NOTE: dagrun_timeout used to live here, but it is a DAG argument, not an operator argument,
# so default_args never applied it. It is now passed to airflow.DAG(...) below.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': None,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

# Required environment variables; a KeyError at DAG-parse time means they are not set.
project_id = os.environ['ENV_PROJECT_ID']
bigquery_region = os.environ['ENV_BIGQUERY_REGION']


def _call_statement(dataset, procedure):
    """Return the SQL text that calls stored procedure `<project_id>.<dataset>.<procedure>`()."""
    return "CALL `{}.{}.{}`();".format(project_id, dataset, procedure)


def _bigquery_call_task(task_id, sql):
    """Create a BigQueryInsertJobOperator that runs `sql` (non-legacy SQL) in `bigquery_region`.

    No `dag=` is passed, so this must be called inside the `with airflow.DAG(...)` block
    for the operator to be attached to that DAG.
    """
    return BigQueryInsertJobOperator(
        task_id=task_id,
        location=bigquery_region,
        configuration={
            "query": {
                "query": sql,
                "useLegacySql": False,
            }
        })


# Raw zone procedures
sp_rideshare_lakehouse_raw_sp_create_raw_data = \
    _call_statement("rideshare_lakehouse_raw", "sp_create_raw_data")
sp_rideshare_lakehouse_raw_sp_create_biglake_object_table = \
    _call_statement("rideshare_lakehouse_raw", "sp_create_biglake_object_table")
sp_rideshare_lakehouse_raw_sp_create_streaming_view = \
    _call_statement("rideshare_lakehouse_raw", "sp_create_streaming_view")
sp_rideshare_lakehouse_raw_sproc_sp_create_biglake_tables = \
    _call_statement("rideshare_lakehouse_raw", "sproc_sp_create_biglake_tables")

# Enriched zone procedures
sp_rideshare_lakehouse_enriched_sp_process_data = \
    _call_statement("rideshare_lakehouse_enriched", "sp_process_data")
sp_rideshare_lakehouse_enriched_sp_iceberg_spark_transformation = \
    _call_statement("rideshare_lakehouse_enriched", "sp_iceberg_spark_transformation")
sp_rideshare_lakehouse_enriched_sp_unstructured_data_analysis = \
    _call_statement("rideshare_lakehouse_enriched", "sp_unstructured_data_analysis")
sp_rideshare_lakehouse_enriched_sp_create_streaming_view = \
    _call_statement("rideshare_lakehouse_enriched", "sp_create_streaming_view")

# Curated zone procedures
sp_rideshare_lakehouse_curated_sp_process_data = \
    _call_statement("rideshare_lakehouse_curated", "sp_process_data")
sp_rideshare_lakehouse_curated_sp_create_streaming_view = \
    _call_statement("rideshare_lakehouse_curated", "sp_create_streaming_view")
sp_rideshare_lakehouse_curated_sp_create_looker_studio_view = \
    _call_statement("rideshare_lakehouse_curated", "sp_create_looker_studio_view")
sp_rideshare_lakehouse_curated_sp_create_website_realtime_dashboard = \
    _call_statement("rideshare_lakehouse_curated", "sp_create_website_realtime_dashboard")
sp_rideshare_lakehouse_curated_sp_model_training = \
    _call_statement("rideshare_lakehouse_curated", "sp_model_training")


with airflow.DAG('sample-rideshare-hydrate-data',
                 default_args=default_args,
                 start_date=datetime(2021, 1, 1),
                 # Not scheduled, trigger only
                 schedule_interval=None,
                 # Fail the DAG run if it exceeds one hour (the limit the original author specified).
                 # Raise this if a full hydration, including the Dataproc child DAG and model
                 # training, legitimately needs longer.
                 dagrun_timeout=timedelta(minutes=60)) as dag:

    # Raw zone
    rideshare_lakehouse_raw_sp_create_raw_data = _bigquery_call_task(
        "rideshare_lakehouse_raw_sp_create_raw_data",
        sp_rideshare_lakehouse_raw_sp_create_raw_data)

    rideshare_lakehouse_raw_sp_create_biglake_object_table = _bigquery_call_task(
        "rideshare_lakehouse_raw_sp_create_biglake_object_table",
        sp_rideshare_lakehouse_raw_sp_create_biglake_object_table)

    rideshare_lakehouse_raw_sp_create_streaming_view = _bigquery_call_task(
        "rideshare_lakehouse_raw_sp_create_streaming_view",
        sp_rideshare_lakehouse_raw_sp_create_streaming_view)

    rideshare_lakehouse_raw_sproc_sp_create_biglake_tables = _bigquery_call_task(
        "rideshare_lakehouse_raw_sproc_sp_create_biglake_tables",
        sp_rideshare_lakehouse_raw_sproc_sp_create_biglake_tables)

    # Enriched zone
    rideshare_lakehouse_enriched_sp_process_data = _bigquery_call_task(
        "rideshare_lakehouse_enriched_sp_process_data",
        sp_rideshare_lakehouse_enriched_sp_process_data)

    rideshare_lakehouse_enriched_sp_unstructured_data_analysis = _bigquery_call_task(
        "rideshare_lakehouse_enriched_sp_unstructured_data_analysis",
        sp_rideshare_lakehouse_enriched_sp_unstructured_data_analysis)

    rideshare_lakehouse_enriched_sp_create_streaming_view = _bigquery_call_task(
        "rideshare_lakehouse_enriched_sp_create_streaming_view",
        sp_rideshare_lakehouse_enriched_sp_create_streaming_view)

    # Curated zone
    rideshare_lakehouse_curated_sp_process_data = _bigquery_call_task(
        "rideshare_lakehouse_curated_sp_process_data",
        sp_rideshare_lakehouse_curated_sp_process_data)

    rideshare_lakehouse_curated_sp_create_streaming_view = _bigquery_call_task(
        "rideshare_lakehouse_curated_sp_create_streaming_view",
        sp_rideshare_lakehouse_curated_sp_create_streaming_view)

    rideshare_lakehouse_curated_sp_create_looker_studio_view = _bigquery_call_task(
        "rideshare_lakehouse_curated_sp_create_looker_studio_view",
        sp_rideshare_lakehouse_curated_sp_create_looker_studio_view)

    rideshare_lakehouse_curated_sp_create_website_realtime_dashboard = _bigquery_call_task(
        "rideshare_lakehouse_curated_sp_create_website_realtime_dashboard",
        sp_rideshare_lakehouse_curated_sp_create_website_realtime_dashboard)

    rideshare_lakehouse_curated_sp_model_training = _bigquery_call_task(
        "rideshare_lakehouse_curated_sp_model_training",
        sp_rideshare_lakehouse_curated_sp_model_training)

    # BigSpark alternative (still in preview): once public, create
    #   rideshare_lakehouse_enriched_sp_iceberg_spark_transformation = _bigquery_call_task(
    #       "rideshare_lakehouse_enriched_sp_iceberg_spark_transformation",
    #       sp_rideshare_lakehouse_enriched_sp_iceberg_spark_transformation)
    # and use it in place of sample_rideshare_iceberg_serverless in the dependency chain below.

    # Run Dataproc (until BigSpark is public): trigger the Iceberg DAG and block until it finishes,
    # so downstream enriched/curated steps see its output.
    sample_rideshare_iceberg_serverless = TriggerDagRunOperator(
        task_id="sample_rideshare_iceberg_serverless",
        trigger_dag_id="sample-rideshare-iceberg-serverless",
        wait_for_completion=True
    )

    # Process Iceberg using Dataproc Serverless Spark; every step runs strictly in sequence.
    (
        rideshare_lakehouse_raw_sp_create_raw_data
        >> rideshare_lakehouse_raw_sp_create_biglake_object_table
        >> rideshare_lakehouse_raw_sp_create_streaming_view
        >> rideshare_lakehouse_raw_sproc_sp_create_biglake_tables
        >> rideshare_lakehouse_enriched_sp_process_data
        >> sample_rideshare_iceberg_serverless
        >> rideshare_lakehouse_enriched_sp_unstructured_data_analysis
        >> rideshare_lakehouse_enriched_sp_create_streaming_view
        >> rideshare_lakehouse_curated_sp_process_data
        >> rideshare_lakehouse_curated_sp_create_streaming_view
        >> rideshare_lakehouse_curated_sp_create_looker_studio_view
        >> rideshare_lakehouse_curated_sp_create_website_realtime_dashboard
        >> rideshare_lakehouse_curated_sp_model_training
    )

# [END dag]