In [None]:
pip install hdfs

In [None]:
from hdfs import InsecureClient
import os

# Create an HDFS connector client
hdfs_client = InsecureClient("http://hive:50070", user='root')

# Clean up whatever a previous run left under /user/gravitino so the notebook
# can be re-run from a fresh kernel.
gravitino_root = "/user/gravitino"

# status(strict=False) returns None instead of raising when the path is
# missing, so a brand-new HDFS without this directory no longer fails here.
if hdfs_client.status(gravitino_root, strict=False) is None:
    print(f"{gravitino_root} does not exist, nothing to clean up.")
else:
    # List HDFS files and directories
    print(hdfs_client.list(gravitino_root))

    # recursive=True is needed because deleting a non-empty directory
    # raises an HdfsError otherwise.
    hdfs_client.delete(gravitino_root, recursive=True)

In [None]:
pip install apache-gravitino==0.8.0

In [None]:
from typing import Dict, List
from gravitino import NameIdentifier, GravitinoAdminClient, GravitinoClient, Catalog, Fileset, FilesetChange
import os 

# Admin client pointed at the Gravitino server
gravitino_admin_client = GravitinoAdminClient(uri="http://gravitino:8090")

# Create the metalake through the admin client and print the returned entity
metalake_name = "default"
metalake = gravitino_admin_client.create_metalake(
    name=metalake_name,
    comment="metalake comment",
    properties={},
)
print(metalake)

In [None]:
# Client bound to the metalake named by `metalake_name`
gravitino_client = GravitinoClient(
    uri="http://gravitino:8090",
    metalake_name=metalake_name,
)

In [None]:
from typing import Dict, List
from gravitino import GravitinoMetalake

# Fetch every metalake entity via the admin client and print the list
metalake_list: List[GravitinoMetalake]
metalake_list = gravitino_admin_client.list_metalakes()
print(metalake_list)

In [None]:
# Create a fileset catalog through the Gravitino client
catalog_name = "catalog"

catalog = gravitino_client.create_catalog(
    name=catalog_name,
    catalog_type=Catalog.Type.FILESET,
    provider="hadoop",
    comment="",
    properties={},
)
print(catalog)

In [None]:
# Fetch the catalog entity by name through the Gravitino client and print it
catalog = gravitino_client.load_catalog(
    name=catalog_name,
)
print(catalog)

In [None]:
# Create a schema entity via the Gravitino client
schema_name = "schema"
schema_path = "/user/gravitino/" + schema_name
schema_hdfs_path = f"hdfs://hive:9000{schema_path}"

catalog.as_schemas().create_schema(
    schema_name=schema_name,
    comment="",
    properties={"location": schema_hdfs_path},
)

# Check whether the schema location now exists in HDFS.
# status(strict=False) returns None rather than raising when the path does not
# exist, so the check no longer depends on catching every Exception.
info = hdfs_client.status(schema_path, strict=False)
if info is not None:
    print(f"Success: The storage location {schema_path} was successfully created.")
    print("Details:", info)
else:
    print(f"Failed: The storage location {schema_path} was not successfully created.")

In [None]:

# Create a managed type of fileset
managed_fileset_name = "managed_fileset"
managed_fileset_path = "/user/gravitino/" + schema_name + "/" + managed_fileset_name
managed_fileset_hdfs_path = f"hdfs://hive:9000{managed_fileset_path}"

managed_fileset_ident: NameIdentifier = NameIdentifier.of(schema_name, managed_fileset_name)
catalog.as_fileset_catalog().create_fileset(
    ident=managed_fileset_ident,
    fileset_type=Fileset.Type.MANAGED,
    comment="",
    storage_location=managed_fileset_hdfs_path,
    properties={},
)

# Check whether the fileset location now exists in HDFS.
# status(strict=False) returns None rather than raising when the path does not
# exist, so the check no longer depends on catching every Exception.
info = hdfs_client.status(managed_fileset_path, strict=False)
if info is not None:
    print(f"Success: The storage location {managed_fileset_path} was successfully created.")
    print("Details:", info)  # print HDFS path details
else:
    print(f"Failed: The storage location {managed_fileset_path} was not successfully created.")

