# cloud-composer/dags/step-03-hydrate-tables.py (77 lines of code) (raw):
####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################
# Author: Adam Paternostro
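# Summary: Calls the BigQuery stored procedures that create the taxi external,
#          internal and BigLake tables, then the thelook_ecommerce product
#          deliveries and thelook tables (one task per procedure, run in sequence)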
# [START dag]
from google.cloud import storage
from datetime import datetime, timedelta
import requests
import sys
import os
import logging
import airflow
from airflow.utils import trigger_rule
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
# Arguments handed to airflow.DAG through its default_args parameter.
# No e-mail notifications and no retries are configured.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    email=None,
    email_on_failure=False,
    email_on_retry=False,
    retries=0,
    retry_delay=timedelta(minutes=5),
    # NOTE(review): dagrun_timeout is documented as a DAG-level setting in
    # Airflow rather than an operator argument - confirm it is also passed to
    # airflow.DAG so that the 60 minute limit takes effect.
    dagrun_timeout=timedelta(minutes=60),
)
# Settings read from environment variables; a missing variable raises KeyError
# as soon as this file is parsed.
project_id = os.environ['ENV_PROJECT_ID']
bigquery_region = os.environ['ENV_BIGQUERY_REGION']

# Stored-procedure CALL statements. Every routine name is qualified with the
# project id; create_thelook_tables additionally receives the BigQuery region
# as a string argument.
sql_taxi_external_tables = f"CALL `{project_id}.taxi_dataset.sp_create_taxi_external_tables`();"
sql_taxi_internal_tables = f"CALL `{project_id}.taxi_dataset.sp_create_taxi_internal_tables`();"
sql_create_product_deliveries = f"CALL `{project_id}.thelook_ecommerce.create_product_deliveries`();"
sql_create_thelook_tables = f"CALL `{project_id}.thelook_ecommerce.create_thelook_tables`('{bigquery_region}');"
sql_taxi_biglake_tables = f"CALL `{project_id}.taxi_dataset.sp_create_taxi_biglake_tables`();"
# Hydration steps in execution order: (task_id, SQL statement to run).
# The operators are deliberately NOT assigned back to the sql_* names, so those
# module-level names keep referring to the SQL strings instead of being
# rebound to operator objects.
hydrate_steps = [
    ("sql_taxi_external_tables", sql_taxi_external_tables),
    ("sql_taxi_internal_tables", sql_taxi_internal_tables),
    ("sql_taxi_biglake_tables", sql_taxi_biglake_tables),
    ("sql_create_product_deliveries", sql_create_product_deliveries),
    ("sql_create_thelook_tables", sql_create_thelook_tables),
]

with airflow.DAG('step-03-hydrate-tables',
                 default_args=default_args,
                 start_date=datetime(2021, 1, 1),
                 # dagrun_timeout is an argument of the DAG itself; default_args
                 # only supplies operator arguments, so the configured value is
                 # forwarded explicitly (None, i.e. no timeout, if it is absent).
                 dagrun_timeout=default_args.get('dagrun_timeout'),
                 # Not scheduled, trigger only
                 schedule_interval=None) as dag:

    # Create one BigQuery query job per step and chain each task to the one
    # before it, so the stored procedures run strictly one after another.
    previous_task = None
    for task_id, sql in hydrate_steps:
        task = BigQueryInsertJobOperator(
            task_id=task_id,
            location=bigquery_region,
            configuration={
                "query": {
                    "query": sql,
                    "useLegacySql": False,
                }
            })
        if previous_task is not None:
            previous_task >> task
        previous_task = task
# [END dag]