# Bootstrap the client with ROOT credentials
Using the Python client generated from our OpenAPI spec, we generate a token from our root user's credentials.

In [None]:
from polaris.catalog.api.iceberg_catalog_api import IcebergCatalogAPI
from polaris.catalog.api.iceberg_o_auth2_api import IcebergOAuth2API
from polaris.catalog.api_client import ApiClient as CatalogApiClient
from polaris.catalog.api_client import Configuration as CatalogApiClientConfiguration

# Root credentials in "client_id:client_secret" form
polaris_credential = 'root:s3cr3t' # pragma: allowlist secret

# Split on the first ':' only, so a secret that itself contains ':' stays intact
client_id, client_secret = polaris_credential.split(":", 1)
client = CatalogApiClient(CatalogApiClientConfiguration(username=client_id,
                                 password=client_secret,
                                 host='http://polaris:8181/api/catalog'))

# Exchange the root credentials for a token using the client_credentials grant
oauth_api = IcebergOAuth2API(client)
token = oauth_api.get_token(scope='PRINCIPAL_ROLE:ALL',
                            client_id=client_id,
                            client_secret=client_secret,
                            grant_type='client_credentials',
                            _headers={'realm': 'default-realm'})


# Create our first catalog

* Creates a catalog named `polaris_demo` that writes to a specified location in the local filesystem.

In [None]:
from polaris.management import *

# Management API client authenticated with the root access token
client = ApiClient(Configuration(access_token=token.access_token,
                                   host='http://polaris:8181/api/management/v1'))
root_client = PolarisDefaultApi(client)

# FILE storage configuration; file:///tmp is the only allowed location
storage_conf = FileStorageConfigInfo(storage_type="FILE", allowed_locations=["file:///tmp"])
catalog_name = 'polaris_demo'
# The storage configuration is passed to the constructor, so no separate
# attribute assignment is needed afterwards
catalog = Catalog(name=catalog_name, type='INTERNAL', properties={"default-base-location": "file:///tmp/polaris/"},
                storage_config_info=storage_conf)
root_client.create_catalog(create_catalog_request=CreateCatalogRequest(catalog=catalog))

# Fetch the catalog back and leave it as the cell's displayed value
resp = root_client.get_catalog(catalog_name=catalog.name)
resp

# Utility Functions

In [None]:
# Creates a principal with the given name
def create_principal(api, principal_name):
  """Create a SERVICE principal and return the API's response.

  If the create call raises an ApiException with status 409 (presumably
  because the principal already exists), the principal's credentials are
  rotated instead and the result of that call is returned. Any other
  ApiException is re-raised.
  """
  new_principal = Principal(name=principal_name, type="SERVICE")
  try:
    return api.create_principal(CreatePrincipalRequest(principal=new_principal))
  except ApiException as e:
    if e.status != 409:
      raise e
    # Conflict: fall back to rotating credentials for the named principal
    return api.rotate_credentials(principal_name=principal_name)

# Create a catalog role with the given name
def create_catalog_role(api, catalog, role_name):
  """Create a catalog role in the given catalog and return it.

  The role is created and then fetched with get_catalog_role. If the create
  call raises an ApiException with status 409 (presumably because the role
  already exists), the existing role is fetched and returned instead. Any
  other ApiException is re-raised rather than being masked by the lookup.
  """
  catalog_role = CatalogRole(name=role_name)
  try:
    api.create_catalog_role(catalog_name=catalog.name, create_catalog_role_request=CreateCatalogRoleRequest(catalog_role=catalog_role))
  except ApiException as e:
    # Only a conflict is tolerated; every other failure is a real error
    if e.status != 409:
      raise
  return api.get_catalog_role(catalog_name=catalog.name, catalog_role_name=role_name)

# Create a principal role with the given name
def create_principal_role(api, role_name):
  """Create a principal role and return it.

  The role is created and then fetched with get_principal_role. If the create
  call raises an ApiException with status 409 (presumably because the role
  already exists), the existing role is fetched and returned instead. Any
  other ApiException is re-raised rather than being masked by the lookup.
  """
  principal_role = PrincipalRole(name=role_name)
  try:
    api.create_principal_role(CreatePrincipalRoleRequest(principal_role=principal_role))
  except ApiException as e:
    # Only a conflict is tolerated; every other failure is a real error
    if e.status != 409:
      raise
  return api.get_principal_role(principal_role_name=role_name)


