## Write a Hive table with Spark

In [None]:
import pyspark
import os
from pyspark.sql import SparkSession

# Looked up for reference only; nothing below reads it.
spark_home = os.getenv('SPARK_HOME')
# Presumably consumed by the Hadoop client inside the Spark JVM, so it is set
# before getOrCreate() below starts the session.
os.environ['HADOOP_USER_NAME'] = "anonymous"

# Session settings, applied in this (insertion) order:
#  - the Gravitino Spark plugin + connector jar, pointed at the Gravitino
#    server and the "metalake_demo" metalake (presumably the source of the
#    catalog_hive catalog used later);
#  - catalog_rest: a plain Iceberg SparkCatalog backed by the Iceberg REST
#    endpoint on gravitino:9001;
#  - an HDFS warehouse directory for Hive-style tables.
spark_settings = {
    "spark.plugins": "org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin",
    "spark.jars": "/tmp/gravitino/packages/iceberg-spark-runtime-3.4_2.12-1.5.2.jar,/tmp/gravitino/packages/gravitino-spark-connector-runtime-3.4_2.12-0.8.0-incubating.jar",
    "spark.sql.gravitino.uri": "http://gravitino:8090",
    "spark.sql.gravitino.metalake": "metalake_demo",
    "spark.sql.gravitino.enableIcebergSupport": "true",
    "spark.sql.catalog.catalog_rest": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.catalog_rest.type": "rest",
    "spark.sql.catalog.catalog_rest.uri": "http://gravitino:9001/iceberg/",
    "spark.locality.wait.node": "0",
    "spark.sql.warehouse.dir": "hdfs://hive:9000/user/hive/warehouse",
}

session_builder = SparkSession.builder.appName("PySpark SQL Example")
for conf_key, conf_value in spark_settings.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.enableHiveSupport().getOrCreate()

In [None]:
# Make catalog_hive the session's current catalog, then list its databases.
spark.sql("use catalog_hive")
hive_databases = spark.sql("show databases")
hive_databases.show()

In [None]:
# Create the `product` database and a Parquet `employees` table partitioned by
# department. IF NOT EXISTS on both keeps this cell safe to re-run.
spark.sql("CREATE DATABASE IF NOT EXISTS product;")
spark.sql("USE product;")
spark.sql("CREATE TABLE IF NOT EXISTS employees (id INT, name STRING, age INT) PARTITIONED BY (department STRING) STORED AS PARQUET;")

# By default DataFrame.show() prints at most 20 rows and cuts every cell to 20
# characters. That hides most of the extended metadata (location, serde,
# input/output formats), so print every row in full.
employees_desc = spark.sql("DESC TABLE EXTENDED employees;")
employees_desc.show(n=employees_desc.count(), truncate=False)

In [None]:
# Write one static partition per department. INSERT OVERWRITE replaces that
# partition's contents, so re-running this cell does not duplicate rows.
partition_loads = (
    "INSERT OVERWRITE TABLE employees PARTITION(department='Engineering') VALUES (1, 'John Doe', 30), (2, 'Jane Smith', 28);",
    "INSERT OVERWRITE TABLE employees PARTITION(department='Marketing') VALUES (3, 'Mike Brown', 32);",
)
for load_sql in partition_loads:
    spark.sql(load_sql)

spark.sql("SELECT * from employees").show()

## Query the table with Trino

In [None]:
# Install the Trino Python client into the kernel (provides trino.dbapi).
%pip install trino

In [None]:
from trino.dbapi import connect

# Open a Trino DB-API connection and a cursor for the queries below.
# `schema` is only the default for unqualified table names. Use the `product`
# schema created above instead of a non-existent one. The transport scheme
# is passed explicitly through `http_scheme`.
conn = connect(
    host="trino",
    port=8080,
    user="admin",
    catalog="catalog_hive",
    schema="product",
    http_scheme="http",
)

trino_client = conn.cursor()

In [None]:
# Read back the Engineering partition written by Spark, this time via Trino.
engineering_rows = trino_client.execute(
    "SELECT * FROM catalog_hive.product.employees WHERE department = 'Engineering'"
).fetchall()
print(engineering_rows)

## Write data with Spark through the Iceberg REST service

In [None]:
# Switch to the Iceberg REST catalog and create sales.customers there.
# "if not exists" on the table keeps this cell re-runnable. Without it, a
# second run fails because the table already exists.
spark.sql("use catalog_rest;")
spark.sql("create database if not exists sales;")
spark.sql("use sales;")
spark.sql("create table if not exists customers (customer_id int, customer_name varchar(100), customer_email varchar(100));")


In [None]:
# Append two customers, one INSERT statement each, then show the table.
# These are plain INSERT INTO statements, so every re-run appends the rows again.
customer_inserts = [
    "insert into customers (customer_id, customer_name, customer_email) values (11,'Rory Brown','rory@123.com');",
    "insert into customers (customer_id, customer_name, customer_email) values (12,'Jerry Washington','jerry@dt.com');",
]
for insert_sql in customer_inserts:
    spark.sql(insert_sql)
spark.sql("select * from customers").show()

## Run a federated Trino query across Hive and Iceberg

In [None]:
# A single Trino statement reads from two catalogs, catalog_hive and
# catalog_iceberg. UNION (not UNION ALL) drops duplicate rows across them.
# NOTE(review): catalog_hive.sales.customers is not created in this notebook.
# catalog_iceberg is presumably Trino's view of the Iceberg tables Spark wrote
# through catalog_rest. Confirm both tables exist and have compatible columns.
federated_sql = "select * from catalog_hive.sales.customers union select * from catalog_iceberg.sales.customers"
federated_rows = trino_client.execute(federated_sql).fetchall()
print(federated_rows)