{
"cells": [
{
"cell_type": "markdown",
"id": "df953e1a-b48a-4eda-b099-c89c9449fdad",
"metadata": {},
"source": [
"## Spark write Hive table"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "429e5d1b",
"metadata": {},
"outputs": [],
"source": [
"import pyspark\n",
"import os\n",
"from pyspark.sql import SparkSession\n",
"\n",
"spark_home = os.getenv('SPARK_HOME')\n",
"os.environ['HADOOP_USER_NAME']=\"anonymous\"\n",
"\n",
"spark = SparkSession.builder \\\n",
" .appName(\"PySpark SQL Example\") \\\n",
" .config(\"spark.plugins\", \"org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin\") \\\n",
" .config(\"spark.jars\", \"/tmp/gravitino/packages/iceberg-spark-runtime-3.4_2.12-1.5.2.jar,/tmp/gravitino/packages/gravitino-spark-connector-runtime-3.4_2.12-0.8.0-incubating.jar\") \\\n",
" .config(\"spark.sql.gravitino.uri\", \"http://gravitino:8090\") \\\n",
" .config(\"spark.sql.gravitino.metalake\", \"metalake_demo\") \\\n",
" .config(\"spark.sql.gravitino.enableIcebergSupport\", \"true\") \\\n",
" .config(\"spark.sql.catalog.catalog_rest\", \"org.apache.iceberg.spark.SparkCatalog\") \\\n",
" .config(\"spark.sql.catalog.catalog_rest.type\", \"rest\") \\\n",
" .config(\"spark.sql.catalog.catalog_rest.uri\", \"http://gravitino:9001/iceberg/\") \\\n",
" .config(\"spark.locality.wait.node\", \"0\") \\\n",
" .config(\"spark.sql.warehouse.dir\", \"hdfs://hive:9000/user/hive/warehouse\") \\\n",
" .enableHiveSupport() \\\n",
" .getOrCreate()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d70e6c04-bb61-4b5b-8525-41a4a5a34b54",
"metadata": {},
"outputs": [],
"source": [
"spark.sql(\"use catalog_hive\")\n",
"spark.sql(\"show databases\").show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4f3e6e17-31d4-40dd-953b-358c87fbf429",
"metadata": {},
"outputs": [],
"source": [
"spark.sql(\"CREATE DATABASE IF NOT EXISTS product;\")\n",
"spark.sql(\"USE product;\")\n",
"spark.sql(\"CREATE TABLE IF NOT EXISTS employees (id INT, name STRING, age INT) PARTITIONED BY (department STRING) STORED AS PARQUET;\")\n",
"spark.sql(\"DESC TABLE EXTENDED employees;\").show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "69ebc70e-1ad0-4ea3-88f1-8e7038160d9b",
"metadata": {},
"outputs": [],
"source": [
"spark.sql(\"INSERT OVERWRITE TABLE employees PARTITION(department='Engineering') VALUES (1, 'John Doe', 30), (2, 'Jane Smith', 28);\")\n",
"spark.sql(\"INSERT OVERWRITE TABLE employees PARTITION(department='Marketing') VALUES (3, 'Mike Brown', 32);\")\n",
"spark.sql(\"SELECT * from employees\").show()"
]
},
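{
"cell_type": "markdown",
"id": "7f2a9c1e-5b3d-4e8a-9c2f-1a6b7d8e9f01",
"metadata": {},
"source": [
"Optionally, verify that the two `INSERT OVERWRITE` statements above created one partition per department. `SHOW PARTITIONS` is standard Spark SQL for Hive-style partitioned tables; this cell is only a sanity check."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8a3b0d2f-6c4e-4f9b-8d30-2b7c8e9fa012",
"metadata": {},
"outputs": [],
"source": [
"# Expect two partitions: department=Engineering and department=Marketing\n",
"spark.sql(\"SHOW PARTITIONS employees\").show()"
]
},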
{
"cell_type": "markdown",
"id": "554c5267-be5c-4dd8-9106-f4379597c16a",
"metadata": {},
"source": [
"## Query the table with Trino"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3b0d643f-1593-4f82-94a7-e6bb40a12be2",
"metadata": {},
"outputs": [],
"source": [
"%pip install trino"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "33cc8e5a-1e76-42c7-b65b-5672d25162be",
"metadata": {},
"outputs": [],
"source": [
"from trino.dbapi import connect\n",
"\n",
"# Create a Trino connector client\n",
"conn = connect(\n",
" host=\"trino\",\n",
" port=8080,\n",
" user=\"admin\",\n",
" catalog=\"catalog_hive\",\n",
" schema=\"http\",\n",
")\n",
"\n",
"trino_client = conn.cursor()"
]
},
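{
"cell_type": "markdown",
"id": "9b4c1e30-7d5f-4a0c-9e41-3c8d9fa0b123",
"metadata": {},
"source": [
"As an optional connectivity check, list the schemas Trino exposes for the Gravitino-backed `catalog_hive`; the `product` schema created from Spark should appear."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ac5d2f41-8e60-4b1d-af52-4d9ea0b1c234",
"metadata": {},
"outputs": [],
"source": [
"# Sanity check: the 'product' schema created from Spark should be listed\n",
"print(trino_client.execute(\"SHOW SCHEMAS FROM catalog_hive\").fetchall())"
]
},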
{
"cell_type": "code",
"execution_count": null,
"id": "3479ab03-00d5-4115-a43c-cf0a39411183",
"metadata": {},
"outputs": [],
"source": [
"print(trino_client.execute(\"SELECT * FROM catalog_hive.product.employees WHERE department = 'Engineering'\").fetchall())"
]
},
{
"cell_type": "markdown",
"id": "b33ef14d-a3b7-48a0-a3e7-5c0daf6f7ee5",
"metadata": {},
"source": [
"## Spark write data with Iceberg REST service"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c05c7234-28e9-45e4-867c-2304febef730",
"metadata": {},
"outputs": [],
"source": [
"spark.sql(\"use catalog_rest;\")\n",
"spark.sql(\"create database if not exists sales;\")\n",
"spark.sql(\"use sales;\")\n",
"spark.sql(\"create table customers (customer_id int, customer_name varchar(100), customer_email varchar(100));\")\n"
]
},
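{
"cell_type": "markdown",
"id": "bd6e3052-9f71-4c2e-b063-5eafb1c2d345",
"metadata": {},
"source": [
"Optionally, confirm that the table was registered in the REST-backed Iceberg catalog (mirroring the `DESC TABLE EXTENDED` check in the Hive section above)."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ce7f4163-a082-4d3f-9174-6fb0c2d3e456",
"metadata": {},
"outputs": [],
"source": [
"# The table provider should be reported as iceberg\n",
"spark.sql(\"DESC TABLE EXTENDED customers\").show()"
]
},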
{
"cell_type": "code",
"execution_count": null,
"id": "836de98b-5107-4f6c-a0fc-73a1fcf23d6f",
"metadata": {},
"outputs": [],
"source": [
"spark.sql(\"insert into customers (customer_id, customer_name, customer_email) values (11,'Rory Brown','rory@123.com');\")\n",
"spark.sql(\"insert into customers (customer_id, customer_name, customer_email) values (12,'Jerry Washington','jerry@dt.com');\")\n",
"spark.sql(\"select * from customers\").show()"
]
},
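{
"cell_type": "markdown",
"id": "df805274-b193-4e50-a285-70c1d3e4f567",
"metadata": {},
"source": [
"Iceberg maintains metadata tables alongside each table. As an optional check, each of the two inserts above should have committed its own snapshot, which the `snapshots` metadata table makes visible."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e0916385-c2a4-4f61-b396-81d2e4f56789",
"metadata": {},
"outputs": [],
"source": [
"# Each insert commits a new Iceberg snapshot; expect two 'append' rows here\n",
"spark.sql(\"select committed_at, snapshot_id, operation from catalog_rest.sales.customers.snapshots\").show(truncate=False)"
]
},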
{
"cell_type": "markdown",
"id": "74b8e45c-e962-4479-947f-478a9e7cc0c6",
"metadata": {},
"source": [
"## Trino do federation query data with Hive and Iceberg"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "76057b91-c720-485f-ada7-2ae773fea311",
"metadata": {},
"outputs": [],
"source": [
"print(trino_client.execute(\"select * from catalog_hive.sales.customers union select * from catalog_iceberg.sales.customers\").fetchall())"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.5"
}
},
"nbformat": 4,
"nbformat_minor": 5
}