# pyiceberg/catalog/__init__.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import importlib
import logging
import re
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import (
TYPE_CHECKING,
Callable,
Dict,
List,
Optional,
Set,
Tuple,
Type,
Union,
cast,
)
from pyiceberg.exceptions import (
NamespaceAlreadyExistsError,
NoSuchNamespaceError,
NoSuchTableError,
NotInstalledError,
TableAlreadyExistsError,
)
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.manifest import ManifestFile
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import ToOutputFile
from pyiceberg.table import (
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE,
CommitTableResponse,
CreateTableTransaction,
StagedTable,
Table,
TableProperties,
)
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.update import (
TableRequirement,
TableUpdate,
update_table_metadata,
)
from pyiceberg.typedef import (
EMPTY_DICT,
Identifier,
Properties,
RecursiveDict,
)
from pyiceberg.utils.config import Config, merge_config
from pyiceberg.utils.properties import property_as_bool
if TYPE_CHECKING:
import pyarrow as pa
logger = logging.getLogger(__name__)
_ENV_CONFIG = Config()
TOKEN = "token"
TYPE = "type"
PY_CATALOG_IMPL = "py-catalog-impl"
ICEBERG = "iceberg"
TABLE_TYPE = "table_type"
WAREHOUSE_LOCATION = "warehouse"
METADATA_LOCATION = "metadata_location"
PREVIOUS_METADATA_LOCATION = "previous_metadata_location"
MANIFEST = "manifest"
MANIFEST_LIST = "manifest list"
PREVIOUS_METADATA = "previous metadata"
METADATA = "metadata"
URI = "uri"
LOCATION = "location"
EXTERNAL_TABLE = "EXTERNAL_TABLE"
BOTOCORE_SESSION = "botocore_session"
TABLE_METADATA_FILE_NAME_REGEX = re.compile(
r"""
(\d+) # version number
- # separator
([\w-]{36}) # UUID (36 characters, including hyphens)
(?:\.\w+)? # optional codec name
\.metadata\.json # file extension
""",
re.X,
)
class CatalogType(Enum):
    """Built-in catalog backends, keyed by the value of the ``type`` configuration property."""

    REST = "rest"
    HIVE = "hive"
    GLUE = "glue"
    DYNAMODB = "dynamodb"
    SQL = "sql"
    IN_MEMORY = "in-memory"
def load_rest(name: str, conf: Properties) -> Catalog:
    """Construct a RestCatalog from the given name and configuration properties."""
    # Imported lazily so this module can be imported without pulling in REST dependencies.
    from pyiceberg.catalog.rest import RestCatalog

    return RestCatalog(name, **conf)
def load_hive(name: str, conf: Properties) -> Catalog:
    """Construct a HiveCatalog, raising NotInstalledError when the hive extra is missing."""
    try:
        from pyiceberg.catalog.hive import HiveCatalog

        catalog: Catalog = HiveCatalog(name, **conf)
    except ImportError as import_error:
        raise NotInstalledError("Apache Hive support not installed: pip install 'pyiceberg[hive]'") from import_error
    return catalog
def load_glue(name: str, conf: Properties) -> Catalog:
    """Construct a GlueCatalog, raising NotInstalledError when the glue extra is missing."""
    try:
        from pyiceberg.catalog.glue import GlueCatalog

        catalog: Catalog = GlueCatalog(name, **conf)
    except ImportError as import_error:
        raise NotInstalledError("AWS glue support not installed: pip install 'pyiceberg[glue]'") from import_error
    return catalog
def load_dynamodb(name: str, conf: Properties) -> Catalog:
    """Construct a DynamoDbCatalog, raising NotInstalledError when the dynamodb extra is missing."""
    try:
        from pyiceberg.catalog.dynamodb import DynamoDbCatalog

        catalog: Catalog = DynamoDbCatalog(name, **conf)
    except ImportError as import_error:
        raise NotInstalledError("AWS DynamoDB support not installed: pip install 'pyiceberg[dynamodb]'") from import_error
    return catalog
