####################################################################################
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################
"""
# https://cloud.google.com/dataproc/docs/concepts/components/hudi

# Create the cluster.
# The HUDI optional component requires a sufficiently recent Dataproc image
# (see the docs link above).
# The dataproc-service-account service account needs access to BigQuery; it is
# granted the project Editor role here for simplicity, but should be scoped
# down in production.
project_id="data-analytics-preview"
hudi_bucket="biglake-${project_id}"
# You must create this bucket before creating the cluster.
dataproc_bucket="${project_id}-4prl0sgfm7"
service_account="dataproc-service-account@${project_id}.iam.gserviceaccount.com"
dataproc_subnet="dataproc-subnet"
# Note: Dataproc cluster names may not contain underscores, hence "hudi-cluster".
gcloud dataproc clusters create "hudi-cluster" \
--project="${project_id}" \
--region="us-central1" \
--num-masters=1 \
--bucket=${dataproc_bucket} \
--temp-bucket=${dataproc_bucket} \
--master-machine-type="n1-standard-8" \
--worker-machine-type="n1-standard-8" \
--num-workers=2 \
--subnet="${dataproc_subnet}" \
--service-account="${service_account}" \
--optional-components=HUDI
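
# Optional: confirm the cluster is running before submitting the job.
gcloud dataproc clusters describe hudi-cluster --region=us-central1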

# Run the job to create the Hudi table. The script expects the Hudi bucket name
# as its first argument (see main() below), passed after the "--" separator.
gcloud dataproc jobs submit pyspark \
--cluster="hudi-cluster" \
--region="us-central1" \
pyspark_apache_hudi.py \
-- "${hudi_bucket}"
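
# Optional: check the job's status; the submit command prints a job ID.
# gcloud dataproc jobs describe JOB_ID --region=us-central1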

# SSH to the cluster and run the BigQuery sync tool to register the Hudi table
# in BigQuery.
spark-submit \
--master yarn \
--packages com.google.cloud:google-cloud-bigquery:2.10.4 \
--conf spark.driver.userClassPathFirst=true \
--conf spark.executor.userClassPathFirst=true \
--class org.apache.hudi.gcp.bigquery.BigQuerySyncTool \
/usr/lib/hudi/tools/bq-sync-tool/hudi-gcp-bundle-0.12.3.jar \
--project-id ${project_id} \
--dataset-name biglake_dataset \
--dataset-location us \
--table location_hudi \
--source-uri gs://${hudi_bucket}/biglake-tables/location_hudi/borough=* \
--source-uri-prefix gs://${hudi_bucket}/biglake-tables/location_hudi/ \
--base-path gs://${hudi_bucket}/biglake-tables/location_hudi \
--partitioned-by borough \
--use-bq-manifest-file
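
# Optional: verify the synced table in BigQuery (dataset and table names are
# the ones passed to the sync tool above).
bq query --use_legacy_sql=false \
'SELECT borough, COUNT(*) AS n FROM biglake_dataset.location_hudi GROUP BY borough'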
"""
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp


def write_hudi_table(table_name, table_uri, df):
    """Writes the DataFrame to a Hudi table at table_uri."""
    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.recordkey.field': 'location_id',
        'hoodie.datasource.write.partitionpath.field': 'borough',
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': 'insert',
        'hoodie.datasource.write.precombine.field': 'ts',
        'hoodie.upsert.shuffle.parallelism': 2,
        'hoodie.insert.shuffle.parallelism': 2,
        # BigQuery support. Note: the 'hoodie-conf' prefix belongs to the
        # DeltaStreamer CLI, not to Spark datasource options, so the keys are
        # passed bare here.
        'hoodie.partition.metafile.use.base.format': 'true',
        'hoodie.metadata.enable': 'true',
        'hoodie.datasource.write.hive_style_partitioning': 'true',
        'hoodie.datasource.write.drop.partition.columns': 'true',
    }
    df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)
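

# A minimal read-back sketch (an addition, not part of the original job), for
# verifying the write: the 'hudi' datasource is available on clusters created
# with the HUDI optional component.
def read_hudi_table(spark, table_uri):
    """Reads the Hudi table back and shows a few rows as a smoke test."""
    df = spark.read.format('hudi').load(table_uri)
    df.select('location_id', 'borough', 'ts').show(5)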


def main(hudi_bucket):
    table_name = "location_hudi"
    table_uri = f"gs://{hudi_bucket}/biglake-tables/location_hudi"
    app_name = f'pyspark-hudi-test_{table_name}'
    print(f'Creating Spark session {app_name} ...')
    spark = SparkSession.builder.appName(app_name).getOrCreate()
    spark.sparkContext.setLogLevel('WARN')
    file_location = f"gs://{hudi_bucket}/biglake-tables/location_parquet/location.snappy.parquet"
    df = spark.read.parquet(file_location)
    # Add a precombine timestamp column and cast it to string, matching the
    # 'hoodie.datasource.write.precombine.field' option above.
    df_with_ts = df.withColumn("ts", current_timestamp())
    df_with_ts_converted = df_with_ts.withColumn("ts", df_with_ts["ts"].cast("string"))
    print('Writing Hudi table ...')
    write_hudi_table(table_name, table_uri, df_with_ts_converted)
    print('Stopping Spark session ...')
    spark.stop()
    print('Hudi table created. Please run the sync process.')


if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.exit('Usage: pyspark_apache_hudi.py <hudi_bucket>')
    main(sys.argv[1])