# Create a new Principal, Principal Role, and Catalog Role
The new Principal belongs to the `engineer` principal role, which has `CATALOG_MANAGE_CONTENT` privileges on the `polaris_demo` catalog.


`CATALOG_MANAGE_CONTENT` has create/list/read/write privileges on all entities within the catalog. The same privilege could be granted to a namespace, in which case the engineers could create/list/read/write any entity under that namespace.

In [None]:
# Principal "collado" acts as the engineer in this walkthrough
engineer_principal = create_principal(root_client, "collado")

# Principal role the engineer will be a member of
engineer_role = create_principal_role(root_client, "engineer")

# Catalog role that carries the catalog-level privilege
manager_catalog_role = create_catalog_role(root_client, catalog, "manage_catalog")

# Attach the catalog role to the principal role, so that every principal
# holding the principal role gets the catalog role's privileges
engineer_catalog_role_grant = GrantCatalogRoleRequest(catalog_role=manager_catalog_role)
root_client.assign_catalog_role_to_principal_role(principal_role_name=engineer_role.name,
                                                  catalog_name=catalog.name,
                                                  grant_catalog_role_request=engineer_catalog_role_grant)

# Give the catalog role CATALOG_MANAGE_CONTENT on the catalog
manage_content_grant = CatalogGrant(catalog_name=catalog.name,
                                    type='catalog',
                                    privilege=CatalogPrivilege.CATALOG_MANAGE_CONTENT)
root_client.add_grant_to_catalog_role(catalog.name, manager_catalog_role.name,
                                      AddGrantRequest(grant=manage_content_grant))

# Finally, make the engineer principal a member of the principal role
engineer_membership = GrantPrincipalRoleRequest(principal_role=engineer_role)
root_client.assign_principal_role(engineer_principal.principal.name, grant_principal_role_request=engineer_membership)

# Create a reader Principal, Principal Role, and Catalog Role
This new principal belongs to the `product_manager` principal role, which is explicitly granted read and list permissions on the catalog.

Permissions cascade, so permissions granted at the catalog level are inherited by namespaces and tables within the catalog.

In [None]:
# Principal "mlee" acts as the read-only user in this walkthrough
reader_principal = create_principal(root_client, "mlee")

# Principal role the reader will be a member of
pm_role = create_principal_role(root_client, "product_manager")

# Catalog role that carries the read/list privileges
read_only_role = create_catalog_role(root_client, catalog, "read_only")

# Attach the catalog role to the principal role
reader_catalog_role_grant = GrantCatalogRoleRequest(catalog_role=read_only_role)
root_client.assign_catalog_role_to_principal_role(principal_role_name=pm_role.name,
                                                  catalog_name=catalog.name,
                                                  grant_catalog_role_request=reader_catalog_role_grant)

# Read and list privileges, granted one at a time at the catalog level in
# this order. Privileges cascade down.
read_only_privileges = (
  CatalogPrivilege.TABLE_LIST,
  CatalogPrivilege.TABLE_READ_PROPERTIES,
  CatalogPrivilege.TABLE_READ_DATA,
  CatalogPrivilege.VIEW_LIST,
  CatalogPrivilege.VIEW_READ_PROPERTIES,
  CatalogPrivilege.NAMESPACE_READ_PROPERTIES,
  CatalogPrivilege.NAMESPACE_LIST,
)
for read_only_privilege in read_only_privileges:
  root_client.add_grant_to_catalog_role(catalog.name, read_only_role.name,
                                        AddGrantRequest(grant=CatalogGrant(catalog_name=catalog.name,
                                                                           type='catalog',
                                                                           privilege=read_only_privilege)))

# Finally, make the reader principal a member of the principal role
reader_membership = GrantPrincipalRoleRequest(principal_role=pm_role)
root_client.assign_principal_role(reader_principal.principal.name, grant_principal_role_request=reader_membership)

# Create a Spark session with the engineer credentials

* Catalog URI points to our Polaris installation
* Credential set using the client_id and client_secret generated for the principal
* Scope set to `PRINCIPAL_ROLE:ALL`
* `X-Iceberg-Access-Delegation` is set to vended-credentials