def load_sql(name: str, conf: Properties) -> Catalog:
    """Construct a SqlCatalog, raising NotInstalledError when no SQL extra is installed."""
    try:
        from pyiceberg.catalog.sql import SqlCatalog

        catalog: Catalog = SqlCatalog(name, **conf)
    except ImportError as import_error:
        raise NotInstalledError(
            "SQLAlchemy support not installed: pip install 'pyiceberg[sql-postgres]' or pip install 'pyiceberg[sql-sqlite]'"
        ) from import_error
    return catalog
def load_in_memory(name: str, conf: Properties) -> Catalog:
    """Construct an InMemoryCatalog, raising NotInstalledError when its SQL backing is missing."""
    try:
        from pyiceberg.catalog.memory import InMemoryCatalog

        catalog: Catalog = InMemoryCatalog(name, **conf)
    except ImportError as import_error:
        raise NotInstalledError("SQLAlchemy support not installed: pip install 'pyiceberg[sql-sqlite]'") from import_error
    return catalog
# Factory registry: maps each built-in catalog type to the loader that
# instantiates it from a (name, properties) pair.
AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = {
    CatalogType.REST: load_rest,
    CatalogType.HIVE: load_hive,
    CatalogType.GLUE: load_glue,
    CatalogType.DYNAMODB: load_dynamodb,
    CatalogType.SQL: load_sql,
    CatalogType.IN_MEMORY: load_in_memory,
}
def infer_catalog_type(name: str, catalog_properties: RecursiveDict) -> Optional[CatalogType]:
    """Try to infer the type based on the dict.

    Args:
        name: Name of the catalog.
        catalog_properties: Catalog properties.

    Returns:
        The inferred type based on the provided properties.

    Raises:
        ValueError: Raises a ValueError in case properties are missing, or the wrong type.
    """
    uri = catalog_properties.get("uri")
    # Guard clauses: missing/empty URI, then non-string URI.
    if not uri:
        raise ValueError(
            f"URI missing, please provide using --uri, the config or environment variable PYICEBERG_CATALOG__{name.upper()}__URI"
        )
    if not isinstance(uri, str):
        raise ValueError(f"Expects the URI to be a string, got: {type(uri)}")
    # Dispatch on the URI scheme prefix.
    if uri.startswith("http"):
        return CatalogType.REST
    if uri.startswith("thrift"):
        return CatalogType.HIVE
    if uri.startswith(("sqlite", "postgresql")):
        return CatalogType.SQL
    raise ValueError(f"Could not infer the catalog type from the uri: {uri}")
def load_catalog(name: Optional[str] = None, **properties: Optional[str]) -> Catalog:
    """Load the catalog based on the properties.

    Will look up the properties from the config, based on the name.

    Args:
        name: The name of the catalog.
        properties: The properties that are used next to the configuration.

    Returns:
        An initialized Catalog.

    Raises:
        ValueError: Raises a ValueError in case properties are missing or malformed,
            or if it could not determine the catalog based on the properties.
    """
    if name is None:
        name = _ENV_CONFIG.get_default_catalog_name()
    # Explicitly-passed properties take precedence over file/env configuration.
    env = _ENV_CONFIG.get_catalog_config(name)
    conf: RecursiveDict = merge_config(env or {}, cast(RecursiveDict, properties))
    catalog_type: Optional[CatalogType]
    provided_catalog_type = conf.get(TYPE)
    # NOTE(review): the custom implementation is looked up in the explicit
    # `properties` only, not the merged `conf` — a py-catalog-impl set purely
    # via the config file is ignored here; confirm this is intended.
    if catalog_impl := properties.get(PY_CATALOG_IMPL):
        if provided_catalog_type:
            raise ValueError(
                "Must not set both catalog type and py-catalog-impl configurations, "
                f"but found type {provided_catalog_type} and py-catalog-impl {catalog_impl}"
            )
        if catalog := _import_catalog(name, catalog_impl, properties):
            logger.info("Loaded Catalog: %s", catalog_impl)
            return catalog
        else:
            raise ValueError(f"Could not initialize Catalog: {catalog_impl}")
    catalog_type = None
    if provided_catalog_type and isinstance(provided_catalog_type, str):
        catalog_type = CatalogType(provided_catalog_type.lower())
    elif not provided_catalog_type:
        # No explicit type: infer it from the URI scheme; raises when it cannot.
        catalog_type = infer_catalog_type(name, conf)
    if catalog_type:
        return AVAILABLE_CATALOGS[catalog_type](name, cast(Dict[str, str], conf))
    raise ValueError(f"Could not initialize catalog with the following properties: {properties}")
