# Bootstrap the client with ROOT credentials
Using the Python client generated from our OpenAPI spec, we generate a token from the root user's credentials.

In [1]:
from polaris.catalog.api.iceberg_catalog_api import IcebergCatalogAPI
from polaris.catalog.api.iceberg_o_auth2_api import IcebergOAuth2API
from polaris.catalog.api_client import ApiClient as CatalogApiClient
from polaris.catalog.api_client import Configuration as CatalogApiClientConfiguration

# Root credential in "client_id:client_secret" form
polaris_credential = 'root:s3cr3t' # pragma: allowlist secret

# Split on the first ':' only, so a secret that itself contains ':' stays intact
client_id, client_secret = polaris_credential.split(":", 1)
client = CatalogApiClient(CatalogApiClientConfiguration(username=client_id,
                                                        password=client_secret,
                                                        host='http://polaris:8181/api/catalog'))

# Exchange the root credentials for a token via the client-credentials grant
oauth_api = IcebergOAuth2API(client)
token = oauth_api.get_token(scope='PRINCIPAL_ROLE:ALL',
                            client_id=client_id,
                            client_secret=client_secret,
                            grant_type='client_credentials',
                            _headers={'realm': 'default-realm'})


# Create our first catalog

* Creates a catalog named `polaris_demo` that writes to a specified location in the local filesystem.

In [2]:
from polaris.management import *

# Management API client, authenticated with the root token.
# NOTE: this rebinds `client`, which previously referred to the catalog API client.
client = ApiClient(Configuration(access_token=token.access_token,
                                 host='http://polaris:8181/api/management/v1'))
root_client = PolarisDefaultApi(client)

# FILE storage restricted to locations under file:///tmp
storage_conf = FileStorageConfigInfo(storage_type="FILE", allowed_locations=["file:///tmp"])
catalog_name = 'polaris_demo'
# The storage config is passed once, through the constructor; no follow-up assignment is needed
catalog = Catalog(name=catalog_name, type='INTERNAL', properties={"default-base-location": "file:///tmp/polaris/"},
                  storage_config_info=storage_conf)
root_client.create_catalog(create_catalog_request=CreateCatalogRequest(catalog=catalog))

# Read the catalog back; the trailing bare expression makes the notebook display it
resp = root_client.get_catalog(catalog_name=catalog.name)
resp

PolarisCatalog(type='INTERNAL', name='polaris_demo', properties=CatalogProperties(default_base_location='file:///tmp/polaris/', additional_properties={}), create_timestamp=1745882018864, last_update_timestamp=1745882018864, entity_version=1, storage_config_info=FileStorageConfigInfo(storage_type='FILE', allowed_locations=['file:///tmp', 'file:///tmp/polaris/']))

# Utility Functions

In [3]:
# Create a SERVICE principal, or rotate its credentials when it already exists
def create_principal(api, principal_name):
  """Return the result of creating `principal_name`, or of rotating its credentials.

  A 409 response from the create call leads to `rotate_credentials` being
  called for the same name; any other ApiException is re-raised.
  """
  new_principal = Principal(name=principal_name, type="SERVICE")
  try:
    return api.create_principal(CreatePrincipalRequest(principal=new_principal))
  except ApiException as err:
    # Guard clause: only a 409 is recoverable here
    if err.status != 409:
      raise err
    return api.rotate_credentials(principal_name=principal_name)

# Create a catalog role with the given name, or fetch it when it already exists
def create_catalog_role(api, catalog, role_name):
  """Create `role_name` in `catalog` and return the catalog role as read back from the API.

  A 409 response from the create call is tolerated (handled the same way
  `create_principal` treats 409); any other ApiException is re-raised
  instead of being silently swallowed.
  """
  catalog_role = CatalogRole(name=role_name)
  try:
    api.create_catalog_role(catalog_name=catalog.name, create_catalog_role_request=CreateCatalogRoleRequest(catalog_role=catalog_role))
  except ApiException as e:
    # Only a conflict is recoverable; auth, validation and server errors must surface
    if e.status != 409:
      raise
  # Read the role back whether it was just created or already present
  return api.get_catalog_role(catalog_name=catalog.name, catalog_role_name=role_name)

# Create a principal role with the given name, or fetch it when it already exists
def create_principal_role(api, role_name):
  """Create `role_name` and return the principal role as read back from the API.

  A 409 response from the create call is tolerated (handled the same way
  `create_principal` treats 409); any other ApiException is re-raised
  instead of being silently swallowed.
  """
  principal_role = PrincipalRole(name=role_name)
  try:
    api.create_principal_role(CreatePrincipalRoleRequest(principal_role=principal_role))
  except ApiException as e:
    # Only a conflict is recoverable; auth, validation and server errors must surface
    if e.status != 409:
      raise
  # Read the role back whether it was just created or already present
  return api.get_principal_role(principal_role_name=role_name)


# Create a new Principal, Principal Role, and Catalog Role
The new Principal belongs to the `engineer` principal role, which has `CATALOG_MANAGE_CONTENT` privileges on the `polaris_demo` catalog.


`CATALOG_MANAGE_CONTENT` has create/list/read/write privileges on all entities within the catalog. The same privilege could be granted to a namespace, in which case, the engineers could create/list/read/write any entity under that namespace

In [4]:
# Principal, principal role and catalog role for the engineer persona
engineer_principal = create_principal(root_client, "principle")
engineer_role = create_principal_role(root_client, "engineer")
manager_catalog_role = create_catalog_role(root_client, catalog, "manage_catalog")

# Attach the catalog role to the principal role:
# every principal holding the principal role gains the catalog role's privileges
catalog_role_grant = GrantCatalogRoleRequest(catalog_role=manager_catalog_role)
root_client.assign_catalog_role_to_principal_role(
  principal_role_name=engineer_role.name,
  catalog_name=catalog.name,
  grant_catalog_role_request=catalog_role_grant,
)

# Give the catalog role CATALOG_MANAGE_CONTENT on the catalog
manage_content_grant = CatalogGrant(
  catalog_name=catalog.name,
  type='catalog',
  privilege=CatalogPrivilege.CATALOG_MANAGE_CONTENT,
)
root_client.add_grant_to_catalog_role(
  catalog.name,
  manager_catalog_role.name,
  AddGrantRequest(grant=manage_content_grant),
)

# Finally, put the principal into the principal role
principal_role_grant = GrantPrincipalRoleRequest(principal_role=engineer_role)
root_client.assign_principal_role(
  engineer_principal.principal.name,
  grant_principal_role_request=principal_role_grant,
)

# Create a reader Principal, Principal Role, and Catalog Role
This new principal belongs to the `product_manager` principal role, which is explicitly granted read and list permissions on the catalog.

Permissions cascade, so permissions granted at the catalog level are inherited by namespaces and tables within the catalog.

In [5]:
# Create a reader principal
reader_principal = create_principal(root_client, "mlee")

# Create the principal role
pm_role = create_principal_role(root_client, "product_manager")

# Create the catalog role
read_only_role = create_catalog_role(root_client, catalog, "read_only")

# Grant the catalog role to the principal role
root_client.assign_catalog_role_to_principal_role(principal_role_name=pm_role.name,
                                                  catalog_name=catalog.name,
                                                  grant_catalog_role_request=GrantCatalogRoleRequest(catalog_role=read_only_role))

# Assign privileges to the catalog role
# Here, the catalog role is granted READ and LIST privileges at the catalog level
# Privileges cascade down
read_only_privileges = (
  CatalogPrivilege.TABLE_LIST,
  CatalogPrivilege.TABLE_READ_PROPERTIES,
  CatalogPrivilege.TABLE_READ_DATA,
  CatalogPrivilege.VIEW_LIST,
  CatalogPrivilege.VIEW_READ_PROPERTIES,
  CatalogPrivilege.NAMESPACE_READ_PROPERTIES,
  CatalogPrivilege.NAMESPACE_LIST,
)
# One grant request per privilege, issued in the order listed above
for privilege in read_only_privileges:
  root_client.add_grant_to_catalog_role(catalog.name, read_only_role.name,
                                        AddGrantRequest(grant=CatalogGrant(catalog_name=catalog.name,
                                                                           type='catalog',
                                                                           privilege=privilege)))

# Assign the principal role to the principal
root_client.assign_principal_role(reader_principal.principal.name, grant_principal_role_request=GrantPrincipalRoleRequest(principal_role=pm_role))

# Create a Spark session with the engineer credentials

* Catalog URI points to our Polaris installation
* Credential set using the client_id and client_secret generated for the principal
* Scope set to `PRINCIPAL_ROLE:ALL`
* `X-Iceberg-Access-Delegation` is set to vended-credentials

In [6]:
from pyspark.sql import SparkSession

# Spark settings for the session, listed in the order they are applied
spark_settings = {
  "spark.jars": "../polaris_libs/polaris-iceberg-1.8.1-spark-runtime-3.5_2.12-0.11.0-beta-incubating-SNAPSHOT.jar",
  "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-spark_2.12:3.2.1",
  "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
  'spark.sql.iceberg.vectorization.enabled': 'false',
  "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension",
  # Register the 'polaris' catalog using the Polaris Spark catalog class
  "spark.sql.catalog.polaris": "org.apache.polaris.spark.SparkCatalog",
  # REST catalog endpoint
  "spark.sql.catalog.polaris.uri": "http://polaris:8181/api/catalog",
  # Token refresh on
  "spark.sql.catalog.polaris.token-refresh-enabled": "true",
  # client_id:client_secret pair of the engineer principal
  "spark.sql.catalog.polaris.credential": f"{engineer_principal.credentials.client_id}:{engineer_principal.credentials.client_secret}",
  # The warehouse is the name of the catalog created earlier
  "spark.sql.catalog.polaris.warehouse": catalog_name,
  # Scope set to PRINCIPAL_ROLE:ALL
  "spark.sql.catalog.polaris.scope": 'PRINCIPAL_ROLE:ALL',
  # Access credential delegation
  "spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation": 'vended-credentials',
  # AWS configuration
  "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
  "spark.sql.catalog.polaris.io-impl": "org.apache.iceberg.io.ResolvingFileIO",
  "spark.sql.catalog.polaris.s3.region": "us-west-2",
  "spark.history.fs.logDirectory": "/home/iceberg/spark-events",
}

# Feed each setting to the builder, then create (or reuse) the session
builder = SparkSession.builder
for setting_key, setting_value in spark_settings.items():
  builder = builder.config(setting_key, setting_value)
spark = builder.getOrCreate()


# USE polaris
Tell Spark to use the Polaris catalog

In [7]:
# Make `polaris` the current catalog, then list its namespaces
spark.sql("USE polaris")
spark.sql("SHOW NAMESPACES").show()

+---------+
|namespace|
+---------+
+---------+



# Create Nested Namespaces

In [8]:
# Create DELTA_NS and a nested namespace PUBLIC under it, then list what DELTA_NS contains
spark.sql("CREATE NAMESPACE IF NOT EXISTS DELTA_NS")
spark.sql("CREATE NAMESPACE IF NOT EXISTS DELTA_NS.PUBLIC")
spark.sql("SHOW NAMESPACES IN DELTA_NS").show()

+---------------+
|      namespace|
+---------------+
|DELTA_NS.PUBLIC|
+---------------+



# Create a Delta Table

In [10]:
# Switch to DELTA_NS.PUBLIC and create the Delta table PEOPLE at an explicit file location
spark.sql("USE NAMESPACE DELTA_NS.PUBLIC")
spark.sql("""CREATE TABLE IF NOT EXISTS PEOPLE (
    id int, name string)
USING delta LOCATION 'file:///tmp/delta_tables/people';
""")
# You can also use cloud storage like S3, for example: s3://<bucket-name>/<path-to-table>
# Make sure the corresponding credentials are set up correctly

DataFrame[]

In [11]:
# List the tables in the current namespace
spark.sql("SHOW TABLES").show()

+---------------+---------+-----------+
|      namespace|tableName|isTemporary|
+---------------+---------+-----------+
|DELTA_NS.PUBLIC|   PEOPLE|      false|
+---------------+---------+-----------+



# It's Empty

In [12]:
# Read every row of PEOPLE
spark.sql("SELECT * FROM PEOPLE").show()

+---+----+
| id|name|
+---+----+
+---+----+



# Insert some records
Querying again shows some records

In [13]:
# Insert three rows into PEOPLE, then read the table back
spark.sql("INSERT INTO PEOPLE VALUES (1, 'anna'), (2, 'bob'), (3, 'jonath')")
spark.sql("SELECT * FROM PEOPLE").show()

+---+------+
| id|  name|
+---+------+
|  3|jonath|
|  1|  anna|
|  2|   bob|
+---+------+



# Create Iceberg Table

In [14]:
# Create the Iceberg table COUNTRY in DELTA_NS.PUBLIC (no explicit location is given)
spark.sql("USE NAMESPACE DELTA_NS.PUBLIC")
spark.sql("""CREATE TABLE IF NOT EXISTS COUNTRY (code string, name string) USING iceberg""")

DataFrame[]

In [15]:
# List the tables in the current namespace
spark.sql("SHOW TABLES").show()

+---------------+---------+-----------+
|      namespace|tableName|isTemporary|
+---------------+---------+-----------+
|DELTA_NS.PUBLIC|  COUNTRY|      false|
|DELTA_NS.PUBLIC|   PEOPLE|      false|
+---------------+---------+-----------+



# Insert values for the iceberg table

In [16]:
# Insert four rows into COUNTRY, then read the table back
spark.sql("INSERT INTO COUNTRY VALUES ('US', 'United States'), ('CA', 'Canada'), ('FR', 'France'), ('IN', 'India')")
spark.sql("SELECT * FROM COUNTRY").show()

+----+-------------+
|code|         name|
+----+-------------+
|  US|United States|
|  CA|       Canada|
|  FR|       France|
|  IN|        India|
+----+-------------+



# Initiate a new Spark session
Change the credentials to the PM's read-only credentials

In [17]:
# Start a second session from the first one and swap in the reader principal's credentials;
# all other settings carry over from the original session
new_spark = spark.newSession()
reader_credential = f"{reader_principal.credentials.client_id}:{reader_principal.credentials.client_secret}"
new_spark.conf.set("spark.sql.catalog.polaris.credential", reader_credential)
new_spark.sql("USE polaris")

DataFrame[]

# Show Namespace contents
We can still `USE NAMESPACE` and `SHOW TABLES`, which require the `NAMESPACE_READ_PROPERTIES` and `TABLE_LIST` privileges respectively.

In [18]:
# Through the reader session: switch to DELTA_NS.PUBLIC and list its tables
new_spark.sql("USE NAMESPACE DELTA_NS.PUBLIC")
new_spark.sql("SHOW TABLES").show()

+---------------+---------+-----------+
|      namespace|tableName|isTemporary|
+---------------+---------+-----------+
|DELTA_NS.PUBLIC|  COUNTRY|      false|
|DELTA_NS.PUBLIC|   PEOPLE|      false|
+---------------+---------+-----------+



# Table reads work

In [19]:
# Read COUNTRY through the reader session
new_spark.sql("SELECT * FROM COUNTRY").show()

+----+-------------+
|code|         name|
+----+-------------+
|  US|United States|
|  CA|       Canada|
|  FR|       France|
|  IN|        India|
+----+-------------+



# Drop table fails
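The reader's catalog role was only granted list and read privileges, so dropping the Iceberg table from the reader session is rejected.

NOTE: there is currently no write privilege support for non-Iceberg tables.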

In [20]:
# Attempt the drop through the reader session
new_spark.sql("DROP TABLE COUNTRY")

# Drop table using original spark session

In [21]:
# Drop COUNTRY through the original (engineer) session
spark.sql("DROP TABLE COUNTRY")

DataFrame[]

In [22]:
# List the remaining tables as seen by the original session
spark.sql("SHOW TABLES").show()

+---------------+---------+-----------+
|      namespace|tableName|isTemporary|
+---------------+---------+-----------+
|DELTA_NS.PUBLIC|   PEOPLE|      false|
+---------------+---------+-----------+



In [23]:
# List the remaining tables as seen by the reader session
new_spark.sql("SHOW TABLES").show()

+---------------+---------+-----------+
|      namespace|tableName|isTemporary|
+---------------+---------+-----------+
|DELTA_NS.PUBLIC|   PEOPLE|      false|
+---------------+---------+-----------+



In [24]:
# Drop the Delta table PEOPLE through the original session
spark.sql("DROP TABLE PEOPLE")

DataFrame[]

In [25]:
# Final listing of tables in the current namespace
spark.sql("SHOW TABLES").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+

