# cloud-composer/dags/sample-export-taxi-trips-from-bq-to-gcs-cluster.py
####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################
# Author: Adam Paternostro
# Summary: Runs a Dataproc Cluster to export the taxi_trips table to GCS
# The spark code (in the dataproc folder: export_taxi_data_from_bq_to_gcs.py) exports the data
# as parquet and is partitioned by year-month-day-hour-minute. This generates alot of files!
# The goal is to place a BigLake table with a feature to show fast performance with lots of small files.
# Many small files on a data lake is a common performance issue, so we want to show to to address this
# with BigQuery.
# NOTE: This can take hours to run!
# This exports data for several years!
# To Run: Edit the export_taxi_data_from_bq_to_gcs.py file and change the following:
# years = [2021, 2020, 2019] => years = [2021]
# for data_month in range(1, 13): => for data_month in range(1, 2):
# The above will export 1 year/month instead of 3 years and 12 months per year
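#
# A rough, commented sketch of the loop structure being edited in
# export_taxi_data_from_bq_to_gcs.py (variable and helper names below are
# assumptions based on the note above, not a copy of that file):
#
#     years = [2021, 2020, 2019]              # reduce to [2021] for a smaller test run
#     for year in years:
#         for data_month in range(1, 13):     # reduce to range(1, 2) for a single month
#             export_month(year, data_month)  # hypothetical helper that writes the Parquet partitions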
# [START dag]
from google.cloud import storage
from datetime import datetime, timedelta
import sys
import os
import logging
import airflow
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': None,
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
'dagrun_timeout' : timedelta(minutes=600),
}
project_id = os.environ['ENV_PROJECT_ID']
raw_bucket_name = os.environ['ENV_RAW_BUCKET']
processed_bucket_name = "gs://" + os.environ['ENV_PROCESSED_BUCKET']
pyspark_code = "gs://" + raw_bucket_name + "/pyspark-code/export_taxi_data_from_bq_to_gcs.py"
region = os.environ['ENV_DATAPROC_REGION']
# zone = os.environ['ENV_ZONE']
dataproc_bucket = os.environ['ENV_DATAPROC_BUCKET']
dataproc_subnet = os.environ['ENV_DATAPROC_SUBNET']
dataproc_service_account = os.environ['ENV_DATAPROC_SERVICE_ACCOUNT']
taxi_dataset_id = os.environ['ENV_TAXI_DATASET_ID']
jar_file = "gs://" + raw_bucket_name + "/pyspark-code/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
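# The values above are read from environment variables set on the Cloud Composer
# environment. One way to set them (the environment and location names below are
# placeholders, not values from this repo):
#
#   gcloud composer environments update YOUR_COMPOSER_ENV \
#       --location YOUR_COMPOSER_REGION \
#       --update-env-variables=ENV_PROJECT_ID=your-project,ENV_RAW_BUCKET=your-raw-bucket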
# https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig
CLUSTER_CONFIG = {
"config_bucket" : dataproc_bucket,
"temp_bucket": dataproc_bucket,
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-8",
"disk_config": {"boot_disk_type": "pd-ssd", "boot_disk_size_gb": 30, "num_local_ssds":2},
},
"worker_config": {
"num_instances": 3,
"machine_type_uri": "n1-standard-16",
"disk_config": {"boot_disk_type": "pd-ssd", "boot_disk_size_gb": 30, "num_local_ssds":2},
},
"gce_cluster_config" :{
"subnetwork_uri" : dataproc_subnet,
"service_account" : dataproc_service_account,
"internal_ip_only" : True,
"service_account_scopes" : ["https://www.googleapis.com/auth/cloud-platform"],
"shielded_instance_config" : {
"enable_secure_boot" : True,
"enable_vtpm": True,
"enable_integrity_monitoring": True
}
}
}
# if you want to specify a zone
# "gce_cluster_config" :{
# "zone_uri" : zone,
# "subnetwork_uri" : dataproc_subnet,
# "service_account" : dataproc_service_account,
# "service_account_scopes" : ["https://www.googleapis.com/auth/cloud-platform"]
# }
with airflow.DAG('sample-export-taxi-trips-from-bq-to-gcs-cluster',
default_args=default_args,
start_date=datetime(2021, 1, 1),
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Create cluster
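    # The cluster name embeds the templated run timestamp ({{ ts_nodash.lower() }}),
    # so each DAG run creates (and later deletes) its own uniquely named cluster.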
create_dataproc_export_cluster = dataproc_operator.DataprocClusterCreateOperator(
default_args=default_args,
task_id='create-dataproc-export-cluster',
project_id=project_id,
region=region,
cluster_name='process-taxi-trips-export-{{ ts_nodash.lower() }}',
cluster_config=CLUSTER_CONFIG,
)
    # Run the Spark code that exports the taxi data from BigQuery to GCS as Parquet
# Need to implement this: https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html#repeated-mapping
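    # The PySpark job receives four positional arguments, in this order:
    # project id, taxi dataset id, Dataproc staging bucket, and the gs:// path of the processed bucket.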
run_dataproc_export_spark = dataproc_operator.DataProcPySparkOperator(
default_args=default_args,
task_id='task-taxi-trips-export',
project_id=project_id,
region=region,
cluster_name='process-taxi-trips-export-{{ ts_nodash.lower() }}',
dataproc_jars=[jar_file],
main=pyspark_code,
arguments=[project_id, taxi_dataset_id, dataproc_bucket, processed_bucket_name])
# Delete Cloud Dataproc cluster
delete_dataproc_export_cluster = dataproc_operator.DataprocClusterDeleteOperator(
default_args=default_args,
task_id='delete-dataproc-export-cluster',
project_id=project_id,
region=region,
cluster_name='process-taxi-trips-export-{{ ts_nodash.lower() }}',
# Setting trigger_rule to ALL_DONE causes the cluster to be deleted even if the Dataproc job fails.
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
create_dataproc_export_cluster >> run_dataproc_export_spark >> delete_dataproc_export_cluster
# [END dag]
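# Because schedule_interval is None, this DAG only runs when triggered manually.
# If the Composer environment runs Airflow 2, one way to trigger it from the
# command line (environment and location names are placeholders):
#
#   gcloud composer environments run YOUR_COMPOSER_ENV \
#       --location YOUR_COMPOSER_REGION \
#       dags trigger -- sample-export-taxi-trips-from-bq-to-gcs-cluster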