In [None]:
from pyspark.sql import SparkSession

# Spark settings, applied to the builder in the order listed
spark_settings = {
  "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
  "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.0,org.apache.hadoop:hadoop-aws:3.4.0,software.amazon.awssdk:bundle:2.23.19,software.amazon.awssdk:url-connection-client:2.23.19",
  'spark.sql.iceberg.vectorization.enabled': 'false',

  # The 'polaris' catalog is an Iceberg REST catalog
  "spark.sql.catalog.polaris.type": "rest",
  "spark.sql.catalog.polaris": "org.apache.iceberg.spark.SparkCatalog",
  # REST catalog endpoint
  "spark.sql.catalog.polaris.uri": "http://polaris:8181/api/catalog",
  # Token refresh is switched on
  "spark.sql.catalog.polaris.token-refresh-enabled": "true",
  # client_id:client_secret pair of the engineer principal
  "spark.sql.catalog.polaris.credential": f"{engineer_principal.credentials.client_id}:{engineer_principal.credentials.client_secret}",

  # The warehouse is the name of the catalog created earlier
  "spark.sql.catalog.polaris.warehouse": catalog_name,

  # Request every principal role the principal holds
  "spark.sql.catalog.polaris.scope": 'PRINCIPAL_ROLE:ALL',

  # Ask the catalog for vended credentials
  "spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation": 'vended-credentials',

  "spark.sql.catalog.polaris.io-impl": "org.apache.iceberg.io.ResolvingFileIO",
  "spark.sql.catalog.polaris.s3.region": "us-west-2",
  "spark.history.fs.logDirectory": "/home/iceberg/spark-events",
}

session_builder = SparkSession.builder
for setting_name, setting_value in spark_settings.items():
  session_builder = session_builder.config(setting_name, setting_value)
spark = session_builder.getOrCreate()


# USE polaris
Tell Spark to use the Polaris catalog

In [None]:
# Switch the session to the 'polaris' catalog, then print its namespaces
spark.sql("USE polaris")
namespaces_df = spark.sql("SHOW NAMESPACES")
namespaces_df.show()

# Create Nested Namespaces

In [None]:
# Create the parent namespace first, then the namespace nested inside it
for namespace_name in ("COLLADO_TEST", "COLLADO_TEST.PUBLIC"):
  spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {namespace_name}")

# Print the namespaces nested under COLLADO_TEST
nested_namespaces_df = spark.sql("SHOW NAMESPACES IN COLLADO_TEST")
nested_namespaces_df.show()

# Create a table

In [None]:
# DDL for the demo table: a required bigint id plus a string payload
create_table_sql = """CREATE TABLE IF NOT EXISTS TEST_TABLE (
    id bigint NOT NULL COMMENT 'unique id',
    data string)
USING iceberg;
"""

# Work inside the nested namespace, then create the table there
spark.sql("USE NAMESPACE COLLADO_TEST.PUBLIC")
spark.sql(create_table_sql)

# It's Empty

In [None]:
# Print the full contents of the table
table_rows_df = spark.sql("SELECT * FROM TEST_TABLE")
table_rows_df.show()

# Insert some records
Querying again shows some records

In [None]:
# Write three rows through the engineer's session
insert_rows_sql = "INSERT INTO TEST_TABLE VALUES (1, 'some data'), (2, 'more data'), (3, 'yet more data')"
spark.sql(insert_rows_sql)

# Print the table again so the new rows are visible
inserted_rows_df = spark.sql("SELECT * FROM TEST_TABLE")
inserted_rows_df.show()

# Use the Catalog API client
Create a new client using the engineer credentials

In [None]:
# The engineer's client_id / client_secret double as the username / password
# of the client that is only used to fetch an API token
engineer_credentials = engineer_principal.credentials
token_client = CatalogApiClient(
  CatalogApiClientConfiguration(username=engineer_credentials.client_id,
                                password=engineer_credentials.client_secret,
                                host='http://polaris:8181/api/catalog'))

# Request a token covering all of the principal's roles
engineer_oauth_api = IcebergOAuth2API(token_client)
collado_token = engineer_oauth_api.get_token(scope='PRINCIPAL_ROLE:ALL',
                                             client_id=engineer_credentials.client_id,
                                             client_secret=engineer_credentials.client_secret,
                                             grant_type='client_credentials',
                                             _headers={'realm': 'default-realm'})

