# clients/client-python/gravitino/client/gravitino_admin_client.py (76 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from typing import List, Dict
from gravitino.client.gravitino_client_base import GravitinoClientBase
from gravitino.client.gravitino_metalake import GravitinoMetalake
from gravitino.client.dto_converters import DTOConverters
from gravitino.dto.requests.metalake_create_request import MetalakeCreateRequest
from gravitino.dto.requests.metalake_set_request import MetalakeSetRequest
from gravitino.dto.requests.metalake_updates_request import MetalakeUpdatesRequest
from gravitino.dto.responses.drop_response import DropResponse
from gravitino.dto.responses.metalake_list_response import MetalakeListResponse
from gravitino.dto.responses.metalake_response import MetalakeResponse
from gravitino.api.metalake_change import MetalakeChange
from gravitino.exceptions.handlers.metalake_error_handler import METALAKE_ERROR_HANDLER
from gravitino.rest.rest_utils import encode_string
# Module-level logger named after this module (``__name__``).
# NOTE(review): not referenced by any code visible in this file.
logger = logging.getLogger(__name__)
class GravitinoAdminClient(GravitinoClientBase):
    """Gravitino client for the administrator to interact with the Gravitino API.

    It allows the client to list, load, create, alter, drop, enable and
    disable Metalakes.

    Normal users should use ``GravitinoClient`` to connect with the Gravitino
    server.
    """

    def list_metalakes(self) -> List[GravitinoMetalake]:
        """Retrieves a list of Metalakes from the Gravitino API.

        Returns:
            A list of GravitinoMetalake objects, one for each Metalake in the
            validated server response.
        """
        resp = self._rest_client.get(
            self.API_METALAKES_LIST_PATH, error_handler=METALAKE_ERROR_HANDLER
        )
        metalake_list_resp = MetalakeListResponse.from_json(
            resp.body, infer_missing=True
        )
        metalake_list_resp.validate()

        return [
            GravitinoMetalake(o, self._rest_client)
            for o in metalake_list_resp.metalakes()
        ]

    def create_metalake(
        self, name: str, comment: str, properties: Dict[str, str]
    ) -> GravitinoMetalake:
        """Creates a new Metalake using the Gravitino API.

        Args:
            name: The name of the new Metalake.
            comment: The comment for the new Metalake.
            properties: The properties of the new Metalake.

        Returns:
            A GravitinoMetalake instance representing the newly created
            Metalake.

        Raises:
            MetalakeAlreadyExistsException: Expected when a Metalake with the
                specified name already exists. TODO: confirm this is what
                METALAKE_ERROR_HANDLER raises for that case.
        """
        req = MetalakeCreateRequest(name, comment, properties)
        # Validate locally so a malformed request never reaches the server.
        req.validate()

        resp = self._rest_client.post(
            self.API_METALAKES_LIST_PATH, req, error_handler=METALAKE_ERROR_HANDLER
        )
        return self._to_gravitino_metalake(resp)

    def alter_metalake(self, name: str, *changes: MetalakeChange) -> GravitinoMetalake:
        """Alters a specific Metalake using the Gravitino API.

        Args:
            name: The name of the Metalake to be altered.
            changes: The changes to be applied to the Metalake.

        Returns:
            A GravitinoMetalake instance representing the updated Metalake.

        Raises:
            NoSuchMetalakeException: Expected when the specified Metalake does
                not exist. TODO: confirm against METALAKE_ERROR_HANDLER.
            IllegalArgumentException: Expected when the provided changes are
                invalid or not applicable. TODO: confirm against
                METALAKE_ERROR_HANDLER and the request validation.
        """
        reqs = [DTOConverters.to_metalake_update_request(change) for change in changes]
        updates_request = MetalakeUpdatesRequest(reqs)
        updates_request.validate()

        resp = self._rest_client.put(
            self.API_METALAKES_IDENTIFIER_PATH + encode_string(name),
            updates_request,
            error_handler=METALAKE_ERROR_HANDLER,
        )
        return self._to_gravitino_metalake(resp)

    def drop_metalake(self, name: str, force: bool = False) -> bool:
        """Drops a specific Metalake using the Gravitino API.

        Args:
            name: The name of the Metalake to be dropped.
            force: Whether to force the drop operation. Sent to the server as
                the ``force`` query parameter.

        Returns:
            True if the Metalake was successfully dropped, False if the
            Metalake does not exist.
        """
        # The flag is sent as the string form of the Python bool ("True"/"False").
        params = {"force": str(force)}
        resp = self._rest_client.delete(
            self.API_METALAKES_IDENTIFIER_PATH + encode_string(name),
            params=params,
            error_handler=METALAKE_ERROR_HANDLER,
        )
        drop_response = DropResponse.from_json(resp.body, infer_missing=True)
        # Validate before trusting the payload, as every other method of this
        # class does with its parsed response.
        drop_response.validate()
        return drop_response.dropped()

    def enable_metalake(self, name: str) -> None:
        """Enable the metalake with specified name.

        If the metalake is already in use, this method does nothing.

        Args:
            name: the name of the metalake.

        Raises:
            NoSuchMetalakeException: if the metalake with specified name does
                not exist.
        """
        self._set_metalake_in_use(name, True)

    def disable_metalake(self, name: str) -> None:
        """Disable the metalake with specified name.

        If the metalake is already disabled, this method does nothing.

        Args:
            name: the name of the metalake.

        Raises:
            NoSuchMetalakeException: if the metalake with specified name does
                not exist.
        """
        self._set_metalake_in_use(name, False)

    def _to_gravitino_metalake(self, resp) -> GravitinoMetalake:
        """Parses and validates a single-Metalake REST response.

        Args:
            resp: The REST response whose ``body`` holds the JSON payload.

        Returns:
            A GravitinoMetalake bound to this client's REST client.
        """
        metalake_response = MetalakeResponse.from_json(resp.body, infer_missing=True)
        metalake_response.validate()
        return GravitinoMetalake(metalake_response.metalake(), self._rest_client)

    def _set_metalake_in_use(self, name: str, in_use: bool) -> None:
        """Sends the PATCH request that switches a Metalake's in-use flag.

        Args:
            name: The name of the Metalake to update.
            in_use: True to enable the Metalake, False to disable it.
        """
        request = MetalakeSetRequest(in_use=in_use)
        request.validate()

        url = self.API_METALAKES_IDENTIFIER_PATH + encode_string(name)
        # The response body is not inspected; failures surface through the
        # error handler passed to the REST client.
        self._rest_client.patch(
            url, json=request, error_handler=METALAKE_ERROR_HANDLER
        )