pyiceberg/catalog/glue.py (511 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from typing import ( TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple, Union, cast, ) import boto3 from botocore.config import Config from mypy_boto3_glue.client import GlueClient from mypy_boto3_glue.type_defs import ( ColumnTypeDef, DatabaseInputTypeDef, DatabaseTypeDef, StorageDescriptorTypeDef, TableInputTypeDef, TableTypeDef, ) from pyiceberg.catalog import ( BOTOCORE_SESSION, EXTERNAL_TABLE, ICEBERG, LOCATION, METADATA_LOCATION, PREVIOUS_METADATA_LOCATION, TABLE_TYPE, MetastoreCatalog, PropertiesUpdateSummary, ) from pyiceberg.exceptions import ( CommitFailedException, NamespaceAlreadyExistsError, NamespaceNotEmptyError, NoSuchIcebergTableError, NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError, TableAlreadyExistsError, ) from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile from pyiceberg.table import ( CommitTableResponse, Table, ) from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.table.update import ( TableRequirement, TableUpdate, ) from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties from pyiceberg.types import ( BinaryType, BooleanType, DateType, DecimalType, DoubleType, FixedType, FloatType, IntegerType, ListType, LongType, MapType, NestedField, PrimitiveType, StringType, StructType, TimestampType, TimestamptzType, TimeType, UUIDType, ) from pyiceberg.utils.properties import get_first_property_value, property_as_bool if TYPE_CHECKING: import pyarrow as pa # There is a unique Glue metastore in each AWS account and each AWS region. By default, GlueCatalog chooses the Glue # metastore to use based on the user's default AWS client credential and region setup. You can specify the Glue catalog # ID through glue.id catalog property to point to a Glue catalog in a different AWS account. The Glue catalog ID is your # numeric AWS account ID. GLUE_ID = "glue.id" # If Glue should skip archiving an old table version when creating a new version in a commit. By # default, Glue archives all old table versions after an UpdateTable call, but Glue has a default # max number of archived table versions (can be increased). So for streaming use case with lots # of commits, it is recommended to set this value to true. GLUE_SKIP_ARCHIVE = "glue.skip-archive" GLUE_SKIP_ARCHIVE_DEFAULT = True # Configure an alternative endpoint of the Glue service for GlueCatalog to access. # This could be used to use GlueCatalog with any glue-compatible metastore service that has a different endpoint GLUE_CATALOG_ENDPOINT = "glue.endpoint" ICEBERG_FIELD_ID = "iceberg.field.id" ICEBERG_FIELD_OPTIONAL = "iceberg.field.optional" ICEBERG_FIELD_CURRENT = "iceberg.field.current" GLUE_PROFILE_NAME = "glue.profile-name" GLUE_REGION = "glue.region" GLUE_ACCESS_KEY_ID = "glue.access-key-id" GLUE_SECRET_ACCESS_KEY = "glue.secret-access-key" GLUE_SESSION_TOKEN = "glue.session-token" GLUE_MAX_RETRIES = "glue.max-retries" GLUE_RETRY_MODE = "glue.retry-mode" MAX_RETRIES = 10 STANDARD_RETRY_MODE = "standard" ADAPTIVE_RETRY_MODE = "adaptive" LEGACY_RETRY_MODE = "legacy" EXISTING_RETRY_MODES = [STANDARD_RETRY_MODE, ADAPTIVE_RETRY_MODE, LEGACY_RETRY_MODE] def _construct_parameters( metadata_location: str, glue_table: Optional[TableTypeDef] = None, prev_metadata_location: Optional[str] = None ) -> Properties: new_parameters = glue_table.get("Parameters", {}) if glue_table else {} new_parameters.update({TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION: metadata_location}) if prev_metadata_location: new_parameters[PREVIOUS_METADATA_LOCATION] = prev_metadata_location return new_parameters GLUE_PRIMITIVE_TYPES = { BooleanType: "boolean", IntegerType: "int", LongType: "bigint", FloatType: "float", DoubleType: "double", DateType: "date", TimeType: "string", StringType: "string", UUIDType: "string", TimestampType: "timestamp", TimestamptzType: "timestamp", FixedType: "binary", BinaryType: "binary", } class _IcebergSchemaToGlueType(SchemaVisitor[str]): def schema(self, schema: Schema, struct_result: str) -> str: return struct_result def struct(self, struct: StructType, field_results: List[str]) -> str: return f"struct<{','.join(field_results)}>" def field(self, field: NestedField, field_result: str) -> str: return f"{field.name}:{field_result}" def list(self, list_type: ListType, element_result: str) -> str: return f"array<{element_result}>" def map(self, map_type: MapType, key_result: str, value_result: str) -> str: return f"map<{key_result},{value_result}>" def primitive(self, primitive: PrimitiveType) -> str: if isinstance(primitive, DecimalType): return f"decimal({primitive.precision},{primitive.scale})" if (primitive_type := type(primitive)) not in GLUE_PRIMITIVE_TYPES: return str(primitive) return GLUE_PRIMITIVE_TYPES[primitive_type] def _to_columns(metadata: TableMetadata) -> List[ColumnTypeDef]: results: Dict[str, ColumnTypeDef] = {} def _append_to_results(field: NestedField, is_current: bool) -> None: if field.name in results: return results[field.name] = cast( ColumnTypeDef, { "Name": field.name, "Type": visit(field.field_type, _IcebergSchemaToGlueType()), "Parameters": { ICEBERG_FIELD_ID: str(field.field_id), ICEBERG_FIELD_OPTIONAL: str(field.optional).lower(), ICEBERG_FIELD_CURRENT: str(is_current).lower(), }, }, ) if field.doc: results[field.name]["Comment"] = field.doc if current_schema := metadata.schema_by_id(metadata.current_schema_id): for field in current_schema.columns: _append_to_results(field, True) for schema in metadata.schemas: if schema.schema_id == metadata.current_schema_id: continue for field in schema.columns: _append_to_results(field, False) return list(results.values()) def _construct_table_input( table_name: str, metadata_location: str, properties: Properties, metadata: TableMetadata, glue_table: Optional[TableTypeDef] = None, prev_metadata_location: Optional[str] = None, ) -> TableInputTypeDef: table_input: TableInputTypeDef = { "Name": table_name, "TableType": EXTERNAL_TABLE, "Parameters": _construct_parameters(metadata_location, glue_table, prev_metadata_location), "StorageDescriptor": { "Columns": _to_columns(metadata), "Location": metadata.location, }, } if "Description" in properties: table_input["Description"] = properties["Description"] return table_input def _construct_rename_table_input(to_table_name: str, glue_table: TableTypeDef) -> TableInputTypeDef: rename_table_input: TableInputTypeDef = {"Name": to_table_name} # use the same Glue info to create the new table, pointing to the old metadata assert glue_table["TableType"] rename_table_input["TableType"] = glue_table["TableType"] if "Owner" in glue_table: rename_table_input["Owner"] = glue_table["Owner"] if "Parameters" in glue_table: rename_table_input["Parameters"] = glue_table["Parameters"] if "StorageDescriptor" in glue_table: # It turns out the output of StorageDescriptor is not the same as the input type # because the Column can have a different type, but for now it seems to work, so # silence the type error. rename_table_input["StorageDescriptor"] = cast(StorageDescriptorTypeDef, glue_table["StorageDescriptor"]) if "Description" in glue_table: rename_table_input["Description"] = glue_table["Description"] return rename_table_input def _construct_database_input(database_name: str, properties: Properties) -> DatabaseInputTypeDef: database_input: DatabaseInputTypeDef = {"Name": database_name} parameters = {} for k, v in properties.items(): if k == "Description": database_input["Description"] = v elif k == LOCATION: database_input["LocationUri"] = v else: parameters[k] = v database_input["Parameters"] = parameters return database_input def _register_glue_catalog_id_with_glue_client(glue: GlueClient, glue_catalog_id: str) -> None: """ Register the Glue Catalog ID (AWS Account ID) as a parameter on all Glue client methods. It's more ergonomic to do this than to pass the CatalogId as a parameter to every client call since it's an optional parameter and boto3 does not support 'None' values for missing parameters. """ event_system = glue.meta.events def add_glue_catalog_id(params: Dict[str, str], **kwargs: Any) -> None: if "CatalogId" not in params: params["CatalogId"] = glue_catalog_id event_system.register("provide-client-params.glue", add_glue_catalog_id) class GlueCatalog(MetastoreCatalog): glue: GlueClient def __init__(self, name: str, client: Optional[GlueClient] = None, **properties: Any): """Glue Catalog. You either need to provide a boto3 glue client, or one will be constructed from the properties. Args: name: Name to identify the catalog. client: An optional boto3 glue client. properties: Properties for glue client construction and configuration. """ super().__init__(name, **properties) if client: self.glue = client else: retry_mode_prop_value = get_first_property_value(properties, GLUE_RETRY_MODE) session = boto3.Session( profile_name=properties.get(GLUE_PROFILE_NAME), region_name=get_first_property_value(properties, GLUE_REGION, AWS_REGION), botocore_session=properties.get(BOTOCORE_SESSION), aws_access_key_id=get_first_property_value(properties, GLUE_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), aws_secret_access_key=get_first_property_value(properties, GLUE_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), aws_session_token=get_first_property_value(properties, GLUE_SESSION_TOKEN, AWS_SESSION_TOKEN), ) self.glue: GlueClient = session.client( "glue", endpoint_url=properties.get(GLUE_CATALOG_ENDPOINT), config=Config( retries={ "max_attempts": properties.get(GLUE_MAX_RETRIES, MAX_RETRIES), "mode": retry_mode_prop_value if retry_mode_prop_value in EXISTING_RETRY_MODES else STANDARD_RETRY_MODE, } ), ) if glue_catalog_id := properties.get(GLUE_ID): _register_glue_catalog_id_with_glue_client(self.glue, glue_catalog_id) def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table: properties: Properties = glue_table["Parameters"] assert glue_table["DatabaseName"] assert glue_table["Parameters"] database_name = glue_table["DatabaseName"] table_name = glue_table["Name"] if TABLE_TYPE not in properties: raise NoSuchPropertyException( f"Property {TABLE_TYPE} missing, could not determine type: {database_name}.{table_name}" ) glue_table_type = properties[TABLE_TYPE] if glue_table_type.lower() != ICEBERG: raise NoSuchIcebergTableError( f"Property table_type is {glue_table_type}, expected {ICEBERG}: {database_name}.{table_name}" ) if METADATA_LOCATION not in properties: raise NoSuchPropertyException( f"Table property {METADATA_LOCATION} is missing, cannot find metadata for: {database_name}.{table_name}" ) metadata_location = properties[METADATA_LOCATION] io = self._load_file_io(location=metadata_location) file = io.new_input(metadata_location) metadata = FromInputFile.table_metadata(file) return Table( identifier=(database_name, table_name), metadata=metadata, metadata_location=metadata_location, io=self._load_file_io(metadata.properties, metadata_location), catalog=self, ) def _create_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef) -> None: try: self.glue.create_table(DatabaseName=database_name, TableInput=table_input) except self.glue.exceptions.AlreadyExistsException as e: raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e except self.glue.exceptions.EntityNotFoundException as e: raise NoSuchNamespaceError(f"Database {database_name} does not exist") from e def _update_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef, version_id: str) -> None: try: self.glue.update_table( DatabaseName=database_name, TableInput=table_input, SkipArchive=property_as_bool(self.properties, GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT), VersionId=version_id, ) except self.glue.exceptions.EntityNotFoundException as e: raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name} (Glue table version {version_id})") from e except self.glue.exceptions.ConcurrentModificationException as e: raise CommitFailedException( f"Cannot commit {database_name}.{table_name} because Glue detected concurrent update to table version {version_id}" ) from e def _get_glue_table(self, database_name: str, table_name: str) -> TableTypeDef: try: load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name) return load_table_response["Table"] except self.glue.exceptions.EntityNotFoundException as e: raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e def create_table( self, identifier: Union[str, Identifier], schema: Union[Schema, "pa.Schema"], location: Optional[str] = None, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, ) -> Table: """ Create an Iceberg table. Args: identifier: Table identifier. schema: Table's schema. location: Location for the table. Optional Argument. partition_spec: PartitionSpec for the table. sort_order: SortOrder for the table. properties: Table properties that can be a string based dictionary. Returns: Table: the created table instance. Raises: AlreadyExistsError: If a table with the name already exists. ValueError: If the identifier is invalid, or no path is given to store metadata. """ staged_table = self._create_staged_table( identifier=identifier, schema=schema, location=location, partition_spec=partition_spec, sort_order=sort_order, properties=properties, ) database_name, table_name = self.identifier_to_database_and_table(identifier) self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location) table_input = _construct_table_input(table_name, staged_table.metadata_location, properties, staged_table.metadata) self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input) return self.load_table(identifier=identifier) def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: """Register a new table using existing metadata. Args: identifier Union[str, Identifier]: Table identifier for the table metadata_location str: The location to the metadata Returns: Table: The newly registered table Raises: TableAlreadyExistsError: If the table already exists """ database_name, table_name = self.identifier_to_database_and_table(identifier) properties = EMPTY_DICT io = self._load_file_io(location=metadata_location) file = io.new_input(metadata_location) metadata = FromInputFile.table_metadata(file) table_input = _construct_table_input(table_name, metadata_location, properties, metadata) self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input) return self.load_table(identifier=identifier) def commit_table( self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] ) -> CommitTableResponse: """Commit updates to a table. Args: table (Table): The table to be updated. requirements: (Tuple[TableRequirement, ...]): Table requirements. updates: (Tuple[TableUpdate, ...]): Table updates. Returns: CommitTableResponse: The updated metadata. Raises: NoSuchTableError: If a table with the given identifier does not exist. CommitFailedException: Requirement not met, or a conflict with a concurrent commit. """ table_identifier = table.name() database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError) current_glue_table: Optional[TableTypeDef] glue_table_version_id: Optional[str] current_table: Optional[Table] try: current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name) glue_table_version_id = current_glue_table.get("VersionId") current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table) except NoSuchTableError: current_glue_table = None glue_table_version_id = None current_table = None updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates) if current_table and updated_staged_table.metadata == current_table.metadata: # no changes, do nothing return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) self._write_metadata( metadata=updated_staged_table.metadata, io=updated_staged_table.io, metadata_path=updated_staged_table.metadata_location, ) if current_table: # table exists, update the table if not glue_table_version_id: raise CommitFailedException( f"Cannot commit {database_name}.{table_name} because Glue table version id is missing" ) # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking update_table_input = _construct_table_input( table_name=table_name, metadata_location=updated_staged_table.metadata_location, properties=updated_staged_table.properties, metadata=updated_staged_table.metadata, glue_table=current_glue_table, prev_metadata_location=current_table.metadata_location, ) self._update_glue_table( database_name=database_name, table_name=table_name, table_input=update_table_input, version_id=glue_table_version_id, ) else: # table does not exist, create the table create_table_input = _construct_table_input( table_name=table_name, metadata_location=updated_staged_table.metadata_location, properties=updated_staged_table.properties, metadata=updated_staged_table.metadata, ) self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input) return CommitTableResponse( metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location ) def load_table(self, identifier: Union[str, Identifier]) -> Table: """Load the table's metadata and returns the table instance. You can also use this method to check for table existence using 'try catalog.table() except TableNotFoundError'. Note: This method doesn't scan data stored in the table. Args: identifier: Table identifier. Returns: Table: the table instance with its metadata. Raises: NoSuchTableError: If a table with the name does not exist, or the identifier is invalid. """ database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError) return self._convert_glue_to_iceberg(self._get_glue_table(database_name=database_name, table_name=table_name)) def drop_table(self, identifier: Union[str, Identifier]) -> None: """Drop a table. Args: identifier: Table identifier. Raises: NoSuchTableError: If a table with the name does not exist, or the identifier is invalid. """ database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError) try: self.glue.delete_table(DatabaseName=database_name, Name=table_name) except self.glue.exceptions.EntityNotFoundException as e: raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: """Rename a fully classified table name. This method can only rename Iceberg tables in AWS Glue. Args: from_identifier: Existing table identifier. to_identifier: New table identifier. Returns: Table: the updated table instance with its metadata. Raises: ValueError: When from table identifier is invalid. NoSuchTableError: When a table with the name does not exist. NoSuchIcebergTableError: When from table is not a valid iceberg table. NoSuchPropertyException: When from table miss some required properties. NoSuchNamespaceError: When the destination namespace doesn't exist. """ from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError) to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier) try: get_table_response = self.glue.get_table(DatabaseName=from_database_name, Name=from_table_name) except self.glue.exceptions.EntityNotFoundException as e: raise NoSuchTableError(f"Table does not exist: {from_database_name}.{from_table_name}") from e glue_table = get_table_response["Table"] try: # verify that from_identifier is a valid iceberg table self._convert_glue_to_iceberg(glue_table=glue_table) except NoSuchPropertyException as e: raise NoSuchPropertyException( f"Failed to rename table {from_database_name}.{from_table_name} since it is missing required properties" ) from e except NoSuchIcebergTableError as e: raise NoSuchIcebergTableError( f"Failed to rename table {from_database_name}.{from_table_name} since it is not a valid iceberg table" ) from e rename_table_input = _construct_rename_table_input(to_table_name=to_table_name, glue_table=glue_table) self._create_glue_table(database_name=to_database_name, table_name=to_table_name, table_input=rename_table_input) try: self.drop_table(from_identifier) except Exception as e: log_message = f"Failed to drop old table {from_database_name}.{from_table_name}. " try: self.drop_table(to_identifier) log_message += f"Rolled back table creation for {to_database_name}.{to_table_name}." except NoSuchTableError: log_message += ( f"Failed to roll back table creation for {to_database_name}.{to_table_name}. Please clean up manually" ) raise ValueError(log_message) from e return self.load_table(to_identifier) def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: """Create a namespace in the catalog. Args: namespace: Namespace identifier. properties: A string dictionary of properties for the given namespace. Raises: ValueError: If the identifier is invalid. AlreadyExistsError: If a namespace with the given name already exists. """ database_name = self.identifier_to_database(namespace) try: self.glue.create_database(DatabaseInput=_construct_database_input(database_name, properties)) except self.glue.exceptions.AlreadyExistsException as e: raise NamespaceAlreadyExistsError(f"Database {database_name} already exists") from e def drop_namespace(self, namespace: Union[str, Identifier]) -> None: """Drop a namespace. A Glue namespace can only be dropped if it is empty. Args: namespace: Namespace identifier. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid. NamespaceNotEmptyError: If the namespace is not empty. """ database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) try: table_list = self.list_tables(namespace=database_name) except NoSuchNamespaceError as e: raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e if len(table_list) > 0: raise NamespaceNotEmptyError(f"Database {database_name} is not empty") self.glue.delete_database(Name=database_name) def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: """List Iceberg tables under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: List[Identifier]: list of table identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid. """ database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) table_list: List[TableTypeDef] = [] next_token: Optional[str] = None try: while True: table_list_response = ( self.glue.get_tables(DatabaseName=database_name) if not next_token else self.glue.get_tables(DatabaseName=database_name, NextToken=next_token) ) table_list.extend(table_list_response["TableList"]) next_token = table_list_response.get("NextToken") if not next_token: break except self.glue.exceptions.EntityNotFoundException as e: raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e return [(database_name, table["Name"]) for table in table_list if self.__is_iceberg_table(table)] def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. Returns: List[Identifier]: a List of namespace identifiers. """ # Hierarchical namespace is not supported. Return an empty list if namespace: return [] database_list: List[DatabaseTypeDef] = [] next_token: Optional[str] = None while True: databases_response = self.glue.get_databases() if not next_token else self.glue.get_databases(NextToken=next_token) database_list.extend(databases_response["DatabaseList"]) next_token = databases_response.get("NextToken") if not next_token: break return [self.identifier_to_tuple(database["Name"]) for database in database_list] def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: """Get properties for a namespace. Args: namespace: Namespace identifier. Returns: Properties: Properties for the given namespace. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist, or identifier is invalid. """ database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) try: database_response = self.glue.get_database(Name=database_name) except self.glue.exceptions.EntityNotFoundException as e: raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e except self.glue.exceptions.InvalidInputException as e: raise NoSuchNamespaceError(f"Invalid input for namespace {database_name}") from e database = database_response["Database"] properties = dict(database.get("Parameters", {})) if "LocationUri" in database: properties["location"] = database["LocationUri"] if "Description" in database: properties["Description"] = database["Description"] return properties def update_namespace_properties( self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT ) -> PropertiesUpdateSummary: """Remove provided property keys and updates properties for a namespace. Args: namespace: Namespace identifier. removals: Set of property keys that need to be removed. Optional Argument. updates: Properties to be updated for the given namespace. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist, or identifier is invalid. ValueError: If removals and updates have overlapping keys. """ current_properties = self.load_namespace_properties(namespace=namespace) properties_update_summary, updated_properties = self._get_updated_props_and_update_summary( current_properties=current_properties, removals=removals, updates=updates ) database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) self.glue.update_database(Name=database_name, DatabaseInput=_construct_database_input(database_name, updated_properties)) return properties_update_summary def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: raise NotImplementedError def drop_view(self, identifier: Union[str, Identifier]) -> None: raise NotImplementedError def view_exists(self, identifier: Union[str, Identifier]) -> bool: raise NotImplementedError @staticmethod def __is_iceberg_table(table: TableTypeDef) -> bool: return table.get("Parameters", {}).get(TABLE_TYPE, "").lower() == ICEBERG