client/python/cli/command/catalogs.py (174 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from dataclasses import dataclass, field
from typing import Dict, Optional, List
from pydantic import StrictStr
from cli.command import Command
from cli.constants import StorageType, CatalogType, Subcommands, Arguments
from cli.options.option_tree import Argument
from polaris.management import PolarisDefaultApi, Catalog, CreateCatalogRequest, UpdateCatalogRequest, \
StorageConfigInfo, ExternalCatalog, AwsStorageConfigInfo, AzureStorageConfigInfo, GcpStorageConfigInfo, \
PolarisCatalog, CatalogProperties
@dataclass
class CatalogsCommand(Command):
    """
    A Command implementation to represent `polaris catalogs`. The instance attributes correspond to parameters
    that can be provided to various subcommands, except `catalogs_subcommand` which represents the subcommand
    itself.

    Example commands:
        * ./polaris catalogs create cat_name --storage-type s3 --default-base-location s3://bucket/path --role-arn ...
        * ./polaris catalogs update cat_name --default-base-location s3://new-bucket/new-location
        * ./polaris catalogs list
    """

    # The subcommand to run; compared against Subcommands.CREATE / DELETE / GET / LIST / UPDATE.
    catalogs_subcommand: str
    # CatalogType.EXTERNAL.value selects an ExternalCatalog on create; any other value a PolarisCatalog.
    catalog_type: str
    # Only used when creating an external catalog.
    remote_url: str
    default_base_location: str
    # Lowercase storage type, compared against StorageType.<X>.value and upper-cased for the API.
    storage_type: str
    allowed_locations: List[str]
    # S3 storage settings.
    role_arn: str
    external_id: str
    user_arn: str
    region: str
    # Azure storage settings.
    tenant_id: str
    multi_tenant_app_name: str
    consent_url: str
    # GCS storage setting.
    service_account: str
    catalog_name: str
    # Additional catalog properties sent on create.
    properties: Dict[str, StrictStr]
    # Properties to add or overwrite on update.
    set_properties: Dict[str, StrictStr]
    # Property keys to drop on update.
    remove_properties: List[str]

    def validate(self):
        """
        Check argument combinations before `execute` runs.

        Only the `create` subcommand is validated here. It requires a storage type and a default base
        location, requires the credentials mandated by the chosen storage type, and rejects
        credentials that belong to a different storage type.

        :raises Exception: if a required argument is missing or an incompatible one is supplied.
        """
        if self.catalogs_subcommand != Subcommands.CREATE:
            return

        if not self.storage_type:
            raise Exception(f'Missing required argument:'
                            f' {Argument.to_flag_name(Arguments.STORAGE_TYPE)}')
        if not self.default_base_location:
            raise Exception(f'Missing required argument:'
                            f' {Argument.to_flag_name(Arguments.DEFAULT_BASE_LOCATION)}')

        if self.storage_type == StorageType.S3.value:
            if not self.role_arn:
                raise Exception(f"Missing required argument for storage type 's3':"
                                f" {Argument.to_flag_name(Arguments.ROLE_ARN)}")
            if self._has_azure_storage_info() or self._has_gcs_storage_info():
                raise Exception(f"Storage type 's3' supports the storage credentials"
                                f" {Argument.to_flag_name(Arguments.ROLE_ARN)},"
                                f" {Argument.to_flag_name(Arguments.REGION)},"
                                f" {Argument.to_flag_name(Arguments.EXTERNAL_ID)}, and"
                                f" {Argument.to_flag_name(Arguments.USER_ARN)}")
        elif self.storage_type == StorageType.AZURE.value:
            if not self.tenant_id:
                # Single separating space, matching the s3 message above.
                raise Exception("Missing required argument for storage type 'azure':"
                                f" {Argument.to_flag_name(Arguments.TENANT_ID)}")
            if self._has_aws_storage_info() or self._has_gcs_storage_info():
                raise Exception("Storage type 'azure' supports the storage credentials"
                                f" {Argument.to_flag_name(Arguments.TENANT_ID)},"
                                f" {Argument.to_flag_name(Arguments.MULTI_TENANT_APP_NAME)}, and"
                                f" {Argument.to_flag_name(Arguments.CONSENT_URL)}")
        elif self.storage_type == StorageType.GCS.value:
            if self._has_aws_storage_info() or self._has_azure_storage_info():
                raise Exception("Storage type 'gcs' supports the storage credential"
                                f" {Argument.to_flag_name(Arguments.SERVICE_ACCOUNT)}")
        elif self.storage_type == StorageType.FILE.value:
            if self._has_aws_storage_info() or self._has_azure_storage_info() or self._has_gcs_storage_info():
                raise Exception("Storage type 'file' does not support any storage credentials")

    def _has_aws_storage_info(self) -> bool:
        """Return True if any S3-specific storage argument was supplied."""
        return bool(self.role_arn or self.external_id or self.user_arn or self.region)

    def _has_azure_storage_info(self) -> bool:
        """Return True if any Azure-specific storage argument was supplied."""
        return bool(self.tenant_id or self.multi_tenant_app_name or self.consent_url)

    def _has_gcs_storage_info(self) -> bool:
        """Return True if the GCS-specific storage argument was supplied."""
        return bool(self.service_account)

    def _build_storage_config_info(self) -> Optional[StorageConfigInfo]:
        """
        Build a fresh storage config for `create` from the CLI arguments.

        :return: the config model matching `storage_type`, or None if the storage type is not one of
            the four handled here.
        """
        if self.storage_type == StorageType.S3.value:
            return AwsStorageConfigInfo(
                storage_type=self.storage_type.upper(),
                allowed_locations=self.allowed_locations,
                role_arn=self.role_arn,
                external_id=self.external_id,
                user_arn=self.user_arn,
                region=self.region
            )
        if self.storage_type == StorageType.AZURE.value:
            return AzureStorageConfigInfo(
                storage_type=self.storage_type.upper(),
                allowed_locations=self.allowed_locations,
                tenant_id=self.tenant_id,
                multi_tenant_app_name=self.multi_tenant_app_name,
                consent_url=self.consent_url,
            )
        if self.storage_type == StorageType.GCS.value:
            return GcpStorageConfigInfo(
                storage_type=self.storage_type.upper(),
                allowed_locations=self.allowed_locations,
                gcs_service_account=self.service_account
            )
        if self.storage_type == StorageType.FILE.value:
            return StorageConfigInfo(
                storage_type=self.storage_type.upper(),
                allowed_locations=self.allowed_locations
            )
        return None

    def execute(self, api: PolarisDefaultApi) -> None:
        """
        Run the selected `catalogs` subcommand against the Polaris management API.

        `get` and `list` print each catalog as JSON to stdout.

        :param api: management API client used for every request.
        :raises Exception: if the subcommand is not supported, or (for `update`) if a region is
            supplied for a catalog whose storage type is not S3.
        """
        if self.catalogs_subcommand == Subcommands.CREATE:
            self._create_catalog(api)
        elif self.catalogs_subcommand == Subcommands.DELETE:
            api.delete_catalog(self.catalog_name)
        elif self.catalogs_subcommand == Subcommands.GET:
            print(api.get_catalog(self.catalog_name).to_json())
        elif self.catalogs_subcommand == Subcommands.LIST:
            for catalog in api.list_catalogs().catalogs:
                print(catalog.to_json())
        elif self.catalogs_subcommand == Subcommands.UPDATE:
            self._update_catalog(api)
        else:
            raise Exception(f'{self.catalogs_subcommand} is not supported in the CLI')

    def _create_catalog(self, api: PolarisDefaultApi) -> None:
        """Build a CreateCatalogRequest (external or internal catalog) from the CLI arguments and submit it."""
        storage_config_info = self._build_storage_config_info()
        properties = CatalogProperties(
            default_base_location=self.default_base_location,
            additional_properties=self.properties
        )
        if self.catalog_type == CatalogType.EXTERNAL.value:
            catalog = ExternalCatalog(
                type=self.catalog_type.upper(),
                name=self.catalog_name,
                storage_config_info=storage_config_info,
                remote_url=self.remote_url,
                properties=properties
            )
        else:
            catalog = PolarisCatalog(
                type=self.catalog_type.upper(),
                name=self.catalog_name,
                storage_config_info=storage_config_info,
                properties=properties
            )
        api.create_catalog(CreateCatalogRequest(catalog=catalog))

    def _update_catalog(self, api: PolarisDefaultApi) -> None:
        """
        Fetch the catalog, apply the requested changes client-side, and submit an UpdateCatalogRequest.

        The fetched `entity_version` is sent as `current_entity_version` (presumably so the server can
        reject updates based on a stale read — confirm against the management API spec). The storage
        config is included in the request only when a storage-related argument was supplied.
        """
        catalog = api.get_catalog(self.catalog_name)
        if self.default_base_location or self.set_properties or self.remove_properties:
            catalog.properties = self._updated_properties(catalog.properties)

        if (self._has_aws_storage_info() or self._has_azure_storage_info() or
                self._has_gcs_storage_info() or self.allowed_locations):
            # Computed first so an invalid --region fails before anything else is serialized.
            updated_storage_info = self._updated_storage_config_info(catalog.storage_config_info)
            request = UpdateCatalogRequest(
                current_entity_version=catalog.entity_version,
                properties=catalog.properties.to_dict(),
                storage_config_info=updated_storage_info
            )
        else:
            request = UpdateCatalogRequest(
                current_entity_version=catalog.entity_version,
                properties=catalog.properties.to_dict()
            )
        api.update_catalog(self.catalog_name, request)

    def _updated_properties(self, current: CatalogProperties) -> CatalogProperties:
        """
        Return new CatalogProperties with the update arguments applied on top of `current`.

        `default_base_location` replaces the existing one when given, `set_properties` entries are added
        or overwritten, and `remove_properties` keys are dropped (missing keys are ignored). `current`
        itself is not modified.
        """
        new_default_base_location = self.default_base_location or current.default_base_location
        # Work on a copy so removals do not mutate the dict owned by the fetched catalog.
        new_additional_properties = dict(current.additional_properties or {})
        if self.set_properties:
            new_additional_properties.update(self.set_properties)
        for to_remove in self.remove_properties or []:
            new_additional_properties.pop(to_remove, None)
        return CatalogProperties(
            default_base_location=new_default_base_location,
            additional_properties=new_additional_properties
        )

    def _updated_storage_config_info(self, storage_config_info):
        """
        Apply the update-time storage arguments to the catalog's existing storage config and return it.

        A fresh config from `_build_storage_config_info` cannot be used here because it would discard the
        settings already stored on the catalog. Only `allowed_locations` (appended to the existing list)
        and `region` (S3 only) are applied; other storage arguments are not applied on update.
        NOTE(review): presumably option_tree.py exposes only these for `update` — confirm.

        :raises Exception: if `region` is given but the catalog's storage type is not S3.
        """
        if self.region:
            # Lowercase the returned value: the server enum is uppercase but the StorageType enums
            # are defined as lowercase.
            storage_type = storage_config_info.storage_type
            if storage_type.lower() != StorageType.S3.value:
                raise Exception(
                    f'--region requires S3 storage_type, got: {storage_type}')
        if self.allowed_locations:
            # The fetched config may have no allowed locations; treat that as an empty list.
            storage_config_info.allowed_locations = (
                list(storage_config_info.allowed_locations or []) + list(self.allowed_locations)
            )
        if self.region:
            storage_config_info.region = self.region
        return storage_config_info