def dynamic_partition_overwrite()

in pyiceberg/table/__init__.py


    def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
        """
        Shorthand for overwriting existing partitions with a PyArrow table.

        The function detects partition values in the provided Arrow table using the current
        partition spec, deletes existing partitions that match those values, and finally
        appends the data from the Arrow table to the table.

        Args:
            df: The PyArrow table that will be used to overwrite the matching partitions
            snapshot_properties: Custom properties to be added to the snapshot summary
        """
        try:
            import pyarrow as pa
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

        if not isinstance(df, pa.Table):
            raise ValueError(f"Expected PyArrow table, got: {df}")

        if self.table_metadata.spec().is_unpartitioned():
            raise ValueError("Cannot apply dynamic overwrite on an unpartitioned table.")

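        # Partition values are derived directly from column values, so every
        # partition field in the latest spec must use an identity transform.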
        for field in self.table_metadata.spec().fields:
            if not isinstance(field.transform, IdentityTransform):
                raise ValueError(
                    f"For now dynamic overwrite does not support a table with non-identity-transform field in the latest partition spec: {field}"
                )

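        # Honor the DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE config flag, then verify
        # that the Arrow schema is compatible with the table schema.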
        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
        _check_pyarrow_schema_compatible(
            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
        )

        # If the dataframe has no rows, there is nothing to overwrite
        if df.shape[0] == 0:
            return

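        # Write the Arrow data out as data files up front; the partition values of
        # these files drive the delete below.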
        append_snapshot_commit_uuid = uuid.uuid4()
        data_files: List[DataFile] = list(
            _dataframe_to_data_files(
                table_metadata=self._table.metadata, write_uuid=append_snapshot_commit_uuid, df=df, io=self._table.io
            )
        )

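        # Collect the distinct partition tuples covered by the new files and delete
        # any existing rows in those partitions.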
        partitions_to_overwrite = {data_file.partition for data_file in data_files}
        delete_filter = self._build_partition_predicate(partition_records=partitions_to_overwrite)
        self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties)

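        # Append the newly written files in a separate snapshot, reusing the UUID
        # the data files were written under.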
        with self._append_snapshot_producer(snapshot_properties) as append_files:
            append_files.commit_uuid = append_snapshot_commit_uuid
            for data_file in data_files:
                append_files.append_data_file(data_file)
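

A minimal usage sketch; the catalog name, table identifier, and column names below are illustrative, and the table is assumed to be partitioned by an identity transform on the day column:

    import pyarrow as pa
    from pyiceberg.catalog import load_catalog

    # Hypothetical catalog and table; the table is assumed to be
    # partitioned by identity("day").
    catalog = load_catalog("default")
    tbl = catalog.load_table("db.events")

    # Rows spanning two partition values: existing data for these two days
    # is replaced, while every other partition is left untouched.
    df = pa.table({
        "day": ["2024-01-01", "2024-01-01", "2024-01-02"],
        "value": [1, 2, 3],
    })

    with tbl.transaction() as tx:
        tx.dynamic_partition_overwrite(df)

Note that the delete and the append are staged as separate operations within the one transaction, so the partition replacement commits as a single unit when the transaction exits.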