def main()

in dataproc/pyspark_apache_hudi.py [0:0]


from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp


def main(hudi_bucket):
  table_name = "location_hudi"
  table_uri = f"gs://{hudi_bucket}/biglake-tables/location_hudi"
  app_name = f'pyspark-hudi-test_{table_name}'

  print(f'Creating Spark session {app_name} ...')
  spark = SparkSession.builder.appName(app_name).getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  # Read the source Parquet file from Cloud Storage.
  file_location = f"gs://{hudi_bucket}/biglake-tables/location_parquet/location.snappy.parquet"
  df = spark.read.parquet(file_location)

  # Add a "ts" timestamp column and cast it to string (typically used as the
  # Hudi precombine field).
  df_with_ts = df.withColumn("ts", current_timestamp())
  df_with_ts_converted = df_with_ts.withColumn("ts", df_with_ts["ts"].cast("string"))

  print('Writing Hudi table ...')
  write_hudi_table(table_name, table_uri, df_with_ts_converted)

  print('Stopping Spark session ...')
  spark.stop()

  print('Hudi table created. Please run the Sync process.')
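
The write_hudi_table helper invoked above is defined elsewhere in the file and is not part of this section. The sketch below shows one way such a helper could look, assuming the Hudi Spark datasource is available on the Dataproc cluster; the record key column name (location_id) and the write operation are illustrative assumptions and may differ from the actual sample.


def write_hudi_table(name, uri, df):
  """Write the DataFrame to Cloud Storage as a Hudi table (sketch)."""
  hudi_options = {
      'hoodie.table.name': name,
      # Assumed key and precombine columns for the location dataset.
      'hoodie.datasource.write.recordkey.field': 'location_id',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.datasource.write.operation': 'bulk_insert',
  }
  df.write.format('hudi').options(**hudi_options).mode('overwrite').save(uri)


Hudi uses the precombine field to decide which record wins when multiple writes share the same key, which is presumably why main adds the ts column before handing the DataFrame to this helper.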