cloud-composer/dags/sample-iceberg-create-tables-update-data.py

####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################

# Author:  Adam Paternostro
# Summary: Processes the downloaded Taxi data and saves it to Iceberg tables
#          (one green and one yellow table).
#
# To see the tables on storage:
#   - Go to your "gs://processed...." bucket
#   - Click on the iceberg-warehouse folder
#   - There is a "default" folder (our default warehouse)
#   - The tables are under that folder
#   - Each table has data and metadata directories to view
#   (see the optional sketch at the end of this file for the same listing done programmatically)

# [START dag]
from google.cloud import storage
from datetime import datetime, timedelta
import sys
import os
import logging
import airflow
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': None,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'dagrun_timeout': timedelta(minutes=120),
}

project_id                 = os.environ['ENV_PROJECT_ID']
raw_bucket_name            = os.environ['ENV_RAW_BUCKET']
processed_bucket_name      = os.environ['ENV_PROCESSED_BUCKET']
pyspark_code_create_tables = "gs://" + raw_bucket_name + "/pyspark-code/convert_taxi_to_iceberg_create_tables.py"
pyspark_code_update_data   = "gs://" + raw_bucket_name + "/pyspark-code/convert_taxi_to_iceberg_data_updates.py"
region                     = os.environ['ENV_DATAPROC_REGION']
yellow_source              = "gs://" + raw_bucket_name + "/raw/taxi-data/yellow/*/*.parquet"
green_source               = "gs://" + raw_bucket_name + "/raw/taxi-data/green/*/*.parquet"
dataproc_bucket            = os.environ['ENV_DATAPROC_BUCKET']
dataproc_subnet            = os.environ['ENV_DATAPROC_SUBNET']
dataproc_service_account   = os.environ['ENV_DATAPROC_SERVICE_ACCOUNT']
icebergWarehouse           = "gs://" + processed_bucket_name + "/iceberg-warehouse"
icebergJARFile             = "gs://" + raw_bucket_name + "/pyspark-code/iceberg-spark-runtime-3.1_2.12-0.14.0.jar"

# https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig
CLUSTER_CONFIG = {
    "config_bucket": dataproc_bucket,
    "temp_bucket": dataproc_bucket,
    "software_config": {
        "image_version": "2.0.47-debian10"
    },
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-8",
        "disk_config": {"boot_disk_type": "pd-ssd", "boot_disk_size_gb": 30, "num_local_ssds": 2},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-16",
        "disk_config": {"boot_disk_type": "pd-ssd", "boot_disk_size_gb": 30, "num_local_ssds": 2},
    },
    "gce_cluster_config": {
        "subnetwork_uri": dataproc_subnet,
        "service_account": dataproc_service_account,
        "internal_ip_only": True,
        "service_account_scopes": ["https://www.googleapis.com/auth/cloud-platform"],
        "shielded_instance_config": {
            "enable_secure_boot": True,
            "enable_vtpm": True,
            "enable_integrity_monitoring": True
        }
    }
}

with airflow.DAG('sample-iceberg-create-tables-update-data',
                 default_args=default_args,
                 start_date=datetime(2021, 1, 1),
                 # Not scheduled, trigger only
                 schedule_interval=None) as dag:

    # Create cluster
    create_dataproc_iceberg_cluster = dataproc_operator.DataprocClusterCreateOperator(
        default_args=default_args,
        task_id='create-dataproc-iceberg-cluster',
        project_id=project_id,
        region=region,
        cluster_name='process-taxi-data-iceberg-{{ ts_nodash.lower() }}',
        cluster_config=CLUSTER_CONFIG,
    )

    # Process taxi data into Iceberg table format
    create_iceberg_tables = dataproc_operator.DataProcPySparkOperator(
        default_args=default_args,
        task_id='create-iceberg-tables',
        project_id=project_id,
        region=region,
        cluster_name='process-taxi-data-iceberg-{{ ts_nodash.lower() }}',
        dataproc_jars=[icebergJARFile],
        main=pyspark_code_create_tables,
        arguments=[yellow_source, green_source, icebergWarehouse])

    # Perform data updates to the Iceberg data
    perform_iceberg_data_updates = dataproc_operator.DataProcPySparkOperator(
        default_args=default_args,
        task_id='perform-iceberg-data-updates',
        project_id=project_id,
        region=region,
        cluster_name='process-taxi-data-iceberg-{{ ts_nodash.lower() }}',
        dataproc_jars=[icebergJARFile],
        main=pyspark_code_update_data,
        arguments=[icebergWarehouse])

    # Delete Cloud Dataproc cluster
    delete_dataproc_iceberg_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        default_args=default_args,
        task_id='delete-dataproc-iceberg-cluster',
        project_id=project_id,
        region=region,
        cluster_name='process-taxi-data-iceberg-{{ ts_nodash.lower() }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    create_dataproc_iceberg_cluster >> \
        create_iceberg_tables >> \
        perform_iceberg_data_updates >> \
        delete_dataproc_iceberg_cluster

# [END dag]
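

# --------------------------------------------------------------------------------
# Optional, hedged sketch (not part of the DAG): the header comments above describe
# how to browse the Iceberg warehouse in the Cloud Console. The block below shows
# roughly the same inspection done programmatically with the google-cloud-storage
# client already imported at the top of this file. It assumes ENV_PROJECT_ID and
# ENV_PROCESSED_BUCKET are set wherever you run it, and that the warehouse uses the
# "iceberg-warehouse/default/" prefix described in the header; adjust the prefix if
# your warehouse layout differs. Airflow skips this block when it parses the DAG,
# since it only runs when the file is executed directly.
if __name__ == "__main__":
    client = storage.Client(project=project_id)
    # Print every object under the default Iceberg warehouse, which surfaces the
    # per-table data and metadata directories mentioned in the header comments.
    for blob in client.list_blobs(processed_bucket_name, prefix="iceberg-warehouse/default/"):
        print(blob.name)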