pyiceberg/catalog/sql.py (490 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from typing import ( TYPE_CHECKING, List, Optional, Set, Tuple, Union, ) from sqlalchemy import ( String, create_engine, delete, insert, select, union, update, ) from sqlalchemy.exc import IntegrityError, NoResultFound, OperationalError, ProgrammingError from sqlalchemy.orm import ( DeclarativeBase, Mapped, MappedAsDataclass, Session, mapped_column, ) from pyiceberg.catalog import ( METADATA_LOCATION, Catalog, MetastoreCatalog, PropertiesUpdateSummary, ) from pyiceberg.exceptions import ( CommitFailedException, NamespaceAlreadyExistsError, NamespaceNotEmptyError, NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError, TableAlreadyExistsError, ) from pyiceberg.io import load_file_io from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.serializers import FromInputFile from pyiceberg.table import CommitTableResponse, Table from pyiceberg.table.locations import load_location_provider from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.table.update import ( TableRequirement, TableUpdate, ) from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties from pyiceberg.types import strtobool if TYPE_CHECKING: import pyarrow as pa DEFAULT_ECHO_VALUE = "false" DEFAULT_POOL_PRE_PING_VALUE = "false" DEFAULT_INIT_CATALOG_TABLES = "true" class SqlCatalogBaseTable(MappedAsDataclass, DeclarativeBase): pass class IcebergTables(SqlCatalogBaseTable): __tablename__ = "iceberg_tables" catalog_name: Mapped[str] = mapped_column(String(255), nullable=False, primary_key=True) table_namespace: Mapped[str] = mapped_column(String(255), nullable=False, primary_key=True) table_name: Mapped[str] = mapped_column(String(255), nullable=False, primary_key=True) metadata_location: Mapped[Optional[str]] = mapped_column(String(1000), nullable=True) previous_metadata_location: Mapped[Optional[str]] = mapped_column(String(1000), nullable=True) class IcebergNamespaceProperties(SqlCatalogBaseTable): __tablename__ = "iceberg_namespace_properties" # Catalog minimum Namespace Properties NAMESPACE_MINIMAL_PROPERTIES = {"exists": "true"} catalog_name: Mapped[str] = mapped_column(String(255), nullable=False, primary_key=True) namespace: Mapped[str] = mapped_column(String(255), nullable=False, primary_key=True) property_key: Mapped[str] = mapped_column(String(255), nullable=False, primary_key=True) property_value: Mapped[str] = mapped_column(String(1000), nullable=False) class SqlCatalog(MetastoreCatalog): """Implementation of a SQL based catalog. In the `JDBCCatalog` implementation, a `Namespace` is composed of a list of strings separated by dots: `'ns1.ns2.ns3'`. And you can have as many levels as you want, but you need at least one. The `SqlCatalog` honors the same convention. In the `JDBCCatalog` implementation, a `TableIdentifier` is composed of an optional `Namespace` and a table name. When a `Namespace` is present, the full name will be `'ns1.ns2.ns3.table'`. A valid `TableIdentifier` could be `'name'` (no namespace). The `SqlCatalog` has a different convention where a `TableIdentifier` requires a `Namespace`. """ def __init__(self, name: str, **properties: str): super().__init__(name, **properties) if not (uri_prop := self.properties.get("uri")): raise NoSuchPropertyException("SQL connection URI is required") echo_str = str(self.properties.get("echo", DEFAULT_ECHO_VALUE)).lower() echo = strtobool(echo_str) if echo_str != "debug" else "debug" pool_pre_ping = strtobool(self.properties.get("pool_pre_ping", DEFAULT_POOL_PRE_PING_VALUE)) init_catalog_tables = strtobool(self.properties.get("init_catalog_tables", DEFAULT_INIT_CATALOG_TABLES)) self.engine = create_engine(uri_prop, echo=echo, pool_pre_ping=pool_pre_ping) if init_catalog_tables: self._ensure_tables_exist() def _ensure_tables_exist(self) -> None: with Session(self.engine) as session: for table in [IcebergTables, IcebergNamespaceProperties]: stmt = select(1).select_from(table) try: session.scalar(stmt) except ( OperationalError, ProgrammingError, ): # sqlalchemy returns OperationalError in case of sqlite and ProgrammingError with postgres. self.create_tables() return def create_tables(self) -> None: SqlCatalogBaseTable.metadata.create_all(self.engine) def destroy_tables(self) -> None: SqlCatalogBaseTable.metadata.drop_all(self.engine) def _convert_orm_to_iceberg(self, orm_table: IcebergTables) -> Table: # Check for expected properties. if not (metadata_location := orm_table.metadata_location): raise NoSuchTableError(f"Table property {METADATA_LOCATION} is missing") if not (table_namespace := orm_table.table_namespace): raise NoSuchTableError(f"Table property {IcebergTables.table_namespace} is missing") if not (table_name := orm_table.table_name): raise NoSuchTableError(f"Table property {IcebergTables.table_name} is missing") io = load_file_io(properties=self.properties, location=metadata_location) file = io.new_input(metadata_location) metadata = FromInputFile.table_metadata(file) return Table( identifier=Catalog.identifier_to_tuple(table_namespace) + (table_name,), metadata=metadata, metadata_location=metadata_location, io=self._load_file_io(metadata.properties, metadata_location), catalog=self, ) def create_table( self, identifier: Union[str, Identifier], schema: Union[Schema, "pa.Schema"], location: Optional[str] = None, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, ) -> Table: """ Create an Iceberg table. Args: identifier: Table identifier. schema: Table's schema. location: Location for the table. Optional Argument. partition_spec: PartitionSpec for the table. sort_order: SortOrder for the table. properties: Table properties that can be a string based dictionary. Returns: Table: the created table instance. Raises: AlreadyExistsError: If a table with the name already exists. ValueError: If the identifier is invalid, or no path is given to store metadata. """ schema: Schema = self._convert_schema_if_needed(schema) # type: ignore namespace_identifier = Catalog.namespace_from(identifier) table_name = Catalog.table_name_from(identifier) if not self._namespace_exists(namespace_identifier): raise NoSuchNamespaceError(f"Namespace does not exist: {namespace_identifier}") namespace = Catalog.namespace_to_string(namespace_identifier) location = self._resolve_table_location(location, namespace, table_name) location_provider = load_location_provider(table_location=location, table_properties=properties) metadata_location = location_provider.new_table_metadata_file_location() metadata = new_table_metadata( location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties ) io = load_file_io(properties=self.properties, location=metadata_location) self._write_metadata(metadata, io, metadata_location) with Session(self.engine) as session: try: session.add( IcebergTables( catalog_name=self.name, table_namespace=namespace, table_name=table_name, metadata_location=metadata_location, previous_metadata_location=None, ) ) session.commit() except IntegrityError as e: raise TableAlreadyExistsError(f"Table {namespace}.{table_name} already exists") from e return self.load_table(identifier=identifier) def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: """Register a new table using existing metadata. Args: identifier Union[str, Identifier]: Table identifier for the table metadata_location str: The location to the metadata Returns: Table: The newly registered table Raises: TableAlreadyExistsError: If the table already exists NoSuchNamespaceError: If namespace does not exist """ namespace_tuple = Catalog.namespace_from(identifier) namespace = Catalog.namespace_to_string(namespace_tuple) table_name = Catalog.table_name_from(identifier) if not self._namespace_exists(namespace): raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") with Session(self.engine) as session: try: session.add( IcebergTables( catalog_name=self.name, table_namespace=namespace, table_name=table_name, metadata_location=metadata_location, previous_metadata_location=None, ) ) session.commit() except IntegrityError as e: raise TableAlreadyExistsError(f"Table {namespace}.{table_name} already exists") from e return self.load_table(identifier=identifier) def load_table(self, identifier: Union[str, Identifier]) -> Table: """Load the table's metadata and return the table instance. You can also use this method to check for table existence using 'try catalog.table() except NoSuchTableError'. Note: This method doesn't scan data stored in the table. Args: identifier (str | Identifier): Table identifier. Returns: Table: the table instance with its metadata. Raises: NoSuchTableError: If a table with the name does not exist. """ namespace_tuple = Catalog.namespace_from(identifier) namespace = Catalog.namespace_to_string(namespace_tuple) table_name = Catalog.table_name_from(identifier) with Session(self.engine) as session: stmt = select(IcebergTables).where( IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == namespace, IcebergTables.table_name == table_name, ) result = session.scalar(stmt) if result: return self._convert_orm_to_iceberg(result) raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}") def drop_table(self, identifier: Union[str, Identifier]) -> None: """Drop a table. Args: identifier (str | Identifier): Table identifier. Raises: NoSuchTableError: If a table with the name does not exist. """ namespace_tuple = Catalog.namespace_from(identifier) namespace = Catalog.namespace_to_string(namespace_tuple) table_name = Catalog.table_name_from(identifier) with Session(self.engine) as session: if self.engine.dialect.supports_sane_rowcount: res = session.execute( delete(IcebergTables).where( IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == namespace, IcebergTables.table_name == table_name, ) ) if res.rowcount < 1: raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}") else: try: tbl = ( session.query(IcebergTables) .with_for_update(of=IcebergTables) .filter( IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == namespace, IcebergTables.table_name == table_name, ) .one() ) session.delete(tbl) except NoResultFound as e: raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}") from e session.commit() def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: """Rename a fully classified table name. Args: from_identifier (str | Identifier): Existing table identifier. to_identifier (str | Identifier): New table identifier. Returns: Table: the updated table instance with its metadata. Raises: NoSuchTableError: If a table with the name does not exist. TableAlreadyExistsError: If a table with the new name already exist. NoSuchNamespaceError: If the target namespace does not exist. """ from_namespace_tuple = Catalog.namespace_from(from_identifier) from_namespace = Catalog.namespace_to_string(from_namespace_tuple) from_table_name = Catalog.table_name_from(from_identifier) to_namespace_tuple = Catalog.namespace_from(to_identifier) to_namespace = Catalog.namespace_to_string(to_namespace_tuple) to_table_name = Catalog.table_name_from(to_identifier) if not self._namespace_exists(to_namespace): raise NoSuchNamespaceError(f"Namespace does not exist: {to_namespace}") with Session(self.engine) as session: try: if self.engine.dialect.supports_sane_rowcount: stmt = ( update(IcebergTables) .where( IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == from_namespace, IcebergTables.table_name == from_table_name, ) .values(table_namespace=to_namespace, table_name=to_table_name) ) result = session.execute(stmt) if result.rowcount < 1: raise NoSuchTableError(f"Table does not exist: {from_table_name}") else: try: tbl = ( session.query(IcebergTables) .with_for_update(of=IcebergTables) .filter( IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == from_namespace, IcebergTables.table_name == from_table_name, ) .one() ) tbl.table_namespace = to_namespace tbl.table_name = to_table_name except NoResultFound as e: raise NoSuchTableError(f"Table does not exist: {from_table_name}") from e session.commit() except IntegrityError as e: raise TableAlreadyExistsError(f"Table {to_namespace}.{to_table_name} already exists") from e return self.load_table(to_identifier) def commit_table( self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] ) -> CommitTableResponse: """Commit updates to a table. Args: table (Table): The table to be updated. requirements: (Tuple[TableRequirement, ...]): Table requirements. updates: (Tuple[TableUpdate, ...]): Table updates. Returns: CommitTableResponse: The updated metadata. Raises: NoSuchTableError: If a table with the given identifier does not exist. CommitFailedException: Requirement not met, or a conflict with a concurrent commit. """ table_identifier = table.name() namespace_tuple = Catalog.namespace_from(table_identifier) namespace = Catalog.namespace_to_string(namespace_tuple) table_name = Catalog.table_name_from(table_identifier) current_table: Optional[Table] try: current_table = self.load_table(table_identifier) except NoSuchTableError: current_table = None updated_staged_table = self._update_and_stage_table(current_table, table.name(), requirements, updates) if current_table and updated_staged_table.metadata == current_table.metadata: # no changes, do nothing return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) self._write_metadata( metadata=updated_staged_table.metadata, io=updated_staged_table.io, metadata_path=updated_staged_table.metadata_location, ) with Session(self.engine) as session: if current_table: # table exists, update it if self.engine.dialect.supports_sane_rowcount: stmt = ( update(IcebergTables) .where( IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == namespace, IcebergTables.table_name == table_name, IcebergTables.metadata_location == current_table.metadata_location, ) .values( metadata_location=updated_staged_table.metadata_location, previous_metadata_location=current_table.metadata_location, ) ) result = session.execute(stmt) if result.rowcount < 1: raise CommitFailedException(f"Table has been updated by another process: {namespace}.{table_name}") else: try: tbl = ( session.query(IcebergTables) .with_for_update(of=IcebergTables) .filter( IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == namespace, IcebergTables.table_name == table_name, IcebergTables.metadata_location == current_table.metadata_location, ) .one() ) tbl.metadata_location = updated_staged_table.metadata_location tbl.previous_metadata_location = current_table.metadata_location except NoResultFound as e: raise CommitFailedException(f"Table has been updated by another process: {namespace}.{table_name}") from e session.commit() else: # table does not exist, create it try: session.add( IcebergTables( catalog_name=self.name, table_namespace=namespace, table_name=table_name, metadata_location=updated_staged_table.metadata_location, previous_metadata_location=None, ) ) session.commit() except IntegrityError as e: raise TableAlreadyExistsError(f"Table {namespace}.{table_name} already exists") from e return CommitTableResponse( metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location ) def _namespace_exists(self, identifier: Union[str, Identifier]) -> bool: namespace_tuple = Catalog.identifier_to_tuple(identifier) namespace = Catalog.namespace_to_string(namespace_tuple, NoSuchNamespaceError) namespace_starts_with = namespace.replace("!", "!!").replace("_", "!_").replace("%", "!%") + ".%" with Session(self.engine) as session: stmt = ( select(IcebergTables) .where( IcebergTables.catalog_name == self.name, (IcebergTables.table_namespace == namespace) | (IcebergTables.table_namespace.like(namespace_starts_with, escape="!")), ) .limit(1) ) result = session.execute(stmt).all() if result: return True stmt = ( select(IcebergNamespaceProperties) .where( IcebergNamespaceProperties.catalog_name == self.name, (IcebergNamespaceProperties.namespace == namespace) | (IcebergNamespaceProperties.namespace.like(namespace_starts_with, escape="!")), ) .limit(1) ) result = session.execute(stmt).all() if result: return True return False def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: """Create a namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier. properties (Properties): A string dictionary of properties for the given namespace. Raises: NamespaceAlreadyExistsError: If a namespace with the given name already exists. """ if self._namespace_exists(namespace): raise NamespaceAlreadyExistsError(f"Namespace {namespace} already exists") if not properties: properties = IcebergNamespaceProperties.NAMESPACE_MINIMAL_PROPERTIES create_properties = properties if properties else IcebergNamespaceProperties.NAMESPACE_MINIMAL_PROPERTIES with Session(self.engine) as session: for key, value in create_properties.items(): session.add( IcebergNamespaceProperties( catalog_name=self.name, namespace=Catalog.namespace_to_string(namespace, NoSuchNamespaceError), property_key=key, property_value=value, ) ) session.commit() def drop_namespace(self, namespace: Union[str, Identifier]) -> None: """Drop a namespace. Args: namespace (str | Identifier): Namespace identifier. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. NamespaceNotEmptyError: If the namespace is not empty. """ if not self._namespace_exists(namespace): raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") namespace_str = Catalog.namespace_to_string(namespace) if tables := self.list_tables(namespace): raise NamespaceNotEmptyError(f"Namespace {namespace_str} is not empty. {len(tables)} tables exist.") with Session(self.engine) as session: session.execute( delete(IcebergNamespaceProperties).where( IcebergNamespaceProperties.catalog_name == self.name, IcebergNamespaceProperties.namespace == namespace_str, ) ) session.commit() def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: """List tables under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: List[Identifier]: list of table identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. """ if namespace and not self._namespace_exists(namespace): raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") namespace = Catalog.namespace_to_string(namespace) stmt = select(IcebergTables).where(IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == namespace) with Session(self.engine) as session: result = session.scalars(stmt) return [(Catalog.identifier_to_tuple(table.table_namespace) + (table.table_name,)) for table in result] def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: List[Identifier]: a List of namespace identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. """ if namespace and not self._namespace_exists(namespace): raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") table_stmt = select(IcebergTables.table_namespace).where(IcebergTables.catalog_name == self.name) namespace_stmt = select(IcebergNamespaceProperties.namespace).where(IcebergNamespaceProperties.catalog_name == self.name) if namespace: namespace_like = Catalog.namespace_to_string(namespace, NoSuchNamespaceError) + "%" table_stmt = table_stmt.where(IcebergTables.table_namespace.like(namespace_like)) namespace_stmt = namespace_stmt.where(IcebergNamespaceProperties.namespace.like(namespace_like)) stmt = union( table_stmt, namespace_stmt, ) with Session(self.engine) as session: namespace_tuple = Catalog.identifier_to_tuple(namespace) sub_namespaces_level_length = len(namespace_tuple) + 1 namespaces = list( { # only get distinct namespaces ns[:sub_namespaces_level_length] # truncate to the required level for ns in {Catalog.identifier_to_tuple(ns) for ns in session.execute(stmt).scalars()} if len(ns) >= sub_namespaces_level_length # only get sub namespaces/children and ns[: sub_namespaces_level_length - 1] == namespace_tuple # exclude fuzzy matches when `namespace` contains `%` or `_` } ) return namespaces def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: """Get properties for a namespace. Args: namespace (str | Identifier): Namespace identifier. Returns: Properties: Properties for the given namespace. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. """ namespace_str = Catalog.namespace_to_string(namespace) if not self._namespace_exists(namespace): raise NoSuchNamespaceError(f"Namespace {namespace_str} does not exists") stmt = select(IcebergNamespaceProperties).where( IcebergNamespaceProperties.catalog_name == self.name, IcebergNamespaceProperties.namespace == namespace_str ) with Session(self.engine) as session: result = session.scalars(stmt) return {props.property_key: props.property_value for props in result} def update_namespace_properties( self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT ) -> PropertiesUpdateSummary: """Remove provided property keys and update properties for a namespace. Args: namespace (str | Identifier): Namespace identifier. removals (Set[str]): Set of property keys that need to be removed. Optional Argument. updates (Properties): Properties to be updated for the given namespace. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. ValueError: If removals and updates have overlapping keys. """ namespace_str = Catalog.namespace_to_string(namespace) if not self._namespace_exists(namespace): raise NoSuchNamespaceError(f"Namespace {namespace_str} does not exists") current_properties = self.load_namespace_properties(namespace=namespace) properties_update_summary = self._get_updated_props_and_update_summary( current_properties=current_properties, removals=removals, updates=updates )[0] with Session(self.engine) as session: if removals: delete_stmt = delete(IcebergNamespaceProperties).where( IcebergNamespaceProperties.catalog_name == self.name, IcebergNamespaceProperties.namespace == namespace_str, IcebergNamespaceProperties.property_key.in_(removals), ) session.execute(delete_stmt) if updates: # SQLAlchemy does not (yet) support engine agnostic UPSERT # https://docs.sqlalchemy.org/en/20/orm/queryguide/dml.html#orm-upsert-statements # This is not a problem since it runs in a single transaction delete_stmt = delete(IcebergNamespaceProperties).where( IcebergNamespaceProperties.catalog_name == self.name, IcebergNamespaceProperties.namespace == namespace_str, IcebergNamespaceProperties.property_key.in_(set(updates.keys())), ) session.execute(delete_stmt) insert_stmt_values = [ { IcebergNamespaceProperties.catalog_name: self.name, IcebergNamespaceProperties.namespace: namespace_str, IcebergNamespaceProperties.property_key: property_key, IcebergNamespaceProperties.property_value: property_value, } for property_key, property_value in updates.items() ] insert_stmt = insert(IcebergNamespaceProperties).values(insert_stmt_values) session.execute(insert_stmt) session.commit() return properties_update_summary def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: raise NotImplementedError def view_exists(self, identifier: Union[str, Identifier]) -> bool: raise NotImplementedError def drop_view(self, identifier: Union[str, Identifier]) -> None: raise NotImplementedError