def delete_files(io: FileIO, files_to_delete: Set[str], file_type: str) -> None:
    """Best-effort deletion of the given files.

    A failure to delete one file is logged as a warning and does not stop
    the deletion of the remaining files.

    Args:
        io: The FileIO used to delete the object.
        files_to_delete: A set of file paths to be deleted.
        file_type: The type of the file, used only in the warning message.
    """
    for file in files_to_delete:
        try:
            io.delete(file)
        except OSError as delete_error:
            logger.warning(msg=f"Failed to delete {file_type} file {file}", exc_info=delete_error)
def delete_data_files(io: FileIO, manifests_to_delete: List[ManifestFile]) -> None:
    """Best-effort deletion of every data file referenced by the given manifests.

    Each distinct path is deleted at most once even when multiple manifest
    entries reference it. Failures are logged as warnings instead of raised.

    Args:
        io: The FileIO used to delete the object.
        manifests_to_delete: A list of manifest contains paths of data files to be deleted.
    """
    seen_paths: set[str] = set()
    for manifest in manifests_to_delete:
        for entry in manifest.fetch_manifest_entry(io, discard_deleted=False):
            path = entry.data_file.file_path
            if path in seen_paths:
                continue
            # Mark first so a failed delete is not retried for later duplicates.
            seen_paths.add(path)
            try:
                io.delete(path)
            except OSError as delete_error:
                logger.warning(msg=f"Failed to delete data file {path}", exc_info=delete_error)
def _import_catalog(name: str, catalog_impl: str, properties: Properties) -> Optional[Catalog]:
    """Dynamically load and instantiate a custom catalog class by its dotted path.

    Args:
        name: Catalog name, passed as the first constructor argument.
        catalog_impl: Full dotted path, e.g. ``my.module.CustomCatalog``.
        properties: Keyword properties forwarded to the constructor.

    Returns:
        The constructed catalog, or None when the module cannot be found.

    Raises:
        ValueError: If ``catalog_impl`` is not a dotted ``module.Class`` path.
    """
    try:
        parts = catalog_impl.split(".")
        if len(parts) < 2:
            raise ValueError(f"py-catalog-impl should be full path (module.CustomCatalog), got: {catalog_impl}")
        module = importlib.import_module(".".join(parts[:-1]))
        catalog_class = getattr(module, parts[-1])
        return catalog_class(name, **properties)
    except ModuleNotFoundError as exc:
        logger.warning(f"Could not initialize Catalog: {catalog_impl}", exc_info=exc)
        return None
@dataclass
class PropertiesUpdateSummary:
    """Summary of the outcome of a namespace property update."""

    # Keys that were present and were removed.
    removed: List[str]
    # Keys that were added or overwritten.
    updated: List[str]
    # Keys requested for removal that did not exist.
    missing: List[str]
