in pyiceberg/table/update/snapshot.py [0:0]
def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
from pyiceberg.table import TableProperties
partition_summary_limit = int(
self._transaction.table_metadata.properties.get(
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
)
)
ssc = SnapshotSummaryCollector(partition_summary_limit=partition_summary_limit)
for data_file in self._added_data_files:
ssc.add_file(
data_file=data_file,
partition_spec=self._transaction.table_metadata.spec(),
schema=self._transaction.table_metadata.schema(),
)
if len(self._deleted_data_files) > 0:
specs = self._transaction.table_metadata.specs()
for data_file in self._deleted_data_files:
ssc.remove_file(
data_file=data_file,
partition_spec=specs[data_file.spec_id],
schema=self._transaction.table_metadata.schema(),
)
previous_snapshot = (
self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
if self._parent_snapshot_id is not None
else None
)
return update_snapshot_summaries(
summary=Summary(operation=self._operation, **ssc.build(), **snapshot_properties),
previous_summary=previous_snapshot.summary if previous_snapshot is not None else None,
)