in pyiceberg/table/update/snapshot.py [0:0]
def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], bool]:
    """Decide, from metadata alone, what the delete predicate removes from the current snapshot.

    Every manifest of the current snapshot (if there is one) is visited:

    - Non-data manifests, and data manifests rejected by the manifest evaluator
      of their partition spec, are carried over as-is.
    - For the remaining data manifests each live entry is classified. Entries
      whose file the strict metrics evaluator reports as ``ROWS_MUST_MATCH``
      are marked DELETED; all others are marked EXISTING.
    - A manifest with no deleted entries is carried over as-is. A manifest with
      both deleted and existing entries is replaced by a newly written manifest
      holding only the existing ones. A manifest whose entries are all deleted
      is left out of the result.

    Side effect: ``self._deleted_data_files`` is reset to an empty set and then
    filled with the data file of every entry marked DELETED.

    NOTE(review): the previous docstring said the result is cached; no caching is
    visible inside this function -- presumably a decorator outside it. Confirm.

    Returns:
        - List of manifests to retain (untouched ones plus rewritten ones).
        - The manifest-entries that are deleted based on the metadata.
        - Flag that is True when at least one retained data file was not
          reported as ``ROWS_MIGHT_NOT_MATCH`` by the inclusive metrics
          evaluator, i.e. rewrites of data-files are needed.
    """
    schema = self._transaction.table_metadata.schema()

    def _with_status(source: ManifestEntry, new_status: ManifestEntryStatus) -> ManifestEntry:
        # Build a new entry that mirrors `source` in everything but its status.
        return ManifestEntry.from_args(
            status=new_status,
            snapshot_id=source.snapshot_id,
            sequence_number=source.sequence_number,
            file_sequence_number=source.file_sequence_number,
            data_file=source.data_file,
        )

    # One manifest-level evaluator per partition spec id, created on first lookup.
    evaluator_by_spec: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
    must_match = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval
    may_match = _InclusiveMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval

    kept_manifests = []
    dropped_entries = []
    needs_rewrite = False
    self._deleted_data_files = set()

    current = self._transaction.table_metadata.current_snapshot()
    if not current:
        # No snapshot means there is nothing to inspect.
        return kept_manifests, dropped_entries, needs_rewrite

    for manifest_file in current.manifests(io=self._io):
        # Non-data manifests are never inspected; data manifests the evaluator
        # rules out stay in the manifest-list unchanged. The `or` short-circuits,
        # so the evaluator only runs for data manifests.
        if manifest_file.content != ManifestContent.DATA or not evaluator_by_spec[manifest_file.partition_spec_id](
            manifest_file
        ):
            kept_manifests.append(manifest_file)
            continue

        # The manifest may be affected: classify each of its entries.
        to_delete = []
        to_keep = []
        for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
            if must_match(entry.data_file) == ROWS_MUST_MATCH:
                # The metadata proves every row matches: drop the file outright.
                to_delete.append(_with_status(entry, ManifestEntryStatus.DELETED))
                self._deleted_data_files.add(entry.data_file)
                continue

            # The metadata cannot prove a full match, so the file is retained here.
            to_keep.append(_with_status(entry, ManifestEntryStatus.EXISTING))
            if may_match(entry.data_file) != ROWS_MIGHT_NOT_MATCH:
                needs_rewrite = True

        if not to_delete:
            # Nothing was removed, so the original manifest is still accurate.
            kept_manifests.append(manifest_file)
            continue

        dropped_entries.extend(to_delete)

        if not to_keep:
            # Every entry was deleted: the manifest is left out entirely.
            continue

        # Rewrite the manifest with only the surviving entries.
        with write_manifest(
            format_version=self._transaction.table_metadata.format_version,
            spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
            schema=self._transaction.table_metadata.schema(),
            output_file=self.new_manifest_output(),
            snapshot_id=self._snapshot_id,
        ) as writer:
            for surviving_entry in to_keep:
                writer.add_entry(surviving_entry)
        # Taken after the writer context has exited.
        kept_manifests.append(writer.to_manifest_file())

    return kept_manifests, dropped_entries, needs_rewrite