clients/client-python/gravitino/client/gravitino_metalake.py (122 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from typing import List, Dict
from gravitino.api.catalog import Catalog
from gravitino.api.catalog_change import CatalogChange
from gravitino.client.dto_converters import DTOConverters
from gravitino.dto.metalake_dto import MetalakeDTO
from gravitino.dto.requests.catalog_create_request import CatalogCreateRequest
from gravitino.dto.requests.catalog_set_request import CatalogSetRequest
from gravitino.dto.requests.catalog_updates_request import CatalogUpdatesRequest
from gravitino.dto.responses.catalog_list_response import CatalogListResponse
from gravitino.dto.responses.catalog_response import CatalogResponse
from gravitino.dto.responses.drop_response import DropResponse
from gravitino.dto.responses.entity_list_response import EntityListResponse
from gravitino.exceptions.handlers.catalog_error_handler import CATALOG_ERROR_HANDLER
from gravitino.rest.rest_utils import encode_string
from gravitino.utils import HTTPClient
logger = logging.getLogger(__name__)
class GravitinoMetalake(MetalakeDTO):
    """
    Gravitino Metalake is the top-level metadata repository for users. It contains a list of
    catalogs as sub-level metadata collections. With GravitinoMetalake, users can list, create,
    load, alter, drop, enable and disable a catalog with specified identifier.

    Every operation is issued through ``rest_client`` with ``CATALOG_ERROR_HANDLER`` as the
    error handler, and every URL path segment is passed through ``encode_string``.
    """

    # HTTP client used for every REST call issued by this metalake.
    rest_client: HTTPClient

    # Path template of a single catalog; placeholders are the metalake name and the catalog name.
    API_METALAKES_CATALOGS_PATH = "api/metalakes/{}/catalogs/{}"

    def __init__(self, metalake: MetalakeDTO = None, client: HTTPClient = None):
        """Create a metalake handle from its DTO and the client used to reach the server.

        Args:
            metalake: The DTO whose name, comment, properties and audit info are copied into
                this object. Although the default is None, the DTO accessors are called
                unconditionally, so a real MetalakeDTO must be supplied.
            client: The HTTP client used for all subsequent catalog requests.
        """
        super().__init__(
            _name=metalake.name(),
            _comment=metalake.comment(),
            _properties=metalake.properties(),
            _audit=metalake.audit_info(),
        )
        self.rest_client = client

    def _catalogs_url(self) -> str:
        """Build the path of this metalake's catalog collection.

        The metalake name is passed through ``encode_string`` so that list and create
        requests target the same path as the single-catalog requests.
        """
        return f"api/metalakes/{encode_string(self.name())}/catalogs"

    def _catalog_url(self, name: str) -> str:
        """Build the path of the catalog ``name`` under this metalake.

        Both the metalake name and the catalog name are passed through ``encode_string``.
        """
        return self.API_METALAKES_CATALOGS_PATH.format(
            encode_string(self.name()), encode_string(name)
        )

    def list_catalogs(self) -> List[str]:
        """List all the catalogs under this metalake.

        Raises:
            NoSuchMetalakeException if the metalake with specified namespace does not exist.

        Returns:
            A list of the catalog names under this metalake.
        """
        response = self.rest_client.get(
            self._catalogs_url(), error_handler=CATALOG_ERROR_HANDLER
        )
        entity_list = EntityListResponse.from_json(response.body, infer_missing=True)
        entity_list.validate()

        return [identifier.name() for identifier in entity_list.identifiers()]

    def list_catalogs_info(self) -> List[Catalog]:
        """List all the catalogs with their information under this metalake.

        Raises:
            NoSuchMetalakeException if the metalake with specified namespace does not exist.

        Returns:
            A list of Catalog under the specified namespace.
        """
        # "details=true" asks for full catalog objects; the response is parsed as a
        # CatalogListResponse instead of the identifier list used by list_catalogs.
        params = {"details": "true"}
        response = self.rest_client.get(
            self._catalogs_url(), params=params, error_handler=CATALOG_ERROR_HANDLER
        )
        catalog_list = CatalogListResponse.from_json(response.body, infer_missing=True)

        return [
            DTOConverters.to_catalog(self.name(), catalog, self.rest_client)
            for catalog in catalog_list.catalogs()
        ]

    def load_catalog(self, name: str) -> Catalog:
        """Load the catalog with specified name.

        Args:
            name: The name of the catalog to load.

        Raises:
            NoSuchCatalogException if the catalog with specified name does not exist.

        Returns:
            The Catalog with specified name.
        """
        response = self.rest_client.get(
            self._catalog_url(name), error_handler=CATALOG_ERROR_HANDLER
        )
        catalog_resp = CatalogResponse.from_json(response.body, infer_missing=True)

        return DTOConverters.to_catalog(
            self.name(), catalog_resp.catalog(), self.rest_client
        )

    def create_catalog(
        self,
        name: str,
        catalog_type: Catalog.Type,
        provider: str,
        comment: str,
        properties: Dict[str, str],
    ) -> Catalog:
        """Create a new catalog with specified name, catalog type, comment and properties.

        Args:
            name: The name of the catalog.
            catalog_type: The type of the catalog.
            provider: The provider of the catalog. This parameter can be None if the catalog
                provides a managed implementation. Currently, only the model catalog supports
                None provider. For the details, please refer to the Catalog.Type.
            comment: The comment of the catalog.
            properties: The properties of the catalog.

        Raises:
            NoSuchMetalakeException if the metalake does not exist.
            CatalogAlreadyExistsException if the catalog with specified name already exists.

        Returns:
            The created Catalog.
        """
        catalog_create_request = CatalogCreateRequest(
            name=name,
            catalog_type=catalog_type,
            provider=provider,
            comment=comment,
            properties=properties,
        )
        # Validate the request before anything is sent to the server.
        catalog_create_request.validate()

        response = self.rest_client.post(
            self._catalogs_url(),
            json=catalog_create_request,
            error_handler=CATALOG_ERROR_HANDLER,
        )
        catalog_resp = CatalogResponse.from_json(response.body, infer_missing=True)

        return DTOConverters.to_catalog(
            self.name(), catalog_resp.catalog(), self.rest_client
        )

    def alter_catalog(self, name: str, *changes: CatalogChange) -> Catalog:
        """Alter the catalog with specified name by applying the changes.

        Args:
            name: the name of the catalog.
            changes: the changes to apply to the catalog.

        Raises:
            NoSuchCatalogException if the catalog with specified name does not exist.
            IllegalArgumentException if the changes are invalid.

        Returns:
            the altered Catalog.
        """
        # Each CatalogChange is converted to its request DTO; all of them are sent
        # together in a single updates request.
        reqs = [DTOConverters.to_catalog_update_request(change) for change in changes]
        updates_request = CatalogUpdatesRequest(reqs)
        updates_request.validate()

        response = self.rest_client.put(
            self._catalog_url(name),
            json=updates_request,
            error_handler=CATALOG_ERROR_HANDLER,
        )
        catalog_response = CatalogResponse.from_json(response.body, infer_missing=True)
        catalog_response.validate()

        return DTOConverters.to_catalog(
            self.name(), catalog_response.catalog(), self.rest_client
        )

    def drop_catalog(self, name: str, force: bool = False) -> bool:
        """Drop the catalog with specified name.

        Args:
            name: the name of the catalog.
            force: whether to force drop the catalog.

        Returns:
            true if the catalog is dropped successfully, false if the catalog does not exist.
        """
        # The flag is sent as the query parameter "force" using Python's string form
        # of the boolean ("True" / "False").
        params = {"force": str(force)}
        response = self.rest_client.delete(
            self._catalog_url(name), params=params, error_handler=CATALOG_ERROR_HANDLER
        )
        drop_response = DropResponse.from_json(response.body, infer_missing=True)
        drop_response.validate()

        return drop_response.dropped()

    def enable_catalog(self, name: str):
        """Enable the catalog with specified name. If the catalog is already in use, this
        method does nothing.

        Args:
            name: the name of the catalog.

        Raises:
            NoSuchCatalogException if the catalog with specified name does not exist.
        """
        catalog_enable_request = CatalogSetRequest(in_use=True)
        catalog_enable_request.validate()

        # The response of the PATCH call is not inspected.
        self.rest_client.patch(
            self._catalog_url(name),
            json=catalog_enable_request,
            error_handler=CATALOG_ERROR_HANDLER,
        )

    def disable_catalog(self, name: str):
        """Disable the catalog with specified name. If the catalog is already disabled, this
        method does nothing.

        Args:
            name: the name of the catalog.

        Raises:
            NoSuchCatalogException if the catalog with specified name does not exist.
        """
        catalog_disable_request = CatalogSetRequest(in_use=False)
        catalog_disable_request.validate()

        # The response of the PATCH call is not inspected.
        self.rest_client.patch(
            self._catalog_url(name),
            json=catalog_disable_request,
            error_handler=CATALOG_ERROR_HANDLER,
        )