def CreateIcebergWarehouse()

in dataproc/rideshare_iceberg_serverless.py [0:0]


def CreateIcebergWarehouse(project_id,iceberg_catalog,iceberg_warehouse,bq_rideshare_enriched_dataset,bq_rideshare_raw_dataset,rideshare_raw_bucket,rideshare_enriched_bucket,bigquery_region):
    print("BEGIN: CreateIcebergWarehouse")

    # We need the ".config" options set for the default Iceberg catalog
    spark = SparkSession \
        .builder \
        .appName("BigLake Iceberg") \
        .config("spark.network.timeout", 50000) \
        .getOrCreate()

    # .enableHiveSupport() \

    #project_id = "data-analytics-demo-rexm45tpvr"
    #iceberg_catalog = "iceberg_catalog_1"
    #iceberg_warehouse = "iceberg_warehouse"
    #bq_rideshare_enriched_dataset = "iceberg_test"
    #bq_rideshare_raw_dataset = "rideshare_lakehouse_raw"
    #rideshare_raw_bucket = "rideshare-lakehouse-raw-rexm45tpvr"

    #####################################################################################
    # Iceberg Initialization
    #####################################################################################
    spark.sql("CREATE NAMESPACE IF NOT EXISTS {};".format(iceberg_catalog))
    spark.sql("CREATE NAMESPACE IF NOT EXISTS {}.{};".format(iceberg_catalog,iceberg_warehouse))
    spark.sql("DROP TABLE IF EXISTS {}.{}.biglake_rideshare_payment_type_iceberg".format(iceberg_catalog,iceberg_warehouse))


    #####################################################################################
    # Create the Iceberg tables
    #####################################################################################
    spark.sql("CREATE TABLE IF NOT EXISTS {}.{}.biglake_rideshare_payment_type_iceberg ".format(iceberg_catalog, iceberg_warehouse) + \
                    "(payment_type_id int, payment_type_description string) " + \
            "USING iceberg " + \
            "TBLPROPERTIES(bq_table='{}.biglake_rideshare_payment_type_iceberg', bq_connection='{}.biglake-connection');".format(bq_rideshare_enriched_dataset,bigquery_region))

    spark.sql("CREATE TABLE IF NOT EXISTS {}.{}.biglake_rideshare_zone_iceberg ".format(iceberg_catalog, iceberg_warehouse) + \
                    "(location_id int, borough string, zone string, service_zone string) " + \
            "USING iceberg " + \
            "TBLPROPERTIES(bq_table='{}.biglake_rideshare_zone_iceberg', bq_connection='{}.biglake-connection');".format(bq_rideshare_enriched_dataset,bigquery_region))

    spark.sql("CREATE TABLE IF NOT EXISTS {}.{}.biglake_rideshare_trip_iceberg ".format(iceberg_catalog, iceberg_warehouse) + \
                    "(rideshare_trip_id string, pickup_location_id int, pickup_datetime timestamp, " + \
                    "dropoff_location_id int, dropoff_datetime timestamp, ride_distance float, " + \
                    "is_airport boolean, payment_type_id int, fare_amount float, tip_amount float, " + \
                    "taxes_amount float, total_amount float, " + \
                    "credit_card_number string, credit_card_expire_date date, credit_card_cvv_code string, " + \
                    "partition_date date)" + \
            "USING iceberg " + \
            "PARTITIONED BY (partition_date) " + \
            "TBLPROPERTIES(bq_table='{}.biglake_rideshare_trip_iceberg', bq_connection='{}.biglake-connection');".format(bq_rideshare_enriched_dataset,bigquery_region))
        
    #####################################################################################
    # Load the Payment Type Table in the Enriched Zone
    #####################################################################################
    # Option 1: Use BigQuery Spark Adaptor to load the whole table
    # Read from the RAW zone
    #df_biglake_rideshare_payment_type_json = spark.read.format('bigquery') \
    #    .option("table", "{}:{}.biglake_rideshare_payment_type_json".format(project_id,bq_rideshare_raw_dataset)) \
    #    .load()

    # Option 2: Use BigQuery Spark Adaptor to run a SQL statement
    #df_biglake_rideshare_payment_type_json = spark.read.format("bigquery") \
    #  .load("select * from `{}.{}.biglake_rideshare_payment_type_json`"".format(project_id,bq_rideshare_raw_dataset))

    # Option 3: Read from raw files
    df_biglake_rideshare_payment_type_json = spark.read.json("gs://{}/rideshare_payment_type/*.json".format(rideshare_raw_bucket))

    # Create Spark View and Show the data
    df_biglake_rideshare_payment_type_json.createOrReplaceTempView("temp_view_rideshare_payment_type")
    spark.sql("select * from temp_view_rideshare_payment_type").show(10)

    # Insert into Iceberg table (perform typecasting)
    spark.sql("INSERT INTO {}.{}.biglake_rideshare_payment_type_iceberg ".format(iceberg_catalog, iceberg_warehouse) + \
                    "(payment_type_id, payment_type_description) " + \
            "SELECT cast(payment_type_id as int), cast(payment_type_description as string) " + \
            "FROM temp_view_rideshare_payment_type;")
        

    #####################################################################################
    # Load the Zone Table in the Enriched Zone
    #####################################################################################
    df_biglake_rideshare_zone_csv = spark.read \
        .option("delimiter", "|") \
        .option("header", "true") \
        .csv("gs://{}/rideshare_zone/*.csv".format(rideshare_raw_bucket))

    # Create Spark View and Show the data
    df_biglake_rideshare_zone_csv.createOrReplaceTempView("temp_view_rideshare_zone")
    spark.sql("select * from temp_view_rideshare_zone").show(10)

    # Insert into Iceberg table (perform typecasting)
    spark.sql("INSERT INTO {}.{}.biglake_rideshare_zone_iceberg ".format(iceberg_catalog, iceberg_warehouse) + \
                    "(location_id, borough, zone, service_zone) " + \
            "SELECT cast(location_id as int), cast(borough as string), cast(zone as string), cast(service_zone as string) " + \
            "FROM temp_view_rideshare_zone;")


    #####################################################################################
    # Load the Rideshare Trips Table in the Enriched Zone (3 different formats)
    #####################################################################################
    # AVRO data
    df_biglake_rideshare_trip_avro = spark.read.format("avro").load("gs://{}/rideshare_trip/avro/*.avro".format(rideshare_raw_bucket))

    # Create Spark View and Show the data
    df_biglake_rideshare_trip_avro.createOrReplaceTempView("temp_view_rideshare_trip_avro")
    spark.sql("select * from temp_view_rideshare_trip_avro").show(10)

    # Insert into Iceberg table (perform typecasting)
    spark.sql("INSERT INTO {}.{}.biglake_rideshare_trip_iceberg ".format(iceberg_catalog, iceberg_warehouse) + \
                    "(rideshare_trip_id, pickup_location_id, pickup_datetime, " + \
                    "dropoff_location_id, dropoff_datetime, ride_distance, " + \
                    "is_airport, payment_type_id, fare_amount, tip_amount, " + \
                    "taxes_amount, total_amount, credit_card_number, credit_card_expire_date, credit_card_cvv_code, partition_date)" + \
            "SELECT cast(rideshare_trip_id as string), cast(pickup_location_id as int), cast(pickup_datetime as timestamp), " + \
                    "cast(dropoff_location_id as int), cast(dropoff_datetime as timestamp), cast(ride_distance as float), " + \
                    "cast(is_airport as boolean), cast(payment_type_id as int), cast(fare_amount as float), cast(tip_amount as float), " + \
                    "cast(taxes_amount as float), cast(total_amount as float), " + \
                    "CASE WHEN cast(payment_type_id as int) = 1 " + \
                        "THEN CONCAT(CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-', " + \
                            "CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-', " + \
                            "CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-', " + \
                            "CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING)) " + \
                        "ELSE NULL " + \
                    "END, " + \
                    "CASE WHEN cast(payment_type_id as int) = 1 " + \
                        "THEN CAST( " + \
                            "  CONCAT(CAST(CAST(ROUND(2022 + RAND() * (2025 - 2022)) AS INT) AS STRING),'-', " + \
                            "         CAST(CAST(ROUND(   1 + RAND() * (12   - 1)) AS INT) AS STRING),'-', " + \
                            "         CAST(CAST(ROUND(   1 + RAND() * (28   - 1)) AS INT) AS STRING)) " + \
                            "AS DATE) " + \
                        "ELSE NULL " + \
                    "END, " + \
                    "CASE WHEN cast(payment_type_id as int) = 1 " + \
                        "THEN CAST(CAST(ROUND(100 + RAND() * (999 - 100)) AS INT) AS STRING) " + \
                        "ELSE NULL " + \
                    "END, " + \
                    "cast(partition_date as date) " + \
            "FROM temp_view_rideshare_trip_avro;")

            
    # Parquet data
    df_biglake_rideshare_trip_parquet = spark.read.parquet("gs://{}/rideshare_trip/parquet/*.parquet".format(rideshare_raw_bucket))

    # Create Spark View and Show the data
    df_biglake_rideshare_trip_parquet.createOrReplaceTempView("temp_view_rideshare_trip_parquet")
    spark.sql("select * from temp_view_rideshare_trip_parquet").show(10)

    # Insert into Iceberg table (perform typecasting)
    spark.sql("INSERT INTO {}.{}.biglake_rideshare_trip_iceberg ".format(iceberg_catalog, iceberg_warehouse) + \
                    "(rideshare_trip_id, pickup_location_id, pickup_datetime, " + \
                    "dropoff_location_id, dropoff_datetime, ride_distance, " + \
                    "is_airport, payment_type_id, fare_amount, tip_amount, " + \
                    "taxes_amount, total_amount, credit_card_number, credit_card_expire_date, credit_card_cvv_code, partition_date)" + \
            "SELECT cast(rideshare_trip_id as string), cast(pickup_location_id as int), cast(pickup_datetime as timestamp), " + \
                    "cast(dropoff_location_id as int), cast(dropoff_datetime as timestamp), cast(ride_distance as float), " + \
                    "cast(is_airport as boolean), cast(payment_type_id as int), cast(fare_amount as float), cast(tip_amount as float), " + \
                    "cast(taxes_amount as float), cast(total_amount as float), " + \
                    "CASE WHEN cast(payment_type_id as int) = 1 " + \
                        "THEN CONCAT(CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-', " + \
                            "CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-', " + \
                            "CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-', " + \
                            "CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING)) " + \
                        "ELSE NULL " + \
                    "END, " + \
                    "CASE WHEN cast(payment_type_id as int) = 1 " + \
                        "THEN CAST( " + \
                            "  CONCAT(CAST(CAST(ROUND(2022 + RAND() * (2025 - 2022)) AS INT) AS STRING),'-', " + \
                            "         CAST(CAST(ROUND(   1 + RAND() * (12   - 1)) AS INT) AS STRING),'-', " + \
                            "         CAST(CAST(ROUND(   1 + RAND() * (28   - 1)) AS INT) AS STRING)) " + \
                            "AS DATE) " + \
                        "ELSE NULL " + \
                    "END, " + \
                    "CASE WHEN cast(payment_type_id as int) = 1 " + \
                        "THEN CAST(CAST(ROUND(100 + RAND() * (999 - 100)) AS INT) AS STRING) " + \
                        "ELSE NULL " + \
                    "END, " + \
                    "cast(partition_date as date) " + \
            "FROM temp_view_rideshare_trip_parquet;")


    # JSON data
    df_biglake_rideshare_trip_json = spark.read.json("gs://{}/rideshare_trip/json/*.json".format(rideshare_raw_bucket))

    # Create Spark View and Show the data
    df_biglake_rideshare_trip_json.createOrReplaceTempView("temp_view_rideshare_trip_json")
    spark.sql("select * from temp_view_rideshare_trip_json").show(10)

    # Insert into Iceberg table (perform typecasting)
    spark.sql("INSERT INTO {}.{}.biglake_rideshare_trip_iceberg ".format(iceberg_catalog, iceberg_warehouse) + \
                    "(rideshare_trip_id, pickup_location_id, pickup_datetime, " + \
                    "dropoff_location_id, dropoff_datetime, ride_distance, " + \
                    "is_airport, payment_type_id, fare_amount, tip_amount, " + \
                    "taxes_amount, total_amount, credit_card_number, credit_card_expire_date, credit_card_cvv_code, partition_date)" + \
            "SELECT cast(rideshare_trip_id as string), cast(pickup_location_id as int), cast(pickup_datetime as timestamp), " + \
                    "cast(dropoff_location_id as int), cast(dropoff_datetime as timestamp), cast(ride_distance as float), " + \
                    "cast(is_airport as boolean), cast(payment_type_id as int), cast(fare_amount as float), cast(tip_amount as float), " + \
                    "cast(taxes_amount as float), cast(total_amount as float), " + \
                    "CASE WHEN cast(payment_type_id as int) = 1 " + \
                        "THEN CONCAT(CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-', " + \
                            "CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-', " + \
                            "CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-', " + \
                            "CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING)) " + \
                        "ELSE NULL " + \
                    "END, " + \
                    "CASE WHEN cast(payment_type_id as int) = 1 " + \
                        "THEN CAST( " + \
                            "  CONCAT(CAST(CAST(ROUND(2022 + RAND() * (2025 - 2022)) AS INT) AS STRING),'-', " + \
                            "         CAST(CAST(ROUND(   1 + RAND() * (12   - 1)) AS INT) AS STRING),'-', " + \
                            "         CAST(CAST(ROUND(   1 + RAND() * (28   - 1)) AS INT) AS STRING)) " + \
                            "AS DATE) " + \
                        "ELSE NULL " + \
                    "END, " + \
                    "CASE WHEN cast(payment_type_id as int) = 1 " + \
                        "THEN CAST(CAST(ROUND(100 + RAND() * (999 - 100)) AS INT) AS STRING) " + \
                        "ELSE NULL " + \
                    "END, " + \
                    "cast(partition_date as date) " + \
            "FROM temp_view_rideshare_trip_json;")


    spark.stop()