pyiceberg/io/__init__.py (197 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Base FileIO classes for implementing reading and writing table files.
The FileIO abstraction includes a subset of full filesystem implementations. Specifically,
Iceberg needs to read or write a file at a given location (as a seekable stream), as well
as check if a file exists. An implementation of the FileIO abstract base class is responsible
for returning an InputFile instance, an OutputFile instance, and deleting a file given
its location.
"""
from __future__ import annotations
import importlib
import logging
import os
import warnings
from abc import ABC, abstractmethod
from io import SEEK_SET
from types import TracebackType
from typing import (
Dict,
List,
Optional,
Protocol,
Tuple,
Type,
Union,
runtime_checkable,
)
from urllib.parse import urlparse
from pyiceberg.typedef import EMPTY_DICT, Properties
logger = logging.getLogger(__name__)
AWS_REGION = "client.region"
AWS_ACCESS_KEY_ID = "client.access-key-id"
AWS_SECRET_ACCESS_KEY = "client.secret-access-key"
AWS_SESSION_TOKEN = "client.session-token"
AWS_ROLE_ARN = "client.role-arn"
AWS_ROLE_SESSION_NAME = "client.role-session-name"
S3_ENDPOINT = "s3.endpoint"
S3_ACCESS_KEY_ID = "s3.access-key-id"
S3_SECRET_ACCESS_KEY = "s3.secret-access-key"
S3_SESSION_TOKEN = "s3.session-token"
S3_REGION = "s3.region"
S3_RESOLVE_REGION = "s3.resolve-region"
S3_PROXY_URI = "s3.proxy-uri"
S3_CONNECT_TIMEOUT = "s3.connect-timeout"
S3_REQUEST_TIMEOUT = "s3.request-timeout"
S3_SIGNER = "s3.signer"
S3_SIGNER_URI = "s3.signer.uri"
S3_SIGNER_ENDPOINT = "s3.signer.endpoint"
S3_SIGNER_ENDPOINT_DEFAULT = "v1/aws/s3/sign"
S3_ROLE_ARN = "s3.role-arn"
S3_ROLE_SESSION_NAME = "s3.role-session-name"
S3_FORCE_VIRTUAL_ADDRESSING = "s3.force-virtual-addressing"
HDFS_HOST = "hdfs.host"
HDFS_PORT = "hdfs.port"
HDFS_USER = "hdfs.user"
HDFS_KERB_TICKET = "hdfs.kerberos_ticket"
ADLS_CONNECTION_STRING = "adls.connection-string"
ADLS_ACCOUNT_NAME = "adls.account-name"
ADLS_ACCOUNT_KEY = "adls.account-key"
ADLS_SAS_TOKEN = "adls.sas-token"
ADLS_TENANT_ID = "adls.tenant-id"
ADLS_CLIENT_ID = "adls.client-id"
ADLS_ClIENT_SECRET = "adls.client-secret"
GCS_TOKEN = "gcs.oauth2.token"
GCS_TOKEN_EXPIRES_AT_MS = "gcs.oauth2.token-expires-at"
GCS_PROJECT_ID = "gcs.project-id"
GCS_ACCESS = "gcs.access"
GCS_CONSISTENCY = "gcs.consistency"
GCS_CACHE_TIMEOUT = "gcs.cache-timeout"
GCS_REQUESTER_PAYS = "gcs.requester-pays"
GCS_SESSION_KWARGS = "gcs.session-kwargs"
GCS_SERVICE_HOST = "gcs.service.host"
GCS_DEFAULT_LOCATION = "gcs.default-bucket-location"
GCS_VERSION_AWARE = "gcs.version-aware"
PYARROW_USE_LARGE_TYPES_ON_READ = "pyarrow.use-large-types-on-read"
@runtime_checkable
class InputStream(Protocol):
"""A protocol for the file-like object returned by InputFile.open(...).
This outlines the minimally required methods for a seekable input stream returned from an InputFile
implementation's `open(...)` method. These methods are a subset of IOBase/RawIOBase.
"""
@abstractmethod
def read(self, size: int = 0) -> bytes: ...
@abstractmethod
def seek(self, offset: int, whence: int = SEEK_SET) -> int: ...
@abstractmethod
def tell(self) -> int: ...
@abstractmethod
def close(self) -> None: ...
def __enter__(self) -> InputStream:
"""Provide setup when opening an InputStream using a 'with' statement."""
@abstractmethod
def __exit__(
self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
) -> None:
"""Perform cleanup when exiting the scope of a 'with' statement."""
@runtime_checkable
class OutputStream(Protocol): # pragma: no cover
"""A protocol for the file-like object returned by OutputFile.create(...).
This outlines the minimally required methods for a writable output stream returned from an OutputFile
implementation's `create(...)` method. These methods are a subset of IOBase/RawIOBase.
"""
@abstractmethod
def write(self, b: bytes) -> int: ...
@abstractmethod
def close(self) -> None: ...
@abstractmethod
def __enter__(self) -> OutputStream:
"""Provide setup when opening an OutputStream using a 'with' statement."""
@abstractmethod
def __exit__(
self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
) -> None:
"""Perform cleanup when exiting the scope of a 'with' statement."""
class InputFile(ABC):
"""A base class for InputFile implementations.
Args:
location (str): A URI or a path to a local file.
Attributes:
location (str): The URI or path to a local file for an InputFile instance.
exists (bool): Whether the file exists or not.
"""
def __init__(self, location: str):
self._location = location
@abstractmethod
def __len__(self) -> int:
"""Return the total length of the file, in bytes."""
@property
def location(self) -> str:
"""The fully-qualified location of the input file."""
return self._location
@abstractmethod
def exists(self) -> bool:
"""Check whether the location exists.
Raises:
PermissionError: If the file at self.location cannot be accessed due to a permission error.
"""
@abstractmethod
def open(self, seekable: bool = True) -> InputStream:
"""Return an object that matches the InputStream protocol.
Args:
seekable: If the stream should support seek, or if it is consumed sequential.
Returns:
InputStream: An object that matches the InputStream protocol.
Raises:
PermissionError: If the file at self.location cannot be accessed due to a permission error.
FileNotFoundError: If the file at self.location does not exist.
"""
class OutputFile(ABC):
"""A base class for OutputFile implementations.
Args:
location (str): A URI or a path to a local file.
Attributes:
location (str): The URI or path to a local file for an OutputFile instance.
exists (bool): Whether the file exists or not.
"""
def __init__(self, location: str):
self._location = location
@abstractmethod
def __len__(self) -> int:
"""Return the total length of the file, in bytes."""
@property
def location(self) -> str:
"""The fully-qualified location of the output file."""
return self._location
@abstractmethod
def exists(self) -> bool:
"""Check whether the location exists.
Raises:
PermissionError: If the file at self.location cannot be accessed due to a permission error.
"""
@abstractmethod
def to_input_file(self) -> InputFile:
"""Return an InputFile for the location of this output file."""
@abstractmethod
def create(self, overwrite: bool = False) -> OutputStream:
"""Return an object that matches the OutputStream protocol.
Args:
overwrite (bool): If the file already exists at `self.location`
and `overwrite` is False a FileExistsError should be raised.
Returns:
OutputStream: An object that matches the OutputStream protocol.
Raises:
PermissionError: If the file at self.location cannot be accessed due to a permission error.
FileExistsError: If the file at self.location already exists and `overwrite=False`.
"""
class FileIO(ABC):
"""A base class for FileIO implementations."""
properties: Properties
def __init__(self, properties: Properties = EMPTY_DICT):
self.properties = properties
@abstractmethod
def new_input(self, location: str) -> InputFile:
"""Get an InputFile instance to read bytes from the file at the given location.
Args:
location (str): A URI or a path to a local file.
"""
@abstractmethod
def new_output(self, location: str) -> OutputFile:
"""Get an OutputFile instance to write bytes to the file at the given location.
Args:
location (str): A URI or a path to a local file.
"""
@abstractmethod
def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
"""Delete the file at the given path.
Args:
location (Union[str, InputFile, OutputFile]): A URI or a path to a local file--if an InputFile instance or
an OutputFile instance is provided, the location attribute for that instance is used as the URI to delete.
Raises:
PermissionError: If the file at location cannot be accessed due to a permission error.
FileNotFoundError: When the file at the provided location does not exist.
"""
LOCATION = "location"
WAREHOUSE = "warehouse"
ARROW_FILE_IO = "pyiceberg.io.pyarrow.PyArrowFileIO"
FSSPEC_FILE_IO = "pyiceberg.io.fsspec.FsspecFileIO"
# Mappings from the Java FileIO impl to a Python one. The list is ordered by preference.
# If an implementation isn't installed, it will fall back to the next one.
SCHEMA_TO_FILE_IO: Dict[str, List[str]] = {
"s3": [ARROW_FILE_IO, FSSPEC_FILE_IO],
"s3a": [ARROW_FILE_IO, FSSPEC_FILE_IO],
"s3n": [ARROW_FILE_IO, FSSPEC_FILE_IO],
"oss": [ARROW_FILE_IO],
"gs": [ARROW_FILE_IO],
"file": [ARROW_FILE_IO, FSSPEC_FILE_IO],
"hdfs": [ARROW_FILE_IO],
"viewfs": [ARROW_FILE_IO],
"abfs": [FSSPEC_FILE_IO],
"abfss": [FSSPEC_FILE_IO],
}
def _import_file_io(io_impl: str, properties: Properties) -> Optional[FileIO]:
try:
path_parts = io_impl.split(".")
if len(path_parts) < 2:
raise ValueError(f"py-io-impl should be full path (module.CustomFileIO), got: {io_impl}")
module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]
module = importlib.import_module(module_name)
class_ = getattr(module, class_name)
return class_(properties)
except ModuleNotFoundError as exc:
logger.warning(f"Could not initialize FileIO: {io_impl}", exc_info=exc)
return None
PY_IO_IMPL = "py-io-impl"
def _infer_file_io_from_scheme(path: str, properties: Properties) -> Optional[FileIO]:
parsed_url = urlparse(path)
if parsed_url.scheme:
if file_ios := SCHEMA_TO_FILE_IO.get(parsed_url.scheme):
for file_io_path in file_ios:
if file_io := _import_file_io(file_io_path, properties):
return file_io
else:
warnings.warn(f"No preferred file implementation for scheme: {parsed_url.scheme}")
return None
def load_file_io(properties: Properties = EMPTY_DICT, location: Optional[str] = None) -> FileIO:
# First look for the py-io-impl property to directly load the class
if io_impl := properties.get(PY_IO_IMPL):
if file_io := _import_file_io(io_impl, properties):
logger.info("Loaded FileIO: %s", io_impl)
return file_io
else:
raise ValueError(f"Could not initialize FileIO: {io_impl}")
# Check the table location
if location:
if file_io := _infer_file_io_from_scheme(location, properties):
return file_io
# Look at the schema of the warehouse
if warehouse_location := properties.get(WAREHOUSE):
if file_io := _infer_file_io_from_scheme(warehouse_location, properties):
return file_io
try:
# Default to PyArrow
logger.info("Defaulting to PyArrow FileIO")
from pyiceberg.io.pyarrow import PyArrowFileIO
return PyArrowFileIO(properties)
except ModuleNotFoundError as e:
raise ModuleNotFoundError(
'Could not load a FileIO, please consider installing one: pip3 install "pyiceberg[pyarrow]", for more options refer to the docs.'
) from e
def _parse_location(location: str) -> Tuple[str, str, str]:
"""Return the path without the scheme."""
uri = urlparse(location)
if not uri.scheme:
return "file", uri.netloc, os.path.abspath(location)
elif uri.scheme in ("hdfs", "viewfs"):
return uri.scheme, uri.netloc, uri.path
else:
return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"