in dataproc/convert_taxi_to_iceberg_data_updates.py [0:0]
def UpdateIcebergTaxiData(icebergWarehouse):
    """Apply a fixed series of data and schema changes to the Iceberg taxi tables.

    A SparkSession is created with the Iceberg SQL extensions enabled and two
    catalogs configured: ``spark_catalog`` (Iceberg ``SparkSessionCatalog``,
    type ``hive``) and ``local`` (Iceberg ``SparkCatalog``, type ``hadoop``)
    whose warehouse is ``icebergWarehouse``.  The following statements are
    then executed, in order, against the ``local`` catalog:

    1. DELETE rows from ``green_taxi_trips`` where ``Vendor_Id != 1``.
    2. UPDATE ``yellow_taxi_trips`` setting ``Surcharge = 100`` where
       ``Passenger_Count > 6``.
    3. ALTER ``green_taxi_trips`` to add a string column ``iceberg_data``.
    4. UPDATE every row of ``green_taxi_trips`` to populate ``iceberg_data``.

    The session is stopped when the function exits, including when one of the
    statements raises.

    Args:
        icebergWarehouse: Value used for ``spark.sql.catalog.local.warehouse``
            (presumably a warehouse path/URI string -- confirm with caller).

    Returns:
        None.

    Raises:
        Any exception raised by ``spark.sql`` propagates to the caller after
        the session has been stopped.
    """
    print("UpdateIcebergTaxiData: icebergWarehouse: ", icebergWarehouse)

    # ICEBERG SPECIFIC!
    # We need the ".config" options set for the default Iceberg catalog
    spark = (
        SparkSession.builder
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
        .config("spark.sql.catalog.spark_catalog.type", "hive")
        .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.local.type", "hadoop")
        .config("spark.sql.catalog.local.warehouse", icebergWarehouse)
        .config("spark.network.timeout", 50000)
        .appName("IcebergDataUpdates")
        .getOrCreate()
    )

    ##############################################################################################################
    # Do some data updates
    # https://iceberg.apache.org/docs/latest/spark-writes/#delete-from
    ##############################################################################################################
    # Order matters: the ALTER TABLE must run before the UPDATE that writes
    # to the newly added column.
    # NOTE(review): ADD COLUMNS presumably fails if "iceberg_data" already
    # exists, so re-running against the same table may raise -- confirm.
    statements = (
        # Delete some data
        "DELETE FROM local.default.green_taxi_trips WHERE Vendor_Id != 1",
        # Do an update on some data
        "UPDATE local.default.yellow_taxi_trips SET Surcharge = 100 WHERE Passenger_Count > 6",
        # Add a column "iceberg_data" to the Green table
        "ALTER TABLE local.default.green_taxi_trips ADD COLUMNS (iceberg_data string comment 'Iceberg new column')",
        # Update the "iceberg_data" column with data 'Iceberg was here!'
        "UPDATE local.default.green_taxi_trips SET iceberg_data = 'Iceberg was here!'",
    )

    try:
        for query in statements:
            spark.sql(query)
    finally:
        # Always release the session, even if one of the statements failed.
        spark.stop()