# Catalog client that authenticates with the access token obtained above
token_configuration = CatalogApiClientConfiguration(access_token=collado_token.access_token,
                                                    host='http://polaris:8181/api/catalog')
client = CatalogApiClient(token_configuration)
collado_client = IcebergCatalogAPI(client)


# Directly call the LoadTable API
This is the response returned to Spark. In addition to the expected metadataLocation, snapshot, and schema data, we also see S3 credentials in the `config` field.

In [None]:
import codecs
import json
from IPython.display import display, JSON

def format_namespace(namespace):
  """Join the namespace levels into one string separated by the 0x1F character."""
  unit_separator = "\x1f"
  return unit_separator.join(namespace)

# Call loadTable with the engineer's client.
# X-Iceberg-Access-Delegation takes a delegation mode, not a boolean; use
# 'vended-credentials', the same value the Spark catalog configuration sends.
tbl_meta = collado_client.load_table(prefix=catalog_name,
                                     namespace=format_namespace(['COLLADO_TEST', 'PUBLIC']),
                                     table='TEST_TABLE',
                                     x_iceberg_access_delegation='vended-credentials')
# Render the response as an expanded JSON tree in the notebook
display(JSON(tbl_meta.to_dict(), expanded=True))

# Initiate a new Spark session
Change the credentials to the PM's read-only credentials

In [None]:
# client_id:client_secret pair of the reader principal
reader_credential = f"{reader_principal.credentials.client_id}:{reader_principal.credentials.client_secret}"

# Start a second session from the existing one and override only the catalog
# credential, so that this session authenticates as the reader
new_spark = spark.newSession()
new_spark.conf.set("spark.sql.catalog.polaris.credential", reader_credential)
new_spark.sql("USE polaris")

# Show Namespace contents
We can still `USE NAMESPACE` and `SHOW TABLES`, which require the `NAMESPACE_READ_PROPERTIES` and `TABLE_LIST` privileges respectively.

In [None]:
# Enter the nested namespace with the reader's session and print its tables
new_spark.sql("USE NAMESPACE COLLADO_TEST.PUBLIC")
reader_tables_df = new_spark.sql("SHOW TABLES")
reader_tables_df.show()

# Table reads work

In [None]:
# Print the table contents through the reader's session
reader_rows_df = new_spark.sql("SELECT * FROM TEST_TABLE")
reader_rows_df.show()

# Insert attempts will fail

In [None]:
# Attempt a write through the reader's session; per the section heading this
# is expected to fail
reader_insert_sql = "INSERT INTO TEST_TABLE VALUES (4, 'you cannot see this data'), (5, 'it will never be inserted'), (6, 'sad emoji')"
new_spark.sql(reader_insert_sql)

# Create an API client using reader credentials

In [None]:
# The reader's client_id / client_secret double as the username / password
# of the client that is only used to fetch an API token
reader_credentials = reader_principal.credentials
token_client = CatalogApiClient(
  CatalogApiClientConfiguration(username=reader_credentials.client_id,
                                password=reader_credentials.client_secret,
                                host='http://polaris:8181/api/catalog'))

# Request a token covering all of the reader's roles
reader_oauth_api = IcebergOAuth2API(token_client)
pm_token = reader_oauth_api.get_token(scope='PRINCIPAL_ROLE:ALL',
                                      client_id=reader_credentials.client_id,
                                      client_secret=reader_credentials.client_secret,
                                      grant_type='client_credentials',
                                      _headers={'realm': 'default-realm'})

# Catalog client that authenticates with the reader's access token
pm_configuration = CatalogApiClientConfiguration(access_token=pm_token.access_token,
                                                 host='http://polaris:8181/api/catalog')
pm_client = IcebergCatalogAPI(CatalogApiClient(pm_configuration))


# LoadTable returns a similar response
However, the S3 credentials are scoped to read-only

In [None]:
# Call loadTable with the reader's client.
# X-Iceberg-Access-Delegation takes a delegation mode, not a boolean; use
# 'vended-credentials', the same value the Spark catalog configuration sends.
tbl_meta = pm_client.load_table(prefix=catalog_name,
                                namespace=format_namespace(['COLLADO_TEST', 'PUBLIC']),
                                table='TEST_TABLE',
                                x_iceberg_access_delegation='vended-credentials')