In [None]:
external_fileset_name = "external_fileset"
external_fileset_path = "/user/gravitino/" + schema_name + "/" + external_fileset_name
external_fileset_hdfs_path = f"hdfs://hive:9000{external_fileset_path}"

# Create the fileset path in HDFS in advance
hdfs_client.makedirs(external_fileset_path)

# Confirm the directory exists before registering it.
# status(strict=False) returns None rather than raising when the path does not
# exist, so the check no longer depends on catching every Exception.
info = hdfs_client.status(external_fileset_path, strict=False)
if info is not None:
    print(f"Success: The storage location {external_fileset_path} was successfully created.")
    print("Details:", info)  # print HDFS path details
else:
    print(f"Failed: The storage location {external_fileset_path} was not successfully created.")

# Create an external type of fileset pointing at the pre-created location
external_fileset_ident: NameIdentifier = NameIdentifier.of(schema_name, external_fileset_name)
catalog.as_fileset_catalog().create_fileset(
    ident=external_fileset_ident,
    fileset_type=Fileset.Type.EXTERNAL,
    comment="",
    storage_location=external_fileset_hdfs_path,
    properties={},
)

In [None]:
# Reload the catalog, then list the filesets in the managed fileset's namespace
catalog = gravitino_client.load_catalog(name=catalog_name)
fileset_list: List[NameIdentifier] = catalog.as_fileset_catalog().list_filesets(
    namespace=managed_fileset_ident.namespace(),
)
print(fileset_list)

In [None]:
# Look up the managed fileset by its identifier and print it
managed_fileset = (
    gravitino_client.load_catalog(name=catalog_name)
    .as_fileset_catalog()
    .load_fileset(ident=managed_fileset_ident)
)
print(managed_fileset)

In [None]:
# Look up the external fileset by its identifier and print it
external_fileset = (
    gravitino_client.load_catalog(name=catalog_name)
    .as_fileset_catalog()
    .load_fileset(ident=external_fileset_ident)
)
print(external_fileset)

In [None]:
# Drop the managed fileset; its HDFS location is expected to be deleted with it
catalog.as_fileset_catalog().drop_fileset(ident=managed_fileset_ident)

# Check that the managed fileset's location is gone.
# status(strict=False) returns None when the path does not exist, so an
# unrelated error (e.g. HDFS unreachable) is no longer reported as a
# successful deletion the way the former catch-all `except Exception` did.
if hdfs_client.status(managed_fileset_path, strict=False) is None:
    print(f"Success: The storage location {managed_fileset_path} was successfully deleted.")
else:
    print(f"Failed: The storage location {managed_fileset_path} was not successfully deleted.")

In [None]:
# Drop the external fileset; its HDFS location should NOT be deleted
catalog.as_fileset_catalog().drop_fileset(ident=external_fileset_ident)

# Check that the external fileset's location is still present.
# status(strict=False) returns None rather than raising when the path does not
# exist, so the check no longer depends on catching every Exception.
if hdfs_client.status(external_fileset_path, strict=False) is not None:
    print(f"Success: The storage location {external_fileset_path} reserved.")
else:
    print(f"Failed: The storage location {external_fileset_path} deleted.")

In [None]:
# Drop the schema; cascade=True is passed so contained entities go with it
catalog.as_schemas().drop_schema(schema_name=schema_name, cascade=True)

# Check that the schema location is gone.
# status(strict=False) returns None when the path does not exist, so an
# unrelated error (e.g. HDFS unreachable) is no longer reported as a
# successful deletion the way the former catch-all `except Exception` did.
if hdfs_client.status(schema_path, strict=False) is None:
    print(f"Success: The storage location {schema_path} was successfully deleted.")
else:
    print(f"Failed: The storage location {schema_path} was not successfully deleted.")

In [None]:
# Force-drop the catalog and print what the client returns
result = gravitino_client.drop_catalog(
    name=catalog_name,
    force=True,
)
print(result)

In [None]:
# Force-drop the metalake and print what the admin client returns
result = gravitino_admin_client.drop_metalake(
    metalake_name,
    force=True,
)
print(result)