in dataproc/pyspark_apache_hudi.py [0:0]
def main(hudi_bucket):
    """Create the ``location_hudi`` Hudi table from the location Parquet file.

    Reads
    ``gs://<hudi_bucket>/biglake-tables/location_parquet/location.snappy.parquet``,
    adds a ``ts`` column set to ``current_timestamp()``, and writes the result
    with ``write_hudi_table`` to ``gs://<hudi_bucket>/biglake-tables/location_hudi``.

    Args:
        hudi_bucket: GCS bucket name (no ``gs://`` prefix) that holds both the
            source Parquet file and the output Hudi table.
    """
    table_name = "location_hudi"
    # Derive the table path from table_name so the two cannot drift apart.
    table_uri = f"gs://{hudi_bucket}/biglake-tables/{table_name}"
    app_name = f'pyspark-hudi-test_{table_name}'
    print(f'Creating Spark session {app_name} ...')
    spark = SparkSession.builder.appName(app_name).getOrCreate()
    # Always stop the session, even if the read or the Hudi write fails.
    try:
        spark.sparkContext.setLogLevel('WARN')
        file_location = f"gs://{hudi_bucket}/biglake-tables/location_parquet/location.snappy.parquet"
        df = spark.read.parquet(file_location)
        # Stamp each row with the load time in a ``ts`` column.
        # NOTE(review): an earlier revision also built a copy with ``ts`` cast
        # to string but never used it, so ``ts`` is written as a Spark
        # timestamp. If write_hudi_table needs a string ``ts`` (e.g. as the
        # precombine field), cast it here — confirm against write_hudi_table.
        df_with_ts = df.withColumn("ts", current_timestamp())
        print('Writing Hudi table ...')
        write_hudi_table(table_name, table_uri, df_with_ts)
    finally:
        print('Stopping Spark session ...')
        spark.stop()
    print('Hudi table created. Please run Sync process.')