# Render the response as an expanded JSON tree in the notebook
display(JSON(tbl_meta.to_dict(), expanded=True))

# Metadata manipulation is blocked by Polaris
PMs are always dropping tables in prod

In [None]:
# Attempt to drop the table with the reader's client; per the section heading
# the catalog is expected to block this
reader_target_namespace = format_namespace(['COLLADO_TEST', 'PUBLIC'])
pm_client.drop_table(prefix=catalog_name, namespace=reader_target_namespace, table='TEST_TABLE')

# Add another Principal Role to the Engineer Principal
A principal can belong to multiple Principal Roles. Typically, a call will use the union of all privileges assigned to all of the principal's roles.

In [None]:
# Second principal role for the engineer
ops_role = create_principal_role(root_client, "ops_engineer")

# The new principal role receives the read_only catalog role
ops_catalog_role_grant = GrantCatalogRoleRequest(catalog_role=read_only_role)
root_client.assign_catalog_role_to_principal_role(principal_role_name=ops_role.name,
                                                  catalog_name=catalog.name,
                                                  grant_catalog_role_request=ops_catalog_role_grant)

# Add the engineer principal to the new role as well, so the engineer is now
# a member of _both_ principal roles
ops_membership = GrantPrincipalRoleRequest(principal_role=ops_role)
root_client.assign_principal_role(engineer_principal.principal.name, grant_principal_role_request=ops_membership)

# Scope the spark session to a single role
In this case, the Spark session is down-scoped to only the role specified. Even though the engineer has read-write privileges, the session only has privileges assigned to the specified Principal Role - in this case, the `read_only` catalog role.

In [None]:
# New session that requests only the ops_engineer principal role
ro_spark = spark.newSession()
ro_spark.conf.set("spark.sql.catalog.polaris.scope", 'PRINCIPAL_ROLE:ops_engineer')

# Select the catalog and then the nested namespace
for use_statement in ("USE polaris", "USE NAMESPACE COLLADO_TEST.PUBLIC"):
  ro_spark.sql(use_statement)

# Print the tables visible to the down-scoped session
ro_tables_df = ro_spark.sql("SHOW TABLES")
ro_tables_df.show()

# The engineer can still read data

In [None]:
# Print the table contents through the down-scoped session
ro_rows_df = ro_spark.sql("SELECT * FROM TEST_TABLE")
ro_rows_df.show()

# But inserts fail

In [None]:
# Attempt a write through the down-scoped session; per the section heading
# this is expected to fail
ro_insert_sql = "INSERT INTO TEST_TABLE VALUES (4, 'you cannot see this data'), (5, 'it will never be inserted'), (6, 'sad emoji')"
ro_spark.sql(ro_insert_sql)

# And metadata operations are prohibited
Oops - I didn't mean to drop the _production_ table!

In [None]:
# Token client built from the _engineer's_ credentials
ops_credentials = engineer_principal.credentials
token_client = CatalogApiClient(
  CatalogApiClientConfiguration(username=ops_credentials.client_id,
                                password=ops_credentials.client_secret,
                                host='http://polaris:8181/api/catalog'))

# Request a token whose scope names only the ops_engineer principal role
ops_oauth_api = IcebergOAuth2API(token_client)
ops_token = ops_oauth_api.get_token(scope='PRINCIPAL_ROLE:ops_engineer',
                                    client_id=ops_credentials.client_id,
                                    client_secret=ops_credentials.client_secret,
                                    grant_type='client_credentials',
                                    _headers={'realm': 'default-realm'})

# Catalog client that authenticates with the ops_engineer-scoped token.
# The engineer's other principal role carries CATALOG_MANAGE_CONTENT, but this
# token was requested for ops_engineer only, so per the section heading the
# drop below is expected to be refused.
ops_configuration = CatalogApiClientConfiguration(access_token=ops_token.access_token,
                                                  host='http://polaris:8181/api/catalog')
ops_client = IcebergCatalogAPI(CatalogApiClient(ops_configuration))
ops_target_namespace = format_namespace(['COLLADO_TEST', 'PUBLIC'])
ops_client.drop_table(prefix=catalog_name, namespace=ops_target_namespace, table='TEST_TABLE')