dataproc/rideshare_iceberg_serverless.py [97:249]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
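    #####################################################################################
    # Load the Payment Type Table in the Enriched Zone
    #####################################################################################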
    df_biglake_rideshare_payment_type_json = spark.read.json("gs://{}/rideshare_payment_type/*.json".format(rideshare_raw_bucket))

    # Create Spark View and Show the data
    df_biglake_rideshare_payment_type_json.createOrReplaceTempView("temp_view_rideshare_payment_type")
    spark.sql("select * from temp_view_rideshare_payment_type").show(10)

    # Insert into Iceberg table (perform typecasting)
    spark.sql("INSERT INTO {}.{}.biglake_rideshare_payment_type_iceberg ".format(iceberg_catalog, iceberg_warehouse) + \
                    "(payment_type_id, payment_type_description) " + \
            "SELECT cast(payment_type_id as int), cast(payment_type_description as string) " + \
            "FROM temp_view_rideshare_payment_type;")
        

    #####################################################################################
    # Load the Zone Table in the Enriched Zone
    #####################################################################################
    df_biglake_rideshare_zone_csv = spark.read \
        .option("delimiter", "|") \
        .option("header", "true") \
        .csv("gs://{}/rideshare_zone/*.csv".format(rideshare_raw_bucket))

    # Create Spark View and Show the data
    df_biglake_rideshare_zone_csv.createOrReplaceTempView("temp_view_rideshare_zone")
    spark.sql("select * from temp_view_rideshare_zone").show(10)

    # Insert into Iceberg table (perform typecasting)
    spark.sql("INSERT INTO {}.{}.biglake_rideshare_zone_iceberg ".format(iceberg_catalog, iceberg_warehouse) + \
                    "(location_id, borough, zone, service_zone) " + \
            "SELECT cast(location_id as int), cast(borough as string), cast(zone as string), cast(service_zone as string) " + \
            "FROM temp_view_rideshare_zone;")


    #####################################################################################
    # Load the Rideshare Trips Table in the Enriched Zone (3 different formats)
    #####################################################################################
    # AVRO data
    df_biglake_rideshare_trip_avro = spark.read.format("avro").load("gs://{}/rideshare_trip/avro/*.avro".format(rideshare_raw_bucket))

    # Create Spark View and Show the data
    df_biglake_rideshare_trip_avro.createOrReplaceTempView("temp_view_rideshare_trip_avro")
    spark.sql("select * from temp_view_rideshare_trip_avro").show(10)

    # Insert into Iceberg table (perform typecasting)
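    # For credit card payments (payment_type_id = 1), the CASE expressions below generate
    # synthetic card details with RAND(): a card number made of four 4-digit groups joined
    # by "-", an expiration date between 2022 and 2025, and a 3-digit CVV code.
    # All other payment types store NULL for these three columns.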
    spark.sql("INSERT INTO {}.{}.biglake_rideshare_trip_iceberg ".format(iceberg_catalog, iceberg_warehouse) + \
                    "(rideshare_trip_id, pickup_location_id, pickup_datetime, " + \
                    "dropoff_location_id, dropoff_datetime, ride_distance, " + \
                    "is_airport, payment_type_id, fare_amount, tip_amount, " + \
                    "taxes_amount, total_amount, credit_card_number, credit_card_expire_date, credit_card_cvv_code, partition_date)" + \
            "SELECT cast(rideshare_trip_id as string), cast(pickup_location_id as int), cast(pickup_datetime as timestamp), " + \
                    "cast(dropoff_location_id as int), cast(dropoff_datetime as timestamp), cast(ride_distance as float), " + \
                    "cast(is_airport as boolean), cast(payment_type_id as int), cast(fare_amount as float), cast(tip_amount as float), " + \
                    "cast(taxes_amount as float), cast(total_amount as float), " + \
                    "CASE WHEN cast(payment_type_id as int) = 1 " + \
                        "THEN CONCAT(CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-', " + \
                            "CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-', " + \
                            "CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-', " + \
                            "CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING)) " + \
                        "ELSE NULL " + \
                    "END, " + \
                    "CASE WHEN cast(payment_type_id as int) = 1 " + \
                        "THEN CAST( " + \
                            "  CONCAT(CAST(CAST(ROUND(2022 + RAND() * (2025 - 2022)) AS INT) AS STRING),'-', " + \
                            "         CAST(CAST(ROUND(   1 + RAND() * (12   - 1)) AS INT) AS STRING),'-', " + \
                            "         CAST(CAST(ROUND(   1 + RAND() * (28   - 1)) AS INT) AS STRING)) " + \
                            "AS DATE) " + \
                        "ELSE NULL " + \
                    "END, " + \
                    "CASE WHEN cast(payment_type_id as int) = 1 " + \
                        "THEN CAST(CAST(ROUND(100 + RAND() * (999 - 100)) AS INT) AS STRING) " + \
                        "ELSE NULL " + \
                    "END, " + \
                    "cast(partition_date as date) " + \
            "FROM temp_view_rideshare_trip_avro;")

            
    # Parquet data
    df_biglake_rideshare_trip_parquet = spark.read.parquet("gs://{}/rideshare_trip/parquet/*.parquet".format(rideshare_raw_bucket))

    # Create Spark View and Show the data
    df_biglake_rideshare_trip_parquet.createOrReplaceTempView("temp_view_rideshare_trip_parquet")
    spark.sql("select * from temp_view_rideshare_trip_parquet").show(10)

    # Insert into Iceberg table (perform typecasting)
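    # Same synthetic credit card generation as the Avro load above: card number,
    # expiration date and CVV are randomized when payment_type_id = 1, otherwise NULL.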
    spark.sql("INSERT INTO {}.{}.biglake_rideshare_trip_iceberg ".format(iceberg_catalog, iceberg_warehouse) + \
                    "(rideshare_trip_id, pickup_location_id, pickup_datetime, " + \
                    "dropoff_location_id, dropoff_datetime, ride_distance, " + \
                    "is_airport, payment_type_id, fare_amount, tip_amount, " + \
                    "taxes_amount, total_amount, credit_card_number, credit_card_expire_date, credit_card_cvv_code, partition_date)" + \
            "SELECT cast(rideshare_trip_id as string), cast(pickup_location_id as int), cast(pickup_datetime as timestamp), " + \
                    "cast(dropoff_location_id as int), cast(dropoff_datetime as timestamp), cast(ride_distance as float), " + \
                    "cast(is_airport as boolean), cast(payment_type_id as int), cast(fare_amount as float), cast(tip_amount as float), " + \
                    "cast(taxes_amount as float), cast(total_amount as float), " + \
                    "CASE WHEN cast(payment_type_id as int) = 1 " + \
                        "THEN CONCAT(CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-', " + \
                            "CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-', " + \
                            "CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-', " + \
                            "CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING)) " + \
                        "ELSE NULL " + \
                    "END, " + \
                    "CASE WHEN cast(payment_type_id as int) = 1 " + \
                        "THEN CAST( " + \
                            "  CONCAT(CAST(CAST(ROUND(2022 + RAND() * (2025 - 2022)) AS INT) AS STRING),'-', " + \
                            "         CAST(CAST(ROUND(   1 + RAND() * (12   - 1)) AS INT) AS STRING),'-', " + \
                            "         CAST(CAST(ROUND(   1 + RAND() * (28   - 1)) AS INT) AS STRING)) " + \
                            "AS DATE) " + \
                        "ELSE NULL " + \
                    "END, " + \
                    "CASE WHEN cast(payment_type_id as int) = 1 " + \
                        "THEN CAST(CAST(ROUND(100 + RAND() * (999 - 100)) AS INT) AS STRING) " + \
                        "ELSE NULL " + \
                    "END, " + \
                    "cast(partition_date as date) " + \
            "FROM temp_view_rideshare_trip_parquet;")


    # JSON data
    df_biglake_rideshare_trip_json = spark.read.json("gs://{}/rideshare_trip/json/*.json".format(rideshare_raw_bucket))

    # Create Spark View and Show the data
    df_biglake_rideshare_trip_json.createOrReplaceTempView("temp_view_rideshare_trip_json")
    spark.sql("select * from temp_view_rideshare_trip_json").show(10)

    # Insert into Iceberg table (perform typecasting)
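    # Same synthetic credit card generation as the Avro and Parquet loads above: card number,
    # expiration date and CVV are randomized when payment_type_id = 1, otherwise NULL.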
    spark.sql("INSERT INTO {}.{}.biglake_rideshare_trip_iceberg ".format(iceberg_catalog, iceberg_warehouse) + \
                    "(rideshare_trip_id, pickup_location_id, pickup_datetime, " + \
                    "dropoff_location_id, dropoff_datetime, ride_distance, " + \
                    "is_airport, payment_type_id, fare_amount, tip_amount, " + \
                    "taxes_amount, total_amount, credit_card_number, credit_card_expire_date, credit_card_cvv_code, partition_date)" + \
            "SELECT cast(rideshare_trip_id as string), cast(pickup_location_id as int), cast(pickup_datetime as timestamp), " + \
                    "cast(dropoff_location_id as int), cast(dropoff_datetime as timestamp), cast(ride_distance as float), " + \
                    "cast(is_airport as boolean), cast(payment_type_id as int), cast(fare_amount as float), cast(tip_amount as float), " + \
                    "cast(taxes_amount as float), cast(total_amount as float), " + \
                    "CASE WHEN cast(payment_type_id as int) = 1 " + \
                        "THEN CONCAT(CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-', " + \
                            "CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-', " + \
                            "CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING),'-', " + \
                            "CAST(CAST(ROUND(1000 + RAND() * (9999 - 1000)) AS INT) AS STRING)) " + \
                        "ELSE NULL " + \
                    "END, " + \
                    "CASE WHEN cast(payment_type_id as int) = 1 " + \
                        "THEN CAST( " + \
                            "  CONCAT(CAST(CAST(ROUND(2022 + RAND() * (2025 - 2022)) AS INT) AS STRING),'-', " + \
                            "         CAST(CAST(ROUND(   1 + RAND() * (12   - 1)) AS INT) AS STRING),'-', " + \
                            "         CAST(CAST(ROUND(   1 + RAND() * (28   - 1)) AS INT) AS STRING)) " + \
                            "AS DATE) " + \
                        "ELSE NULL " + \
                    "END, " + \
                    "CASE WHEN cast(payment_type_id as int) = 1 " + \
                        "THEN CAST(CAST(ROUND(100 + RAND() * (999 - 100)) AS INT) AS STRING) " + \
                        "ELSE NULL " + \
                    "END, " + \
                    "cast(partition_date as date) " + \
            "FROM temp_view_rideshare_trip_json;")
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -