init/jupyter/gravitino-spark-trino-example.ipynb (199 lines of code) (raw):

{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "df953e1a-b48a-4eda-b099-c89c9449fdad",
   "metadata": {},
   "source": [
    "## Spark write Hive table\n",
    "\n",
    "Start a Spark session with the Gravitino Spark connector plugin, then create and fill a partitioned table in `catalog_hive`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "429e5d1b",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark\n",
    "import os\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark_home = os.getenv('SPARK_HOME')\n",
    "# Set before the session is created so the Hadoop client picks up this user name.\n",
    "os.environ['HADOOP_USER_NAME']=\"anonymous\"\n",
    "\n",
    "# The session loads the Gravitino Spark plugin (metalake `metalake_demo`) and also\n",
    "# registers `catalog_rest`, an Iceberg REST catalog served at gravitino:9001.\n",
    "spark = SparkSession.builder \\\n",
    "    .appName(\"PySpark SQL Example\") \\\n",
    "    .config(\"spark.plugins\", \"org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin\") \\\n",
    "    .config(\"spark.jars\", \"/tmp/gravitino/packages/iceberg-spark-runtime-3.4_2.12-1.5.2.jar,/tmp/gravitino/packages/gravitino-spark-connector-runtime-3.4_2.12-0.8.0-incubating.jar\") \\\n",
    "    .config(\"spark.sql.gravitino.uri\", \"http://gravitino:8090\") \\\n",
    "    .config(\"spark.sql.gravitino.metalake\", \"metalake_demo\") \\\n",
    "    .config(\"spark.sql.gravitino.enableIcebergSupport\", \"true\") \\\n",
    "    .config(\"spark.sql.catalog.catalog_rest\", \"org.apache.iceberg.spark.SparkCatalog\") \\\n",
    "    .config(\"spark.sql.catalog.catalog_rest.type\", \"rest\") \\\n",
    "    .config(\"spark.sql.catalog.catalog_rest.uri\", \"http://gravitino:9001/iceberg/\") \\\n",
    "    .config(\"spark.locality.wait.node\", \"0\") \\\n",
    "    .config(\"spark.sql.warehouse.dir\", \"hdfs://hive:9000/user/hive/warehouse\") \\\n",
    "    .enableHiveSupport() \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d70e6c04-bb61-4b5b-8525-41a4a5a34b54",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.sql(\"use catalog_hive\")\n",
    "spark.sql(\"show databases\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4f3e6e17-31d4-40dd-953b-358c87fbf429",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.sql(\"CREATE DATABASE IF NOT EXISTS product;\")\n",
    "spark.sql(\"USE product;\")\n",
    "spark.sql(\"CREATE TABLE IF NOT EXISTS employees (id INT, name STRING, age INT) PARTITIONED BY (department STRING) STORED AS PARQUET;\")\n",
    "spark.sql(\"DESC TABLE EXTENDED employees;\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "69ebc70e-1ad0-4ea3-88f1-8e7038160d9b",
   "metadata": {},
   "outputs": [],
   "source": [
    "# INSERT OVERWRITE replaces each partition's contents, so re-running this cell does not duplicate rows.\n",
    "spark.sql(\"INSERT OVERWRITE TABLE employees PARTITION(department='Engineering') VALUES (1, 'John Doe', 30), (2, 'Jane Smith', 28);\")\n",
    "spark.sql(\"INSERT OVERWRITE TABLE employees PARTITION(department='Marketing') VALUES (3, 'Mike Brown', 32);\")\n",
    "spark.sql(\"SELECT * from employees\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "554c5267-be5c-4dd8-9106-f4379597c16a",
   "metadata": {},
   "source": [
    "## Query the table with Trino\n",
    "\n",
    "Read the `employees` table written above through the Trino Python client."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3b0d643f-1593-4f82-94a7-e6bb40a12be2",
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install trino"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "33cc8e5a-1e76-42c7-b65b-5672d25162be",
   "metadata": {},
   "outputs": [],
   "source": [
    "from trino.dbapi import connect\n",
    "\n",
    "# Create a Trino connector client\n",
    "conn = connect(\n",
    "    host=\"trino\",\n",
    "    port=8080,\n",
    "    user=\"admin\",\n",
    "    catalog=\"catalog_hive\",\n",
    "    # \"http\" is the transport scheme, not a schema name; the queries below\n",
    "    # use fully qualified table names, so no default schema is needed.\n",
    "    http_scheme=\"http\",\n",
    ")\n",
    "\n",
    "trino_client = conn.cursor()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3479ab03-00d5-4115-a43c-cf0a39411183",
   "metadata": {},
   "outputs": [],
   "source": [
    "print(trino_client.execute(\"SELECT * FROM catalog_hive.product.employees WHERE department = 'Engineering'\").fetchall())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b33ef14d-a3b7-48a0-a3e7-5c0daf6f7ee5",
   "metadata": {},
   "source": [
    "## Spark write data with Iceberg REST service\n",
    "\n",
    "Use the `catalog_rest` catalog configured on the Spark session to create and fill a `sales.customers` table."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c05c7234-28e9-45e4-867c-2304febef730",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.sql(\"use catalog_rest;\")\n",
    "spark.sql(\"create database if not exists sales;\")\n",
    "spark.sql(\"use sales;\")\n",
    "# `if not exists` keeps this cell re-runnable, matching the Hive DDL cell above.\n",
    "spark.sql(\"create table if not exists customers (customer_id int, customer_name varchar(100), customer_email varchar(100));\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "836de98b-5107-4f6c-a0fc-73a1fcf23d6f",
   "metadata": {},
   "outputs": [],
   "source": [
    "# NOTE: plain INSERT INTO appends, so re-running this cell adds the same two rows again.\n",
    "spark.sql(\"insert into customers (customer_id, customer_name, customer_email) values (11,'Rory Brown','rory@123.com');\")\n",
    "spark.sql(\"insert into customers (customer_id, customer_name, customer_email) values (12,'Jerry Washington','jerry@dt.com');\")\n",
    "spark.sql(\"select * from customers\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "74b8e45c-e962-4479-947f-478a9e7cc0c6",
   "metadata": {},
   "source": [
    "## Trino federated query across Hive and Iceberg\n",
    "\n",
    "Combine `sales.customers` from `catalog_hive` and `catalog_iceberg` in a single Trino query. This notebook does not create `catalog_hive.sales.customers`; it is assumed to exist already in the environment."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "76057b91-c720-485f-ada7-2ae773fea311",
   "metadata": {},
   "outputs": [],
   "source": [
    "print(trino_client.execute(\"select * from catalog_hive.sales.customers union select * from catalog_iceberg.sales.customers\").fetchall())"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}