dataproc/export_taxi_data_from_bq_to_gcs.py
#!/usr/bin/env python
####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################
# Author: Adam Paternostro
# Summary: Reads the BigQuery "taxi_dataset.taxi_trips" table and exports it to Parquet format.
#          This uses Dataproc Serverless Spark.
#          The data is exported partitioned by each minute (inefficient on purpose).
#          The goal is to generate a lot of small files (an antipattern) to demo BigQuery performance on small files.
from pyspark.sql.dataframe import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, dayofmonth, hour, minute
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from datetime import datetime
import time
import sys
def ExportTaxiData(project_id, taxi_dataset_id, temporaryGcsBucket, destination):
    spark = SparkSession \
        .builder \
        .appName("export_taxi_data_from_bq_to_gcs") \
        .getOrCreate()

    # Use the Cloud Storage bucket for temporary BigQuery export data used by the connector.
    spark.conf.set('temporaryGcsBucket', temporaryGcsBucket)
    years = [2019]
    #years = [2021]

    for data_year in years:
        print("data_year: ", data_year)

        for data_month in range(12, 13):
        #for data_month in range(1, 3):
            print("data_month: ", data_month)
            # Sample Code: https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example#pyspark
            # To run SQL against BigQuery
            spark.conf.set("viewsEnabled", "true")
            spark.conf.set("materializationProject", project_id)
            spark.conf.set("materializationDataset", taxi_dataset_id)
print ("BEGIN: Querying Table")
sql = "SELECT * " + \
"FROM `" + project_id + "." + taxi_dataset_id + ".taxi_trips` " + \
"WHERE EXTRACT(YEAR FROM Pickup_DateTime) = " + str(data_year) + " " + \
"AND EXTRACT(MONTH FROM Pickup_DateTime) = " + str(data_month) + ";"
print ("SQL: ", sql)
df_taxi_trips = spark.read.format("bigquery").option("query", sql).load()
print ("END: Querying Table")
            # Loading the full BigQuery taxi_trips table directly (kept below for reference)
            # returns too much data to process with our limited demo core CPU quota.
            """
            print ("BEGIN: Querying Table")
            df_taxi_trips = spark.read.format('bigquery') \
                .option('table', project_id + ':' + taxi_dataset_id + '.taxi_trips') \
                .load()
            print ("END: Querying Table")
            """
print ("BEGIN: Adding partition columns to dataframe")
df_taxi_trips_partitioned = df_taxi_trips \
.withColumn("year", year (col("Pickup_DateTime"))) \
.withColumn("month", month (col("Pickup_DateTime"))) \
.withColumn("day", dayofmonth (col("Pickup_DateTime"))) \
.withColumn("hour", hour (col("Pickup_DateTime"))) \
.withColumn("minute", minute (col("Pickup_DateTime")))
print ("END: Adding partition columns to dataframe")
            # Write as Parquet
            print("BEGIN: Writing Data to GCS")
            outputPath = destination + "/processed/taxi-trips-query-acceleration/"
            df_taxi_trips_partitioned \
                .write \
                .mode("append") \
                .partitionBy("year", "month", "day", "hour", "minute") \
                .parquet(outputPath)
            print("END: Writing Data to GCS")

    spark.stop()

# Main entry point
if __name__ == "__main__":
    if len(sys.argv) != 5:
        print("Usage: export_taxi_data_from_bq_to_gcs project_id taxi_dataset_id temporaryGcsBucket destination")
        sys.exit(-1)

    project_id = sys.argv[1]
    taxi_dataset_id = sys.argv[2]
    temporaryGcsBucket = sys.argv[3]
    destination = sys.argv[4]

    print("project_id: ", project_id)
    print("taxi_dataset_id: ", taxi_dataset_id)
    print("temporaryGcsBucket: ", temporaryGcsBucket)
    print("destination: ", destination)

    print("BEGIN: Main")
    ExportTaxiData(project_id, taxi_dataset_id, temporaryGcsBucket, destination)
    print("END: Main")
# Sample run using static Dataproc Cluster
"""
project_string="s3epuwhxbf"
project="data-analytics-demo-${project_string}"
dataproceTempBucketName="dataproc-query-acceleration-temp"
serviceAccount="dataproc-service-account@${project}.iam.gserviceaccount.com"
rawBucket="raw-${project}"
processedBucket="processed-${project}"
# Create cluster (in central region)
# NOTE: You have to create a subnet in the central region called "dataproc-subnet-central"
#       You also have to create a firewall rule (like the existing one for this subnet)
gcloud dataproc clusters create dataproc-cluster \
--bucket "${dataproceTempBucketName}" \
--region REPLACE-REGION \
--subnet dataproc-subnet-central \
--zone REPLACE-REGION-a \
--master-machine-type n1-standard-8 \
--master-boot-disk-size 500 \
--num-workers 3 \
--service-account="${serviceAccount}" \
--worker-machine-type n1-standard-16 \
--worker-boot-disk-size 500 \
--image-version 2.0-debian10 \
--num-worker-local-ssds=4 \
--project "${project}"
# Copy script to storage
gsutil cp ./dataproc/export_taxi_data_from_bq_to_gcs.py gs://${rawBucket}/pyspark-code
# Write to bucket (regional)
gcloud dataproc jobs submit pyspark \
--cluster "dataproc-cluster" \
--region="REPLACE-REGION" \
--project="${project}" \
--jars gs://${rawBucket}/pyspark-code/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
gs://${rawBucket}/pyspark-code/export_taxi_data_from_bq_to_gcs.py \
-- ${project} taxi_dataset ${dataproceTempBucketName} "gs://dataproc-query-acceleration/taxi-export"
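# (Optional) monitor the job with standard gcloud commands (the job id placeholder is an assumption):
# gcloud dataproc jobs list --cluster "dataproc-cluster" --region="REPLACE-REGION" --project="${project}"
# gcloud dataproc jobs wait <JOB_ID> --region="REPLACE-REGION" --project="${project}"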
# Write to local HDFS "/tmp/taxi-export" (you have to SSH to the master node and then distcp the files to a bucket)
# To SSH you need a firewall rule that opens the traffic
# This is FAST! But copying the data off the cluster after the job is hard to automate, which we need.
gcloud dataproc jobs submit pyspark \
--cluster "dataproc-cluster" \
--region="REPLACE-REGION" \
--project="${project}" \
--jars gs://${rawBucket}/pyspark-code/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
gs://${rawBucket}/pyspark-code/export_taxi_data_from_bq_to_gcs.py \
-- ${project} taxi_dataset ${dataproceTempBucketName} /tmp/taxi-export2
##################################################################
# SSH
##################################################################
myIPAddress=$(curl --silent ifconfig.me/ip)
gcloud compute firewall-rules create home-computer \
--project="${project}" \
--direction=INGRESS \
--priority=1000 \
--network=vpc-main \
--action=ALLOW \
--rules=tcp:22 \
--source-ranges=${myIPAddress} \
--target-service-accounts="${serviceAccount}"
# Firewall rule summary:
#   Type: Ingress
#   Target: Service Account: dataproc-service-account@data-analytics-demo-${project_string}.iam.gserviceaccount.com
#   Port: TCP 22
#   Source: the IP address of your home network
gcloud compute ssh --zone "REPLACE-REGION-a" "dataproc-cluster-m" --project "data-analytics-demo-${project_string}"
# Run via SSH
hdfs dfs -ls /tmp/taxi-export/processed
hdfs dfs -count /tmp/taxi-export/processed
# Grant dataproc-service-account@${project}.iam.gserviceaccount.com access to the destination storage bucket
# 2022-09-17 00:43:15,803 INFO tools.SimpleCopyListing: Paths (files+dirs) cnt = 5069012; dirCnt = 1588356
# https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.0/administration/content/distcp_faq.html
export HADOOP_CLIENT_OPTS="-Xms64m -Xmx1024m"
hdfs dfs -ls /tmp/taxi-export/processed/taxi-trips-query-acceleration
hadoop distcp /tmp/taxi-export/processed/taxi-trips-query-acceleration gs://dataproc-data-analytics-demo-${project_string}/taxi-data/
# Final run:
# 2022-09-20 19:51:43,608 INFO tools.SimpleCopyListing: Paths (files+dirs) cnt = 5068912; dirCnt = 1588355
hadoop distcp /tmp/taxi-export/processed/taxi-trips-query-acceleration gs://data-query-acceleration/
# slower than distcp
hdfs dfs -cp -f /tmp/taxi-export/processed/taxi-trips-query-acceleration gs://processed-data-analytics-demo-${project_string}/copytest/
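# NOTE (standard distcp behavior, stated as an assumption for this setup): distcp runs as a MapReduce job,
# so with millions of small files the copy can be tuned by raising the number of map tasks, e.g.
# hadoop distcp -m 64 /tmp/taxi-export/processed/taxi-trips-query-acceleration gs://dataproc-data-analytics-demo-${project_string}/taxi-data/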
# Delete the cluster
gcloud dataproc clusters delete dataproc-cluster --region REPLACE-REGION --project="${project}"
"""
# Sample run via command line using Dataproc Serverless
# You must delete a lot of data from the taxi_trips table in order to test this.
# The number of files generated can overwhelm most Spark clusters.
"""
project_string="s3epuwhxbf"
gsutil cp ./dataproc/export_taxi_data_from_bq_to_gcs.py gs://raw-data-analytics-demo-${project_string}/pyspark-code
gcloud beta dataproc batches submit pyspark \
--project="data-analytics-demo-${project_string}" \
--region="REPLACE-REGION" \
--batch="batch-015" \
gs://raw-data-analytics-demo-${project_string}/pyspark-code/export_taxi_data_from_bq_to_gcs.py \
--jars gs://raw-data-analytics-demo-${project_string}/pyspark-code/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--subnet="datatproc-serverless-subnet" \
--deps-bucket="gs://dataproc-data-analytics-demo-${project_string}" \
--service-account="dataproc-service-account@data-analytics-demo-${project_string}.iam.gserviceaccount.com" \
-- data-analytics-demo-${project_string} taxi_dataset bigspark-data-analytics-demo-${project_string} gs://processed-data-analytics-demo-${project_string}
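# (Optional) check on the batch with standard gcloud commands:
# gcloud dataproc batches list --project data-analytics-demo-${project_string} --region REPLACE-REGION
# gcloud dataproc batches describe batch-015 --project data-analytics-demo-${project_string} --region REPLACE-REGION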
# To cancel a batch:
gcloud dataproc batches cancel batch-000 --project data-analytics-demo-${project_string} --region REPLACE-REGION
"""