in pyiceberg/table/__init__.py [0:0]
def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
    """
    Shorthand for overwriting existing partitions with a PyArrow table.

    The function detects partition values in the provided arrow table using the current
    partition spec, and deletes existing partitions matching these values. Finally, the
    data in the table is appended to the table.

    If the provided table has no rows, the call returns after validation without
    deleting or appending anything.

    Args:
        df: The Arrow dataframe that will be used to overwrite the table
        snapshot_properties: Custom properties to be added to the snapshot summary

    Raises:
        ModuleNotFoundError: If PyArrow is not installed.
        ValueError: If `df` is not a PyArrow table, if the current partition spec is
            unpartitioned, or if any field of the current partition spec uses a
            transform other than the identity transform.
    """
    try:
        import pyarrow as pa
    except ModuleNotFoundError as e:
        raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

    from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

    if not isinstance(df, pa.Table):
        raise ValueError(f"Expected PyArrow table, got: {df}")

    # Fetch the partition spec once; both checks below inspect the same spec.
    spec = self.table_metadata.spec()
    if spec.is_unpartitioned():
        raise ValueError("Cannot apply dynamic overwrite on an unpartitioned table.")

    for field in spec.fields:
        if not isinstance(field.transform, IdentityTransform):
            raise ValueError(
                f"For now dynamic overwrite does not support a table with non-identity-transform field in the latest partition spec: {field}"
            )

    downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
    # Any incompatibility is reported by the helper itself (presumably by raising — confirm).
    _check_pyarrow_schema_compatible(
        self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
    )

    # If dataframe does not have data, there is no need to overwrite
    if df.shape[0] == 0:
        return

    # The same UUID is used as the write UUID of the data files and as the commit UUID
    # of the append snapshot created below.
    append_snapshot_commit_uuid = uuid.uuid4()

    # Write with `self.table_metadata`, i.e. the very metadata that was validated above.
    # NOTE(review): this previously passed `self._table.metadata`, which may differ from
    # `self.table_metadata` when changes are staged on this object — confirm.
    data_files: List[DataFile] = list(
        _dataframe_to_data_files(
            table_metadata=self.table_metadata, write_uuid=append_snapshot_commit_uuid, df=df, io=self._table.io
        )
    )

    # Delete exactly the partitions that the new data files touch, then append the files.
    partitions_to_overwrite = {data_file.partition for data_file in data_files}
    delete_filter = self._build_partition_predicate(partition_records=partitions_to_overwrite)
    self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties)

    with self._append_snapshot_producer(snapshot_properties) as append_files:
        append_files.commit_uuid = append_snapshot_commit_uuid
        for data_file in data_files:
            append_files.append_data_file(data_file)