def UpdateIcebergTaxiData()

in dataproc/convert_taxi_to_iceberg_data_updates.py [0:0]


def UpdateIcebergTaxiData(icebergWarehouse):
    """Apply a fixed set of demo data updates to the Iceberg taxi tables.

    Builds (or reuses, via ``getOrCreate``) a SparkSession with the Iceberg
    SQL extensions enabled and a Hadoop-type Iceberg catalog named ``local``
    rooted at *icebergWarehouse*, then runs these statements in order:

    1. DELETE from ``local.default.green_taxi_trips`` every row whose
       ``Vendor_Id`` is not 1. Rows with a NULL ``Vendor_Id`` are kept,
       because ``NULL != 1`` does not evaluate to true in SQL.
    2. UPDATE ``local.default.yellow_taxi_trips`` setting ``Surcharge = 100``
       where ``Passenger_Count > 6``.
    3. ALTER ``local.default.green_taxi_trips`` to add a string column
       ``iceberg_data``.
    4. UPDATE every row of ``local.default.green_taxi_trips`` to set
       ``iceberg_data = 'Iceberg was here!'``.

    The Spark session is stopped before returning, including when one of the
    statements raises (the exception still propagates).

    Args:
        icebergWarehouse: Warehouse location for the ``local`` catalog; it is
            passed as ``spark.sql.catalog.local.warehouse``.

    Note:
        NOTE(review): step 3 presumably fails if ``iceberg_data`` already
        exists (e.g. when the job is re-run), after steps 1-2 have already
        been applied. Confirm before relying on reruns.
    """
    print("UpdateIcebergTaxiData: icebergWarehouse:  ", icebergWarehouse)

    # ICEBERG SPECIFIC!
    # We need the ".config" options set for the default Iceberg catalog
    # (spark_catalog) and for the Hadoop-backed "local" catalog used below.
    spark = SparkSession \
        .builder \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
        .config("spark.sql.catalog.spark_catalog.type", "hive") \
        .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.local.type", "hadoop") \
        .config("spark.sql.catalog.local.warehouse", icebergWarehouse) \
        .config("spark.network.timeout", 50000) \
        .appName("IcebergDataUpdates") \
        .getOrCreate()

    ##############################################################################################################
    # Do some data updates
    # https://iceberg.apache.org/docs/latest/spark-writes/#delete-from
    ##############################################################################################################
    # Executed strictly in this order: the final UPDATE depends on the column
    # added by the ALTER TABLE before it.
    statements = (
        # Delete some data
        "DELETE FROM local.default.green_taxi_trips WHERE Vendor_Id != 1",
        # Do an update on some data
        "UPDATE local.default.yellow_taxi_trips SET Surcharge = 100 WHERE Passenger_Count > 6",
        # Add a column "iceberg_data" to the Green table
        "ALTER TABLE local.default.green_taxi_trips ADD COLUMNS (iceberg_data string comment 'Iceberg new column')",
        # Update the "iceberg_data" column with data 'Iceberg was here!'
        "UPDATE local.default.green_taxi_trips SET iceberg_data = 'Iceberg was here!'",
    )

    try:
        for query in statements:
            spark.sql(query)
    finally:
        # Always release the session, even if one of the statements fails.
        spark.stop()