pyiceberg/catalog/__init__.py (500 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from __future__ import annotations import importlib import logging import re import uuid from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum from typing import ( TYPE_CHECKING, Callable, Dict, List, Optional, Set, Tuple, Type, Union, cast, ) from pyiceberg.exceptions import ( NamespaceAlreadyExistsError, NoSuchNamespaceError, NoSuchTableError, NotInstalledError, TableAlreadyExistsError, ) from pyiceberg.io import FileIO, load_file_io from pyiceberg.manifest import ManifestFile from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.serializers import ToOutputFile from pyiceberg.table import ( DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, CommitTableResponse, CreateTableTransaction, StagedTable, Table, TableProperties, ) from pyiceberg.table.locations import load_location_provider from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.table.update import ( TableRequirement, TableUpdate, update_table_metadata, ) from pyiceberg.typedef import ( EMPTY_DICT, Identifier, Properties, RecursiveDict, ) from pyiceberg.utils.config import Config, merge_config from pyiceberg.utils.properties import property_as_bool if TYPE_CHECKING: import pyarrow as pa logger = logging.getLogger(__name__) _ENV_CONFIG = Config() TOKEN = "token" TYPE = "type" PY_CATALOG_IMPL = "py-catalog-impl" ICEBERG = "iceberg" TABLE_TYPE = "table_type" WAREHOUSE_LOCATION = "warehouse" METADATA_LOCATION = "metadata_location" PREVIOUS_METADATA_LOCATION = "previous_metadata_location" MANIFEST = "manifest" MANIFEST_LIST = "manifest list" PREVIOUS_METADATA = "previous metadata" METADATA = "metadata" URI = "uri" LOCATION = "location" EXTERNAL_TABLE = "EXTERNAL_TABLE" BOTOCORE_SESSION = "botocore_session" TABLE_METADATA_FILE_NAME_REGEX = re.compile( r""" (\d+) # version number - # separator ([\w-]{36}) # UUID (36 characters, including hyphens) (?:\.\w+)? # optional codec name \.metadata\.json # file extension """, re.X, ) class CatalogType(Enum): REST = "rest" HIVE = "hive" GLUE = "glue" DYNAMODB = "dynamodb" SQL = "sql" IN_MEMORY = "in-memory" def load_rest(name: str, conf: Properties) -> Catalog: from pyiceberg.catalog.rest import RestCatalog return RestCatalog(name, **conf) def load_hive(name: str, conf: Properties) -> Catalog: try: from pyiceberg.catalog.hive import HiveCatalog return HiveCatalog(name, **conf) except ImportError as exc: raise NotInstalledError("Apache Hive support not installed: pip install 'pyiceberg[hive]'") from exc def load_glue(name: str, conf: Properties) -> Catalog: try: from pyiceberg.catalog.glue import GlueCatalog return GlueCatalog(name, **conf) except ImportError as exc: raise NotInstalledError("AWS glue support not installed: pip install 'pyiceberg[glue]'") from exc def load_dynamodb(name: str, conf: Properties) -> Catalog: try: from pyiceberg.catalog.dynamodb import DynamoDbCatalog return DynamoDbCatalog(name, **conf) except ImportError as exc: raise NotInstalledError("AWS DynamoDB support not installed: pip install 'pyiceberg[dynamodb]'") from exc def load_sql(name: str, conf: Properties) -> Catalog: try: from pyiceberg.catalog.sql import SqlCatalog return SqlCatalog(name, **conf) except ImportError as exc: raise NotInstalledError( "SQLAlchemy support not installed: pip install 'pyiceberg[sql-postgres]' or pip install 'pyiceberg[sql-sqlite]'" ) from exc def load_in_memory(name: str, conf: Properties) -> Catalog: try: from pyiceberg.catalog.memory import InMemoryCatalog return InMemoryCatalog(name, **conf) except ImportError as exc: raise NotInstalledError("SQLAlchemy support not installed: pip install 'pyiceberg[sql-sqlite]'") from exc AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = { CatalogType.REST: load_rest, CatalogType.HIVE: load_hive, CatalogType.GLUE: load_glue, CatalogType.DYNAMODB: load_dynamodb, CatalogType.SQL: load_sql, CatalogType.IN_MEMORY: load_in_memory, } def infer_catalog_type(name: str, catalog_properties: RecursiveDict) -> Optional[CatalogType]: """Try to infer the type based on the dict. Args: name: Name of the catalog. catalog_properties: Catalog properties. Returns: The inferred type based on the provided properties. Raises: ValueError: Raises a ValueError in case properties are missing, or the wrong type. """ if uri := catalog_properties.get("uri"): if isinstance(uri, str): if uri.startswith("http"): return CatalogType.REST elif uri.startswith("thrift"): return CatalogType.HIVE elif uri.startswith(("sqlite", "postgresql")): return CatalogType.SQL else: raise ValueError(f"Could not infer the catalog type from the uri: {uri}") else: raise ValueError(f"Expects the URI to be a string, got: {type(uri)}") raise ValueError( f"URI missing, please provide using --uri, the config or environment variable PYICEBERG_CATALOG__{name.upper()}__URI" ) def load_catalog(name: Optional[str] = None, **properties: Optional[str]) -> Catalog: """Load the catalog based on the properties. Will look up the properties from the config, based on the name. Args: name: The name of the catalog. properties: The properties that are used next to the configuration. Returns: An initialized Catalog. Raises: ValueError: Raises a ValueError in case properties are missing or malformed, or if it could not determine the catalog based on the properties. """ if name is None: name = _ENV_CONFIG.get_default_catalog_name() env = _ENV_CONFIG.get_catalog_config(name) conf: RecursiveDict = merge_config(env or {}, cast(RecursiveDict, properties)) catalog_type: Optional[CatalogType] provided_catalog_type = conf.get(TYPE) if catalog_impl := properties.get(PY_CATALOG_IMPL): if provided_catalog_type: raise ValueError( "Must not set both catalog type and py-catalog-impl configurations, " f"but found type {provided_catalog_type} and py-catalog-impl {catalog_impl}" ) if catalog := _import_catalog(name, catalog_impl, properties): logger.info("Loaded Catalog: %s", catalog_impl) return catalog else: raise ValueError(f"Could not initialize Catalog: {catalog_impl}") catalog_type = None if provided_catalog_type and isinstance(provided_catalog_type, str): catalog_type = CatalogType(provided_catalog_type.lower()) elif not provided_catalog_type: catalog_type = infer_catalog_type(name, conf) if catalog_type: return AVAILABLE_CATALOGS[catalog_type](name, cast(Dict[str, str], conf)) raise ValueError(f"Could not initialize catalog with the following properties: {properties}") def delete_files(io: FileIO, files_to_delete: Set[str], file_type: str) -> None: """Delete files. Log warnings if failing to delete any file. Args: io: The FileIO used to delete the object. files_to_delete: A set of file paths to be deleted. file_type: The type of the file. """ for file in files_to_delete: try: io.delete(file) except OSError as exc: logger.warning(msg=f"Failed to delete {file_type} file {file}", exc_info=exc) def delete_data_files(io: FileIO, manifests_to_delete: List[ManifestFile]) -> None: """Delete data files linked to given manifests. Log warnings if failing to delete any file. Args: io: The FileIO used to delete the object. manifests_to_delete: A list of manifest contains paths of data files to be deleted. """ deleted_files: dict[str, bool] = {} for manifest_file in manifests_to_delete: for entry in manifest_file.fetch_manifest_entry(io, discard_deleted=False): path = entry.data_file.file_path if not deleted_files.get(path, False): try: io.delete(path) except OSError as exc: logger.warning(msg=f"Failed to delete data file {path}", exc_info=exc) deleted_files[path] = True def _import_catalog(name: str, catalog_impl: str, properties: Properties) -> Optional[Catalog]: try: path_parts = catalog_impl.split(".") if len(path_parts) < 2: raise ValueError(f"py-catalog-impl should be full path (module.CustomCatalog), got: {catalog_impl}") module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1] module = importlib.import_module(module_name) class_ = getattr(module, class_name) return class_(name, **properties) except ModuleNotFoundError as exc: logger.warning(f"Could not initialize Catalog: {catalog_impl}", exc_info=exc) return None @dataclass class PropertiesUpdateSummary: removed: List[str] updated: List[str] missing: List[str] class Catalog(ABC): """Base Catalog for table operations like - create, drop, load, list and others. The catalog table APIs accept a table identifier, which is fully classified table name. The identifier can be a string or tuple of strings. If the identifier is a string, it is split into a tuple on '.'. If it is a tuple, it is used as-is. The catalog namespace APIs follow a similar convention wherein they also accept a namespace identifier that can be a string or tuple of strings. Attributes: name (str): Name of the catalog. properties (Properties): Catalog properties. """ name: str properties: Properties def __init__(self, name: str, **properties: str): self.name = name self.properties = properties @abstractmethod def create_table( self, identifier: Union[str, Identifier], schema: Union[Schema, "pa.Schema"], location: Optional[str] = None, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, ) -> Table: """Create a table. Args: identifier (str | Identifier): Table identifier. schema (Schema): Table's schema. location (str | None): Location for the table. Optional Argument. partition_spec (PartitionSpec): PartitionSpec for the table. sort_order (SortOrder): SortOrder for the table. properties (Properties): Table properties that can be a string based dictionary. Returns: Table: the created table instance. Raises: TableAlreadyExistsError: If a table with the name already exists. """ @abstractmethod def create_table_transaction( self, identifier: Union[str, Identifier], schema: Union[Schema, "pa.Schema"], location: Optional[str] = None, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, ) -> CreateTableTransaction: """Create a CreateTableTransaction. Args: identifier (str | Identifier): Table identifier. schema (Schema): Table's schema. location (str | None): Location for the table. Optional Argument. partition_spec (PartitionSpec): PartitionSpec for the table. sort_order (SortOrder): SortOrder for the table. properties (Properties): Table properties that can be a string based dictionary. Returns: CreateTableTransaction: createTableTransaction instance. """ def create_table_if_not_exists( self, identifier: Union[str, Identifier], schema: Union[Schema, "pa.Schema"], location: Optional[str] = None, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, ) -> Table: """Create a table if it does not exist. Args: identifier (str | Identifier): Table identifier. schema (Schema): Table's schema. location (str | None): Location for the table. Optional Argument. partition_spec (PartitionSpec): PartitionSpec for the table. sort_order (SortOrder): SortOrder for the table. properties (Properties): Table properties that can be a string based dictionary. Returns: Table: the created table instance if the table does not exist, else the existing table instance. """ try: return self.create_table(identifier, schema, location, partition_spec, sort_order, properties) except TableAlreadyExistsError: return self.load_table(identifier) @abstractmethod def load_table(self, identifier: Union[str, Identifier]) -> Table: """Load the table's metadata and returns the table instance. You can also use this method to check for table existence using 'try catalog.table() except NoSuchTableError'. Note: This method doesn't scan data stored in the table. Args: identifier (str | Identifier): Table identifier. Returns: Table: the table instance with its metadata. Raises: NoSuchTableError: If a table with the name does not exist. """ @abstractmethod def table_exists(self, identifier: Union[str, Identifier]) -> bool: """Check if a table exists. Args: identifier (str | Identifier): Table identifier. Returns: bool: True if the table exists, False otherwise. """ @abstractmethod def view_exists(self, identifier: Union[str, Identifier]) -> bool: """Check if a view exists. Args: identifier (str | Identifier): View identifier. Returns: bool: True if the view exists, False otherwise. """ @abstractmethod def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: """Register a new table using existing metadata. Args: identifier Union[str, Identifier]: Table identifier for the table metadata_location str: The location to the metadata Returns: Table: The newly registered table Raises: TableAlreadyExistsError: If the table already exists """ @abstractmethod def drop_table(self, identifier: Union[str, Identifier]) -> None: """Drop a table. Args: identifier (str | Identifier): Table identifier. Raises: NoSuchTableError: If a table with the name does not exist. """ @abstractmethod def purge_table(self, identifier: Union[str, Identifier]) -> None: """Drop a table and purge all data and metadata files. Note: This method only logs warning rather than raise exception when encountering file deletion failure. Args: identifier (str | Identifier): Table identifier. Raises: NoSuchTableError: If a table with the name does not exist, or the identifier is invalid. """ @abstractmethod def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: """Rename a fully classified table name. Args: from_identifier (str | Identifier): Existing table identifier. to_identifier (str | Identifier): New table identifier. Returns: Table: the updated table instance with its metadata. Raises: NoSuchTableError: If a table with the name does not exist. """ @abstractmethod def commit_table( self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] ) -> CommitTableResponse: """Commit updates to a table. Args: table (Table): The table to be updated. requirements: (Tuple[TableRequirement, ...]): Table requirements. updates: (Tuple[TableUpdate, ...]): Table updates. Returns: CommitTableResponse: The updated metadata. Raises: NoSuchTableError: If a table with the given identifier does not exist. CommitFailedException: Requirement not met, or a conflict with a concurrent commit. CommitStateUnknownException: Failed due to an internal exception on the side of the catalog. """ @abstractmethod def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: """Create a namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier. properties (Properties): A string dictionary of properties for the given namespace. Raises: NamespaceAlreadyExistsError: If a namespace with the given name already exists. """ def create_namespace_if_not_exists(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: """Create a namespace if it does not exist. Args: namespace (str | Identifier): Namespace identifier. properties (Properties): A string dictionary of properties for the given namespace. """ try: self.create_namespace(namespace, properties) except NamespaceAlreadyExistsError: pass @abstractmethod def drop_namespace(self, namespace: Union[str, Identifier]) -> None: """Drop a namespace. Args: namespace (str | Identifier): Namespace identifier. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. NamespaceNotEmptyError: If the namespace is not empty. """ @abstractmethod def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: """List tables under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: List[Identifier]: list of table identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. """ @abstractmethod def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: List[Identifier]: a List of namespace identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. """ @abstractmethod def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: """List views under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: List[Identifier]: list of table identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. """ @abstractmethod def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: """Get properties for a namespace. Args: namespace (str | Identifier): Namespace identifier. Returns: Properties: Properties for the given namespace. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. """ @abstractmethod def update_namespace_properties( self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT ) -> PropertiesUpdateSummary: """Remove provided property keys and updates properties for a namespace. Args: namespace (str | Identifier): Namespace identifier. removals (Set[str]): Set of property keys that need to be removed. Optional Argument. updates (Properties): Properties to be updated for the given namespace. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. ValueError: If removals and updates have overlapping keys. """ @abstractmethod def drop_view(self, identifier: Union[str, Identifier]) -> None: """Drop a view. Args: identifier (str | Identifier): View identifier. Raises: NoSuchViewError: If a view with the given name does not exist. """ @staticmethod def identifier_to_tuple(identifier: Union[str, Identifier]) -> Identifier: """Parse an identifier to a tuple. If the identifier is a string, it is split into a tuple on '.'. If it is a tuple, it is used as-is. Args: identifier (str | Identifier): an identifier, either a string or tuple of strings. Returns: Identifier: a tuple of strings. """ return identifier if isinstance(identifier, tuple) else tuple(str.split(identifier, ".")) @staticmethod def table_name_from(identifier: Union[str, Identifier]) -> str: """Extract table name from a table identifier. Args: identifier (str | Identifier: a table identifier. Returns: str: Table name. """ return Catalog.identifier_to_tuple(identifier)[-1] @staticmethod def namespace_from(identifier: Union[str, Identifier]) -> Identifier: """Extract table namespace from a table identifier. Args: identifier (Union[str, Identifier]): a table identifier. Returns: Identifier: Namespace identifier. """ return Catalog.identifier_to_tuple(identifier)[:-1] @staticmethod def namespace_to_string( identifier: Union[str, Identifier], err: Union[Type[ValueError], Type[NoSuchNamespaceError]] = ValueError ) -> str: """Transform a namespace identifier into a string. Args: identifier (Union[str, Identifier]): a namespace identifier. err (Union[Type[ValueError], Type[NoSuchNamespaceError]]): the error type to raise when identifier is empty. Returns: Identifier: Namespace identifier. """ tuple_identifier = Catalog.identifier_to_tuple(identifier) if len(tuple_identifier) < 1: raise err("Empty namespace identifier") # Check if any segment of the tuple is an empty string if any(segment.strip() == "" for segment in tuple_identifier): raise err("Namespace identifier contains an empty segment or a segment with only whitespace") return ".".join(segment.strip() for segment in tuple_identifier) @staticmethod def identifier_to_database( identifier: Union[str, Identifier], err: Union[Type[ValueError], Type[NoSuchNamespaceError]] = ValueError ) -> str: tuple_identifier = Catalog.identifier_to_tuple(identifier) if len(tuple_identifier) != 1: raise err(f"Invalid database, hierarchical namespaces are not supported: {identifier}") return tuple_identifier[0] @staticmethod def identifier_to_database_and_table( identifier: Union[str, Identifier], err: Union[Type[ValueError], Type[NoSuchTableError], Type[NoSuchNamespaceError]] = ValueError, ) -> Tuple[str, str]: tuple_identifier = Catalog.identifier_to_tuple(identifier) if len(tuple_identifier) != 2: raise err(f"Invalid path, hierarchical namespaces are not supported: {identifier}") return tuple_identifier[0], tuple_identifier[1] def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[str] = None) -> FileIO: return load_file_io({**self.properties, **properties}, location) @staticmethod def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema: if isinstance(schema, Schema): return schema try: import pyarrow as pa from pyiceberg.io.pyarrow import _ConvertToIcebergWithoutIDs, visit_pyarrow downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False if isinstance(schema, pa.Schema): schema: Schema = visit_pyarrow( # type: ignore schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) ) return schema except ModuleNotFoundError: pass raise ValueError(f"{type(schema)=}, but it must be pyiceberg.schema.Schema or pyarrow.Schema") @staticmethod def _delete_old_metadata(io: FileIO, base: TableMetadata, metadata: TableMetadata) -> None: """Delete oldest metadata if config is set to true.""" delete_after_commit: bool = property_as_bool( metadata.properties, TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT, ) if delete_after_commit: removed_previous_metadata_files: set[str] = {log.metadata_file for log in base.metadata_log} current_metadata_files: set[str] = {log.metadata_file for log in metadata.metadata_log} removed_previous_metadata_files.difference_update(current_metadata_files) delete_files(io, removed_previous_metadata_files, METADATA) def __repr__(self) -> str: """Return the string representation of the Catalog class.""" return f"{self.name} ({self.__class__})" class MetastoreCatalog(Catalog, ABC): def __init__(self, name: str, **properties: str): super().__init__(name, **properties) def create_table_transaction( self, identifier: Union[str, Identifier], schema: Union[Schema, "pa.Schema"], location: Optional[str] = None, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, ) -> CreateTableTransaction: return CreateTableTransaction( self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties) ) def table_exists(self, identifier: Union[str, Identifier]) -> bool: try: self.load_table(identifier) return True except NoSuchTableError: return False def purge_table(self, identifier: Union[str, Identifier]) -> None: table = self.load_table(identifier) self.drop_table(identifier) io = load_file_io(self.properties, table.metadata_location) metadata = table.metadata manifest_lists_to_delete = set() manifests_to_delete: List[ManifestFile] = [] for snapshot in metadata.snapshots: manifests_to_delete += snapshot.manifests(io) manifest_lists_to_delete.add(snapshot.manifest_list) manifest_paths_to_delete = {manifest.manifest_path for manifest in manifests_to_delete} prev_metadata_files = {log.metadata_file for log in metadata.metadata_log} delete_data_files(io, manifests_to_delete) delete_files(io, manifest_paths_to_delete, MANIFEST) delete_files(io, manifest_lists_to_delete, MANIFEST_LIST) delete_files(io, prev_metadata_files, PREVIOUS_METADATA) delete_files(io, {table.metadata_location}, METADATA) def _create_staged_table( self, identifier: Union[str, Identifier], schema: Union[Schema, "pa.Schema"], location: Optional[str] = None, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, ) -> StagedTable: """Create a table and return the table instance without committing the changes. Args: identifier (str | Identifier): Table identifier. schema (Schema): Table's schema. location (str | None): Location for the table. Optional Argument. partition_spec (PartitionSpec): PartitionSpec for the table. sort_order (SortOrder): SortOrder for the table. properties (Properties): Table properties that can be a string based dictionary. Returns: StagedTable: the created staged table instance. """ schema: Schema = self._convert_schema_if_needed(schema) # type: ignore database_name, table_name = self.identifier_to_database_and_table(identifier) location = self._resolve_table_location(location, database_name, table_name) provider = load_location_provider(location, properties) metadata_location = provider.new_table_metadata_file_location() metadata = new_table_metadata( location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties ) io = self._load_file_io(properties=properties, location=metadata_location) return StagedTable( identifier=(database_name, table_name), metadata=metadata, metadata_location=metadata_location, io=io, catalog=self, ) def _update_and_stage_table( self, current_table: Optional[Table], table_identifier: Identifier, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...], ) -> StagedTable: for requirement in requirements: requirement.validate(current_table.metadata if current_table else None) updated_metadata = update_table_metadata( base_metadata=current_table.metadata if current_table else self._empty_table_metadata(), updates=updates, enforce_validation=current_table is None, metadata_location=current_table.metadata_location if current_table else None, ) new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 provider = load_location_provider(updated_metadata.location, updated_metadata.properties) new_metadata_location = provider.new_table_metadata_file_location(new_metadata_version) return StagedTable( identifier=table_identifier, metadata=updated_metadata, metadata_location=new_metadata_location, io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location), catalog=self, ) def _get_updated_props_and_update_summary( self, current_properties: Properties, removals: Optional[Set[str]], updates: Properties ) -> Tuple[PropertiesUpdateSummary, Properties]: self._check_for_overlap(updates=updates, removals=removals) updated_properties = dict(current_properties) removed: Set[str] = set() updated: Set[str] = set() if removals: for key in removals: if key in updated_properties: updated_properties.pop(key) removed.add(key) if updates: for key, value in updates.items(): updated_properties[key] = value updated.add(key) expected_to_change = (removals or set()).difference(removed) properties_update_summary = PropertiesUpdateSummary( removed=list(removed or []), updated=list(updated or []), missing=list(expected_to_change) ) return properties_update_summary, updated_properties def _resolve_table_location(self, location: Optional[str], database_name: str, table_name: str) -> str: if not location: return self._get_default_warehouse_location(database_name, table_name) return location.rstrip("/") def _get_default_warehouse_location(self, database_name: str, table_name: str) -> str: database_properties = self.load_namespace_properties(database_name) if database_location := database_properties.get(LOCATION): database_location = database_location.rstrip("/") return f"{database_location}/{table_name}" if warehouse_path := self.properties.get(WAREHOUSE_LOCATION): warehouse_path = warehouse_path.rstrip("/") return f"{warehouse_path}/{database_name}.db/{table_name}" raise ValueError("No default path is set, please specify a location when creating a table") @staticmethod def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) -> None: ToOutputFile.table_metadata(metadata, io.new_output(metadata_path)) @staticmethod def _parse_metadata_version(metadata_location: str) -> int: """Parse the version from the metadata location. The version is the first part of the file name, before the first dash. For example, the version of the metadata file `s3://bucket/db/tb/metadata/00001-6c97e413-d51b-4538-ac70-12fe2a85cb83.metadata.json` is 1. If the path does not comply with the pattern, the version is defaulted to be -1, ensuring that the next metadata file is treated as having version 0. Args: metadata_location (str): The location of the metadata file. Returns: int: The version of the metadata file. -1 if the file name does not have valid version string """ file_name = metadata_location.split("/")[-1] if file_name_match := TABLE_METADATA_FILE_NAME_REGEX.fullmatch(file_name): try: uuid.UUID(file_name_match.group(2)) except ValueError: return -1 return int(file_name_match.group(1)) else: return -1 @staticmethod def _check_for_overlap(removals: Optional[Set[str]], updates: Properties) -> None: if updates and removals: overlap = set(removals) & set(updates.keys()) if overlap: raise ValueError(f"Updates and deletes have an overlap: {overlap}") @staticmethod def _empty_table_metadata() -> TableMetadata: """Return an empty TableMetadata instance. It is used to build a TableMetadata from a sequence of initial TableUpdates. It is a V1 TableMetadata because there will be a UpgradeFormatVersionUpdate in initial changes to bump the metadata to the target version. Returns: TableMetadata: An empty TableMetadata instance. """ return TableMetadataV1.model_construct(last_column_id=-1, schema=Schema())