cloud-composer/dags/sample-rideshare-iceberg-serverless.py (74 lines of code) (raw):

#################################################################################### # Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. #################################################################################### # Author: Adam Paternostro # Summary: Runs a Dataproc Serverless to export the taxi_trips table to GCS # The spark code (in the dataproc folder: export_taxi_data_from_bq_to_gcs.py) exports the data # as parquet and is partitioned by year-month-day-hour-minute. This generates alot of files! # The goal is to place a BigLake table with a feature to show fast performance with lots of small files. # Many small files on a data lake is a common performance issue, so we want to show to to address this # with BigQuery. # NOTE: This can take hours to run! # This exports data for several years! # To Run: Edit the export_taxi_data_from_bq_to_gcs.py file and change the following: # years = [2021, 2020, 2019] => years = [2021] # for data_month in range(1, 13): => for data_month in range(1, 2): # The above will export 1 year/month instead of 3 years and 12 months per year # [START dag] import os import logging import airflow from datetime import datetime, timedelta from airflow import models from airflow.providers.google.cloud.operators.dataproc import (DataprocCreateBatchOperator, DataprocDeleteBatchOperator, DataprocGetBatchOperator, DataprocListBatchesOperator) default_args = { 'owner': 'airflow', 'depends_on_past': False, 'email': None, 'email_on_failure': False, 'email_on_retry': False, 'retries': 0, 'retry_delay': timedelta(minutes=5), 'dagrun_timeout' : timedelta(minutes=600), } project_id = os.environ['ENV_PROJECT_ID'] region = os.environ['ENV_DATAPROC_SERVERLESS_REGION'] raw_bucket_name = os.environ['ENV_RAW_BUCKET'] processed_bucket_name = "gs://" + os.environ['ENV_PROCESSED_BUCKET'] pyspark_code = "gs://" + raw_bucket_name + "/pyspark-code/rideshare_iceberg_serverless.py" jar_file = "gs://spark-lib/biglake/biglake-catalog-iceberg1.5.1-0.1.1-with-dependencies.jar" # hardcoded the subnet name dataproc_subnet = os.environ['ENV_DATAPROC_SERVERLESS_SUBNET_NAME'] dataproc_service_account = os.environ['ENV_DATAPROC_SERVICE_ACCOUNT'] dataproc_bucket = os.environ['ENV_DATAPROC_BUCKET'] taxi_dataset_id = os.environ['ENV_TAXI_DATASET_ID'] iceberg_catalog = "rideshare_iceberg_catalog" iceberg_warehouse = "rideshare_lakehouse_enriched" bq_rideshare_enriched_dataset = os.environ['ENV_RIDESHARE_LAKEHOUSE_ENRICHED_DATASET'] bq_rideshare_raw_dataset = os.environ['ENV_RIDESHARE_LAKEHOUSE_RAW_DATASET'] rideshare_raw_bucket = os.environ['ENV_RIDESHARE_LAKEHOUSE_RAW_BUCKET'] rideshare_enriched_bucket = os.environ['ENV_RIDESHARE_LAKEHOUSE_ENRICHED_BUCKET'] bigquery_region = os.environ['ENV_BIGQUERY_REGION'] # https://cloud.google.com/dataproc-serverless/docs/reference/rpc/google.cloud.dataproc.v1#google.cloud.dataproc.v1.PySparkBatch # https://cloud.google.com/dataproc-serverless/docs/reference/rpc/google.cloud.dataproc.v1#batch BATCH_CONFIG = { 'pyspark_batch': { 'main_python_file_uri': pyspark_code, 'jar_file_uris': [ jar_file ], 'args': [project_id,iceberg_catalog,iceberg_warehouse,bq_rideshare_enriched_dataset,bq_rideshare_raw_dataset,rideshare_raw_bucket,rideshare_enriched_bucket,bigquery_region] }, 'environment_config': {'execution_config': { 'subnetwork_uri': dataproc_subnet, 'service_account' : dataproc_service_account } }, 'runtime_config': { 'version' : '2.2', 'properties': { "spark.sql.catalog.{}.blms_catalog".format(iceberg_catalog) : iceberg_catalog, "spark.sql.catalog.{}.gcp_project".format(iceberg_catalog) : project_id, "spark.jars.packages" : "org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.4.3,org.apache.spark:spark-avro_2.13:3.5.1", "spark.sql.catalog.{}.catalog-impl".format(iceberg_catalog) : "org.apache.iceberg.gcp.biglake.BigLakeCatalog", "spark.sql.catalog.{}.gcp_location".format(iceberg_catalog) : bigquery_region, \ "spark.sql.catalog.{}".format(iceberg_catalog) : "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.{}.warehouse".format(iceberg_catalog) : "gs://{}/{}".format(rideshare_enriched_bucket,iceberg_catalog) } } } with airflow.DAG('sample-rideshare-iceberg-serverless', default_args=default_args, start_date=datetime(2021, 1, 1), # Not scheduled, trigger only schedule_interval=None) as dag: # Create serverless batch # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/dataproc.html#create-a-batch rideshare_iceberg_serverless = DataprocCreateBatchOperator( task_id="rideshare_iceberg_serverless", project_id=project_id, region=region, batch_id="rideshare-iceberg-serverless-{{ ds_nodash }}-{{ ts_nodash.lower() }}", batch=BATCH_CONFIG ) # {{run_id}} = "Batch ID 'taxi-trips-export-manual__2022-09-13T21:46:52.639478+00:00' must conform to pattern '[a-z0-9][a-z0-9\-]{2,61}[a-z0-9]'" rideshare_iceberg_serverless # [END dag]