# client/python/cli/command/catalogs.py

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
from dataclasses import dataclass, field
from typing import Dict, Optional, List

from pydantic import StrictStr

from cli.command import Command
from cli.constants import StorageType, CatalogType, Subcommands, Arguments
from cli.options.option_tree import Argument
from polaris.management import PolarisDefaultApi, Catalog, CreateCatalogRequest, UpdateCatalogRequest, \
    StorageConfigInfo, ExternalCatalog, AwsStorageConfigInfo, AzureStorageConfigInfo, GcpStorageConfigInfo, \
    PolarisCatalog, CatalogProperties


@dataclass
class CatalogsCommand(Command):
    """
    A Command implementation to represent `polaris catalogs`. The instance attributes correspond to parameters
    that can be provided to various subcommands, except `catalogs_subcommand` which represents the subcommand
    itself.

    Example commands:
        * ./polaris catalogs create cat_name --storage-type s3 --default-base-location s3://bucket/path --role-arn ...
        * ./polaris catalogs update cat_name --default-base-location s3://new-bucket/new-location
        * ./polaris catalogs list
    """

    # Which `polaris catalogs <subcommand>` was invoked (see Subcommands).
    catalogs_subcommand: str
    # CatalogType value ('internal' or 'external').
    catalog_type: str
    remote_url: str
    default_base_location: str
    # StorageType value ('s3', 'azure', 'gcs', or 'file').
    storage_type: str
    allowed_locations: List[str]
    # AWS-specific storage credentials:
    role_arn: str
    external_id: str
    user_arn: str
    region: str
    # Azure-specific storage credentials:
    tenant_id: str
    multi_tenant_app_name: str
    consent_url: str
    # GCS-specific storage credential:
    service_account: str
    catalog_name: str
    properties: Dict[str, StrictStr]
    set_properties: Dict[str, StrictStr]
    remove_properties: List[str]

    def validate(self) -> None:
        """Check that the arguments supplied for the chosen subcommand are consistent.

        For CREATE, verifies the required storage arguments are present and that
        only credentials matching the selected storage type were provided.

        Raises:
            Exception: if a required argument is missing or credentials from a
                different storage type were supplied.
        """
        if self.catalogs_subcommand == Subcommands.CREATE:
            if not self.storage_type:
                raise Exception(f'Missing required argument:'
                                f' {Argument.to_flag_name(Arguments.STORAGE_TYPE)}')
            if not self.default_base_location:
                raise Exception(f'Missing required argument:'
                                f' {Argument.to_flag_name(Arguments.DEFAULT_BASE_LOCATION)}')

            if self.storage_type == StorageType.S3.value:
                if not self.role_arn:
                    raise Exception(f"Missing required argument for storage type 's3':"
                                    f" {Argument.to_flag_name(Arguments.ROLE_ARN)}")
                if self._has_azure_storage_info() or self._has_gcs_storage_info():
                    raise Exception(f"Storage type 's3' supports the storage credentials"
                                    f" {Argument.to_flag_name(Arguments.ROLE_ARN)},"
                                    f" {Argument.to_flag_name(Arguments.REGION)},"
                                    f" {Argument.to_flag_name(Arguments.EXTERNAL_ID)}, and"
                                    f" {Argument.to_flag_name(Arguments.USER_ARN)}")
            elif self.storage_type == StorageType.AZURE.value:
                if not self.tenant_id:
                    raise Exception("Missing required argument for storage type 'azure': "
                                    f" {Argument.to_flag_name(Arguments.TENANT_ID)}")
                if self._has_aws_storage_info() or self._has_gcs_storage_info():
                    raise Exception("Storage type 'azure' supports the storage credentials"
                                    f" {Argument.to_flag_name(Arguments.TENANT_ID)},"
                                    f" {Argument.to_flag_name(Arguments.MULTI_TENANT_APP_NAME)}, and"
                                    f" {Argument.to_flag_name(Arguments.CONSENT_URL)}")
            elif self.storage_type == StorageType.GCS.value:
                if self._has_aws_storage_info() or self._has_azure_storage_info():
                    raise Exception("Storage type 'gcs' supports the storage credential"
                                    f" {Argument.to_flag_name(Arguments.SERVICE_ACCOUNT)}")
            elif self.storage_type == StorageType.FILE.value:
                if self._has_aws_storage_info() or self._has_azure_storage_info() or self._has_gcs_storage_info():
                    raise Exception("Storage type 'file' does not support any storage credentials")

    def _has_aws_storage_info(self) -> bool:
        """Return truthy if any AWS-specific credential argument was provided."""
        return self.role_arn or self.external_id or self.user_arn or self.region

    def _has_azure_storage_info(self) -> bool:
        """Return truthy if any Azure-specific credential argument was provided."""
        return self.tenant_id or self.multi_tenant_app_name or self.consent_url

    def _has_gcs_storage_info(self) -> bool:
        """Return truthy if the GCS-specific credential argument was provided."""
        return self.service_account

    def _build_storage_config_info(self):
        """Build the StorageConfigInfo subtype matching `self.storage_type`.

        Returns:
            An Aws/Azure/Gcp StorageConfigInfo (or the base StorageConfigInfo for
            'file'), or None when `storage_type` matches no known StorageType.
        """
        config = None
        if self.storage_type == StorageType.S3.value:
            config = AwsStorageConfigInfo(
                # Server-side enum values are uppercase; CLI values are lowercase.
                storage_type=self.storage_type.upper(),
                allowed_locations=self.allowed_locations,
                role_arn=self.role_arn,
                external_id=self.external_id,
                user_arn=self.user_arn,
                region=self.region
            )
        elif self.storage_type == StorageType.AZURE.value:
            config = AzureStorageConfigInfo(
                storage_type=self.storage_type.upper(),
                allowed_locations=self.allowed_locations,
                tenant_id=self.tenant_id,
                multi_tenant_app_name=self.multi_tenant_app_name,
                consent_url=self.consent_url,
            )
        elif self.storage_type == StorageType.GCS.value:
            config = GcpStorageConfigInfo(
                storage_type=self.storage_type.upper(),
                allowed_locations=self.allowed_locations,
                gcs_service_account=self.service_account
            )
        elif self.storage_type == StorageType.FILE.value:
            config = StorageConfigInfo(
                storage_type=self.storage_type.upper(),
                allowed_locations=self.allowed_locations
            )
        return config

    def execute(self, api: PolarisDefaultApi) -> None:
        """Dispatch the selected subcommand against the Polaris management API.

        Args:
            api: the Polaris management API client used to perform the operation.

        Raises:
            Exception: if `catalogs_subcommand` is not a supported subcommand.
        """
        if self.catalogs_subcommand == Subcommands.CREATE:
            config = self._build_storage_config_info()
            if self.catalog_type == CatalogType.EXTERNAL.value:
                request = CreateCatalogRequest(
                    catalog=ExternalCatalog(
                        type=self.catalog_type.upper(),
                        name=self.catalog_name,
                        storage_config_info=config,
                        remote_url=self.remote_url,
                        properties=CatalogProperties(
                            default_base_location=self.default_base_location,
                            additional_properties=self.properties
                        )
                    )
                )
            else:
                request = CreateCatalogRequest(
                    catalog=PolarisCatalog(
                        type=self.catalog_type.upper(),
                        name=self.catalog_name,
                        storage_config_info=config,
                        properties=CatalogProperties(
                            default_base_location=self.default_base_location,
                            additional_properties=self.properties
                        )
                    )
                )
            api.create_catalog(request)
        elif self.catalogs_subcommand == Subcommands.DELETE:
            api.delete_catalog(self.catalog_name)
        elif self.catalogs_subcommand == Subcommands.GET:
            print(api.get_catalog(self.catalog_name).to_json())
        elif self.catalogs_subcommand == Subcommands.LIST:
            for catalog in api.list_catalogs().catalogs:
                print(catalog.to_json())
        elif self.catalogs_subcommand == Subcommands.UPDATE:
            catalog = api.get_catalog(self.catalog_name)

            if self.default_base_location or self.set_properties or self.remove_properties:
                new_default_base_location = self.default_base_location or catalog.properties.default_base_location
                # Copy the fetched dict so pop() below cannot mutate the
                # server-returned catalog object in place.
                new_additional_properties = dict(catalog.properties.additional_properties or {})

                # Add or update all entries specified in set_properties
                if self.set_properties:
                    new_additional_properties = {**new_additional_properties, **self.set_properties}

                # Remove all keys specified in remove_properties
                if self.remove_properties:
                    for to_remove in self.remove_properties:
                        new_additional_properties.pop(to_remove, None)

                catalog.properties = CatalogProperties(
                    default_base_location=new_default_base_location,
                    additional_properties=new_additional_properties
                )

            if (self._has_aws_storage_info() or self._has_azure_storage_info()
                    or self._has_gcs_storage_info() or self.allowed_locations):
                # We must first reconstitute local storage-config related settings from the existing
                # catalog to properly construct the complete updated storage-config
                updated_storage_info = catalog.storage_config_info

                # In order to apply mutations client-side, we can't just use the base
                # _build_storage_config_info helper; instead, each allowed updatable field defined
                # in option_tree.py should be applied individually against the existing
                # storage_config_info here.
                if self.allowed_locations:
                    # The existing catalog may have no allowed_locations at all;
                    # guard against extending None.
                    if updated_storage_info.allowed_locations is None:
                        updated_storage_info.allowed_locations = []
                    updated_storage_info.allowed_locations.extend(self.allowed_locations)
                if self.region:
                    # Note: We have to lowercase the returned value because the server enum
                    # is uppercase but we defined the StorageType enums as lowercase.
                    storage_type = updated_storage_info.storage_type
                    if storage_type.lower() != StorageType.S3.value:
                        raise Exception(
                            f'--region requires S3 storage_type, got: {storage_type}')
                    updated_storage_info.region = self.region
                request = UpdateCatalogRequest(
                    current_entity_version=catalog.entity_version,
                    properties=catalog.properties.to_dict(),
                    storage_config_info=updated_storage_info
                )
            else:
                request = UpdateCatalogRequest(
                    current_entity_version=catalog.entity_version,
                    properties=catalog.properties.to_dict()
                )

            api.update_catalog(self.catalog_name, request)
        else:
            raise Exception(f'{self.catalogs_subcommand} is not supported in the CLI')