class Catalog(ABC):
    """Base Catalog for table operations like - create, drop, load, list and others.

    The catalog table APIs accept a table identifier, which is fully classified table name. The identifier can be a string or
    tuple of strings. If the identifier is a string, it is split into a tuple on '.'. If it is a tuple, it is used as-is.

    The catalog namespace APIs follow a similar convention wherein they also accept a namespace identifier that can be a string
    or tuple of strings.

    Attributes:
        name (str): Name of the catalog.
        properties (Properties): Catalog properties.
    """

    name: str
    properties: Properties

    def __init__(self, name: str, **properties: str):
        self.name = name
        self.properties = properties

    @abstractmethod
    def create_table(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> Table:
        """Create a table.

        Args:
            identifier (str | Identifier): Table identifier.
            schema (Schema): Table's schema.
            location (str | None): Location for the table. Optional Argument.
            partition_spec (PartitionSpec): PartitionSpec for the table.
            sort_order (SortOrder): SortOrder for the table.
            properties (Properties): Table properties that can be a string based dictionary.

        Returns:
            Table: the created table instance.

        Raises:
            TableAlreadyExistsError: If a table with the name already exists.
        """

    @abstractmethod
    def create_table_transaction(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> CreateTableTransaction:
        """Create a CreateTableTransaction.

        Args:
            identifier (str | Identifier): Table identifier.
            schema (Schema): Table's schema.
            location (str | None): Location for the table. Optional Argument.
            partition_spec (PartitionSpec): PartitionSpec for the table.
            sort_order (SortOrder): SortOrder for the table.
            properties (Properties): Table properties that can be a string based dictionary.

        Returns:
            CreateTableTransaction: createTableTransaction instance.
        """

    def create_table_if_not_exists(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> Table:
        """Create a table if it does not exist.

        Args:
            identifier (str | Identifier): Table identifier.
            schema (Schema): Table's schema.
            location (str | None): Location for the table. Optional Argument.
            partition_spec (PartitionSpec): PartitionSpec for the table.
            sort_order (SortOrder): SortOrder for the table.
            properties (Properties): Table properties that can be a string based dictionary.

        Returns:
            Table: the created table instance if the table does not exist, else the existing
                table instance.
        """
        # EAFP: attempt the create and fall back to loading on a name clash.
        try:
            return self.create_table(identifier, schema, location, partition_spec, sort_order, properties)
        except TableAlreadyExistsError:
            return self.load_table(identifier)

    @abstractmethod
    def load_table(self, identifier: Union[str, Identifier]) -> Table:
        """Load the table's metadata and returns the table instance.

        You can also use this method to check for table existence using 'try catalog.table() except NoSuchTableError'.
        Note: This method doesn't scan data stored in the table.

        Args:
            identifier (str | Identifier): Table identifier.

        Returns:
            Table: the table instance with its metadata.

        Raises:
            NoSuchTableError: If a table with the name does not exist.
        """

    @abstractmethod
    def table_exists(self, identifier: Union[str, Identifier]) -> bool:
        """Check if a table exists.

        Args:
            identifier (str | Identifier): Table identifier.

        Returns:
            bool: True if the table exists, False otherwise.
        """

    @abstractmethod
    def view_exists(self, identifier: Union[str, Identifier]) -> bool:
        """Check if a view exists.

        Args:
            identifier (str | Identifier): View identifier.

        Returns:
            bool: True if the view exists, False otherwise.
        """

    @abstractmethod
    def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
        """Register a new table using existing metadata.

        Args:
            identifier Union[str, Identifier]: Table identifier for the table
            metadata_location str: The location to the metadata

        Returns:
            Table: The newly registered table

        Raises:
            TableAlreadyExistsError: If the table already exists
        """

    @abstractmethod
    def drop_table(self, identifier: Union[str, Identifier]) -> None:
        """Drop a table.

        Args:
            identifier (str | Identifier): Table identifier.

        Raises:
            NoSuchTableError: If a table with the name does not exist.
        """

    @abstractmethod
    def purge_table(self, identifier: Union[str, Identifier]) -> None:
        """Drop a table and purge all data and metadata files.

        Note: This method only logs warning rather than raise exception when encountering file deletion failure.

        Args:
            identifier (str | Identifier): Table identifier.

        Raises:
            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
        """

    @abstractmethod
    def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
        """Rename a fully classified table name.

        Args:
            from_identifier (str | Identifier): Existing table identifier.
            to_identifier (str | Identifier): New table identifier.

        Returns:
            Table: the updated table instance with its metadata.

        Raises:
            NoSuchTableError: If a table with the name does not exist.
        """

    @abstractmethod
    def commit_table(
        self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
    ) -> CommitTableResponse:
        """Commit updates to a table.

        Args:
            table (Table): The table to be updated.
            requirements: (Tuple[TableRequirement, ...]): Table requirements.
            updates: (Tuple[TableUpdate, ...]): Table updates.

        Returns:
            CommitTableResponse: The updated metadata.

        Raises:
            NoSuchTableError: If a table with the given identifier does not exist.
            CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
            CommitStateUnknownException: Failed due to an internal exception on the side of the catalog.
        """

    @abstractmethod
    def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
        """Create a namespace in the catalog.

        Args:
            namespace (str | Identifier): Namespace identifier.
            properties (Properties): A string dictionary of properties for the given namespace.

        Raises:
            NamespaceAlreadyExistsError: If a namespace with the given name already exists.
        """

    def create_namespace_if_not_exists(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
        """Create a namespace if it does not exist.

        Args:
            namespace (str | Identifier): Namespace identifier.
            properties (Properties): A string dictionary of properties for the given namespace.
        """
        try:
            self.create_namespace(namespace, properties)
        except NamespaceAlreadyExistsError:
            # Already present: treat as success (idempotent create).
            pass

    @abstractmethod
    def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
        """Drop a namespace.

        Args:
            namespace (str | Identifier): Namespace identifier.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
            NamespaceNotEmptyError: If the namespace is not empty.
        """

    @abstractmethod
    def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
        """List tables under the given namespace in the catalog.

        Args:
            namespace (str | Identifier): Namespace identifier to search.

        Returns:
            List[Identifier]: list of table identifiers.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
        """

    @abstractmethod
    def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
        """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.

        Args:
            namespace (str | Identifier): Namespace identifier to search.

        Returns:
            List[Identifier]: a List of namespace identifiers.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
        """

    @abstractmethod
    def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
        """List views under the given namespace in the catalog.

        Args:
            namespace (str | Identifier): Namespace identifier to search.

        Returns:
            List[Identifier]: list of view identifiers.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
        """

    @abstractmethod
    def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
        """Get properties for a namespace.

        Args:
            namespace (str | Identifier): Namespace identifier.

        Returns:
            Properties: Properties for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
        """

    @abstractmethod
    def update_namespace_properties(
        self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
    ) -> PropertiesUpdateSummary:
        """Remove provided property keys and updates properties for a namespace.

        Args:
            namespace (str | Identifier): Namespace identifier.
            removals (Set[str]): Set of property keys that need to be removed. Optional Argument.
            updates (Properties): Properties to be updated for the given namespace.

        Returns:
            PropertiesUpdateSummary: Summary of removed, updated and missing property keys.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
            ValueError: If removals and updates have overlapping keys.
        """

    @abstractmethod
    def drop_view(self, identifier: Union[str, Identifier]) -> None:
        """Drop a view.

        Args:
            identifier (str | Identifier): View identifier.

        Raises:
            NoSuchViewError: If a view with the given name does not exist.
        """

    @staticmethod
    def identifier_to_tuple(identifier: Union[str, Identifier]) -> Identifier:
        """Parse an identifier to a tuple.

        If the identifier is a string, it is split into a tuple on '.'. If it is a tuple, it is used as-is.

        Args:
            identifier (str | Identifier): an identifier, either a string or tuple of strings.

        Returns:
            Identifier: a tuple of strings.
        """
        return identifier if isinstance(identifier, tuple) else tuple(str.split(identifier, "."))

    @staticmethod
    def table_name_from(identifier: Union[str, Identifier]) -> str:
        """Extract table name from a table identifier.

        Args:
            identifier (str | Identifier): a table identifier.

        Returns:
            str: Table name.
        """
        return Catalog.identifier_to_tuple(identifier)[-1]

    @staticmethod
    def namespace_from(identifier: Union[str, Identifier]) -> Identifier:
        """Extract table namespace from a table identifier.

        Args:
            identifier (Union[str, Identifier]): a table identifier.

        Returns:
            Identifier: Namespace identifier.
        """
        return Catalog.identifier_to_tuple(identifier)[:-1]

    @staticmethod
    def namespace_to_string(
        identifier: Union[str, Identifier], err: Union[Type[ValueError], Type[NoSuchNamespaceError]] = ValueError
    ) -> str:
        """Transform a namespace identifier into a string.

        Args:
            identifier (Union[str, Identifier]): a namespace identifier.
            err (Union[Type[ValueError], Type[NoSuchNamespaceError]]): the error type to raise when identifier is empty.

        Returns:
            str: the namespace identifier joined on '.' with each segment stripped of whitespace.
        """
        tuple_identifier = Catalog.identifier_to_tuple(identifier)
        if len(tuple_identifier) < 1:
            raise err("Empty namespace identifier")
        # Check if any segment of the tuple is an empty string
        if any(segment.strip() == "" for segment in tuple_identifier):
            raise err("Namespace identifier contains an empty segment or a segment with only whitespace")
        return ".".join(segment.strip() for segment in tuple_identifier)

    @staticmethod
    def identifier_to_database(
        identifier: Union[str, Identifier], err: Union[Type[ValueError], Type[NoSuchNamespaceError]] = ValueError
    ) -> str:
        """Return the single database name from an identifier, raising `err` for hierarchical namespaces."""
        tuple_identifier = Catalog.identifier_to_tuple(identifier)
        if len(tuple_identifier) != 1:
            raise err(f"Invalid database, hierarchical namespaces are not supported: {identifier}")
        return tuple_identifier[0]

    @staticmethod
    def identifier_to_database_and_table(
        identifier: Union[str, Identifier],
        err: Union[Type[ValueError], Type[NoSuchTableError], Type[NoSuchNamespaceError]] = ValueError,
    ) -> Tuple[str, str]:
        """Return (database, table) from a two-part identifier, raising `err` otherwise."""
        tuple_identifier = Catalog.identifier_to_tuple(identifier)
        if len(tuple_identifier) != 2:
            raise err(f"Invalid path, hierarchical namespaces are not supported: {identifier}")
        return tuple_identifier[0], tuple_identifier[1]

    def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[str] = None) -> FileIO:
        """Load a FileIO; catalog-level properties are defaults that `properties` overrides."""
        return load_file_io({**self.properties, **properties}, location)

    @staticmethod
    def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
        """Return an Iceberg Schema, converting from a PyArrow schema when needed.

        Raises:
            ValueError: If the schema is neither an Iceberg Schema nor a PyArrow schema
                (including when PyArrow is not installed).
        """
        if isinstance(schema, Schema):
            return schema
        try:
            # PyArrow is an optional dependency, so it is imported lazily here.
            import pyarrow as pa

            from pyiceberg.io.pyarrow import _ConvertToIcebergWithoutIDs, visit_pyarrow

            downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
            if isinstance(schema, pa.Schema):
                schema: Schema = visit_pyarrow(  # type: ignore
                    schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
                )
                return schema
        except ModuleNotFoundError:
            # Fall through to the ValueError below when PyArrow is unavailable.
            pass
        raise ValueError(f"{type(schema)=}, but it must be pyiceberg.schema.Schema or pyarrow.Schema")

    @staticmethod
    def _delete_old_metadata(io: FileIO, base: TableMetadata, metadata: TableMetadata) -> None:
        """Delete oldest metadata if config is set to true."""
        delete_after_commit: bool = property_as_bool(
            metadata.properties,
            TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
            TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT,
        )
        if delete_after_commit:
            # Delete only the files dropped from the metadata log by this commit.
            removed_previous_metadata_files: set[str] = {log.metadata_file for log in base.metadata_log}
            current_metadata_files: set[str] = {log.metadata_file for log in metadata.metadata_log}
            removed_previous_metadata_files.difference_update(current_metadata_files)
            delete_files(io, removed_previous_metadata_files, METADATA)

    def __repr__(self) -> str:
        """Return the string representation of the Catalog class."""
        return f"{self.name} ({self.__class__})"
class MetastoreCatalog(Catalog, ABC):
    """Shared base for metastore-backed catalogs: staging, purging and metadata-file helpers."""

    def __init__(self, name: str, **properties: str):
        super().__init__(name, **properties)

    def create_table_transaction(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> CreateTableTransaction:
        """Start a transaction that will create the table when committed."""
        return CreateTableTransaction(
            self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties)
        )

    def table_exists(self, identifier: Union[str, Identifier]) -> bool:
        """Return True when the table can be loaded, False when it does not exist."""
        try:
            self.load_table(identifier)
            return True
        except NoSuchTableError:
            return False

    def purge_table(self, identifier: Union[str, Identifier]) -> None:
        """Drop the table, then best-effort delete its data, manifest and metadata files."""
        table = self.load_table(identifier)
        self.drop_table(identifier)
        io = load_file_io(self.properties, table.metadata_location)
        metadata = table.metadata
        manifest_lists_to_delete = set()
        manifests_to_delete: List[ManifestFile] = []
        # Collect manifests and manifest lists across all snapshots before deleting.
        for snapshot in metadata.snapshots:
            manifests_to_delete += snapshot.manifests(io)
            manifest_lists_to_delete.add(snapshot.manifest_list)
        manifest_paths_to_delete = {manifest.manifest_path for manifest in manifests_to_delete}
        prev_metadata_files = {log.metadata_file for log in metadata.metadata_log}
        # Delete from the leaves upward: data files, manifests, manifest lists,
        # previous metadata, then the current metadata file.
        delete_data_files(io, manifests_to_delete)
        delete_files(io, manifest_paths_to_delete, MANIFEST)
        delete_files(io, manifest_lists_to_delete, MANIFEST_LIST)
        delete_files(io, prev_metadata_files, PREVIOUS_METADATA)
        delete_files(io, {table.metadata_location}, METADATA)

    def _create_staged_table(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> StagedTable:
        """Create a table and return the table instance without committing the changes.

        Args:
            identifier (str | Identifier): Table identifier.
            schema (Schema): Table's schema.
            location (str | None): Location for the table. Optional Argument.
            partition_spec (PartitionSpec): PartitionSpec for the table.
            sort_order (SortOrder): SortOrder for the table.
            properties (Properties): Table properties that can be a string based dictionary.

        Returns:
            StagedTable: the created staged table instance.
        """
        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
        database_name, table_name = self.identifier_to_database_and_table(identifier)
        location = self._resolve_table_location(location, database_name, table_name)
        provider = load_location_provider(location, properties)
        metadata_location = provider.new_table_metadata_file_location()
        metadata = new_table_metadata(
            location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
        )
        io = self._load_file_io(properties=properties, location=metadata_location)
        return StagedTable(
            identifier=(database_name, table_name),
            metadata=metadata,
            metadata_location=metadata_location,
            io=io,
            catalog=self,
        )

    def _update_and_stage_table(
        self,
        current_table: Optional[Table],
        table_identifier: Identifier,
        requirements: Tuple[TableRequirement, ...],
        updates: Tuple[TableUpdate, ...],
    ) -> StagedTable:
        """Validate requirements, apply updates, and stage the resulting metadata.

        A ``current_table`` of None means the table is being created by this commit.
        """
        # Each requirement must hold against the current metadata (None on create).
        for requirement in requirements:
            requirement.validate(current_table.metadata if current_table else None)
        updated_metadata = update_table_metadata(
            base_metadata=current_table.metadata if current_table else self._empty_table_metadata(),
            updates=updates,
            enforce_validation=current_table is None,
            metadata_location=current_table.metadata_location if current_table else None,
        )
        # New tables start at metadata version 0; existing tables increment.
        new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
        provider = load_location_provider(updated_metadata.location, updated_metadata.properties)
        new_metadata_location = provider.new_table_metadata_file_location(new_metadata_version)
        return StagedTable(
            identifier=table_identifier,
            metadata=updated_metadata,
            metadata_location=new_metadata_location,
            io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location),
            catalog=self,
        )

    def _get_updated_props_and_update_summary(
        self, current_properties: Properties, removals: Optional[Set[str]], updates: Properties
    ) -> Tuple[PropertiesUpdateSummary, Properties]:
        """Apply removals then updates to a copy of the properties and summarize the outcome.

        Raises:
            ValueError: If removals and updates have overlapping keys.
        """
        self._check_for_overlap(updates=updates, removals=removals)
        updated_properties = dict(current_properties)
        removed: Set[str] = set()
        updated: Set[str] = set()
        if removals:
            for key in removals:
                if key in updated_properties:
                    updated_properties.pop(key)
                    removed.add(key)
        if updates:
            for key, value in updates.items():
                updated_properties[key] = value
                updated.add(key)
        # Keys requested for removal that were not present are reported as missing.
        expected_to_change = (removals or set()).difference(removed)
        properties_update_summary = PropertiesUpdateSummary(
            removed=list(removed or []), updated=list(updated or []), missing=list(expected_to_change)
        )
        return properties_update_summary, updated_properties

    def _resolve_table_location(self, location: Optional[str], database_name: str, table_name: str) -> str:
        """Return the given location (trailing slash stripped) or derive a default one."""
        if not location:
            return self._get_default_warehouse_location(database_name, table_name)
        return location.rstrip("/")

    def _get_default_warehouse_location(self, database_name: str, table_name: str) -> str:
        """Derive the table location from the namespace location or the warehouse path.

        Raises:
            ValueError: If neither a namespace location nor a warehouse path is configured.
        """
        database_properties = self.load_namespace_properties(database_name)
        # Prefer the namespace's own location over the catalog warehouse path.
        if database_location := database_properties.get(LOCATION):
            database_location = database_location.rstrip("/")
            return f"{database_location}/{table_name}"
        if warehouse_path := self.properties.get(WAREHOUSE_LOCATION):
            warehouse_path = warehouse_path.rstrip("/")
            return f"{warehouse_path}/{database_name}.db/{table_name}"
        raise ValueError("No default path is set, please specify a location when creating a table")

    @staticmethod
    def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) -> None:
        """Serialize the table metadata to the given path using the provided FileIO."""
        ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))

    @staticmethod
    def _parse_metadata_version(metadata_location: str) -> int:
        """Parse the version from the metadata location.

        The version is the first part of the file name, before the first dash.
        For example, the version of the metadata file
        `s3://bucket/db/tb/metadata/00001-6c97e413-d51b-4538-ac70-12fe2a85cb83.metadata.json`
        is 1.

        If the path does not comply with the pattern, the version is defaulted to be -1, ensuring
        that the next metadata file is treated as having version 0.

        Args:
            metadata_location (str): The location of the metadata file.

        Returns:
            int: The version of the metadata file. -1 if the file name does not have valid version string
        """
        file_name = metadata_location.split("/")[-1]
        if file_name_match := TABLE_METADATA_FILE_NAME_REGEX.fullmatch(file_name):
            try:
                # The regex only checks length/characters; confirm it is a real UUID.
                uuid.UUID(file_name_match.group(2))
            except ValueError:
                return -1
            return int(file_name_match.group(1))
        else:
            return -1

    @staticmethod
    def _check_for_overlap(removals: Optional[Set[str]], updates: Properties) -> None:
        """Raise ValueError when the same key appears in both removals and updates."""
        if updates and removals:
            overlap = set(removals) & set(updates.keys())
            if overlap:
                raise ValueError(f"Updates and deletes have an overlap: {overlap}")

    @staticmethod
    def _empty_table_metadata() -> TableMetadata:
        """Return an empty TableMetadata instance.

        It is used to build a TableMetadata from a sequence of initial TableUpdates.
        It is a V1 TableMetadata because there will be a UpgradeFormatVersionUpdate in
        initial changes to bump the metadata to the target version.

        Returns:
            TableMetadata: An empty TableMetadata instance.
        """
        return TableMetadataV1.model_construct(last_column_id=-1, schema=Schema())