def _compute_deletes()

in pyiceberg/table/update/snapshot.py [0:0]


    def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], bool]:
        """Determine which manifests and manifest entries the delete predicate touches.

        Walks every manifest of the current snapshot, if there is one:

        - Non-data manifests, and data manifests rejected by the manifest
          evaluator for their partition spec, are kept unchanged.
        - For the remaining data manifests each live entry is checked with the
          strict metrics evaluator. Entries for which it returns
          ``ROWS_MUST_MATCH`` are copied with status ``DELETED``; all other
          entries are copied with status ``EXISTING``.
        - A manifest without deleted entries is kept unchanged. A manifest with
          both deleted and surviving entries is replaced by a newly written
          manifest holding only the surviving entries. A manifest whose entries
          were all deleted is left out of the result.

        As a side effect ``self._deleted_data_files`` is reset and then filled
        with the data files of the entries marked as deleted.

        NOTE(review): an earlier version of this docstring said the result is
        cached; no caching happens inside this body -- presumably it is applied
        by a decorator outside this view, confirm.

        Returns:
            - List of manifests that make up the result: the untouched ones plus
              the rewritten manifests containing the surviving entries.
            - The manifest-entries that are deleted based on the metadata.
            - Flag indicating that rewrites of data-files are needed; set when
              the inclusive metrics evaluator does not return
              ``ROWS_MIGHT_NOT_MATCH`` for at least one surviving entry.
        """
        schema = self._transaction.table_metadata.schema()

        def _with_status(source: ManifestEntry, new_status: ManifestEntryStatus) -> ManifestEntry:
            # Clone the entry field by field; only the status differs.
            return ManifestEntry.from_args(
                status=new_status,
                snapshot_id=source.snapshot_id,
                sequence_number=source.sequence_number,
                file_sequence_number=source.file_sequence_number,
                data_file=source.data_file,
            )

        # One manifest evaluator per partition spec id, built on first lookup.
        manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
        strict_eval = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval
        inclusive_eval = _InclusiveMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval

        kept_manifests: List[ManifestFile] = []
        dropped_entries: List[ManifestEntry] = []
        rewrite_required = False
        self._deleted_data_files = set()

        if not (snapshot := self._transaction.table_metadata.current_snapshot()):
            # No snapshot yet, so there is nothing to delete from.
            return kept_manifests, dropped_entries, rewrite_required

        for manifest in snapshot.manifests(io=self._io):
            # Anything that is not a data manifest, or that the manifest evaluator
            # rules out, stays in the manifest-list as-is.
            if manifest.content != ManifestContent.DATA or not manifest_evaluators[manifest.partition_spec_id](manifest):
                kept_manifests.append(manifest)
                continue

            # The manifest may hold matching files; inspect its live entries.
            removed: List[ManifestEntry] = []
            survivors: List[ManifestEntry] = []
            for entry in manifest.fetch_manifest_entry(io=self._io, discard_deleted=True):
                if strict_eval(entry.data_file) == ROWS_MUST_MATCH:
                    # The metadata alone is enough to drop the whole file.
                    removed.append(_with_status(entry, ManifestEntryStatus.DELETED))
                    self._deleted_data_files.add(entry.data_file)
                    continue

                # The metadata cannot settle whether this file can be deleted.
                survivors.append(_with_status(entry, ManifestEntryStatus.EXISTING))
                if inclusive_eval(entry.data_file) != ROWS_MIGHT_NOT_MATCH:
                    rewrite_required = True

            if not removed:
                # Nothing was dropped, so the original manifest is still valid.
                kept_manifests.append(manifest)
                continue

            dropped_entries.extend(removed)

            if not survivors:
                # Every entry was deleted: the manifest simply disappears.
                continue

            # Write a replacement manifest that carries only the surviving entries.
            with write_manifest(
                format_version=self._transaction.table_metadata.format_version,
                spec=self._transaction.table_metadata.specs()[manifest.partition_spec_id],
                schema=self._transaction.table_metadata.schema(),
                output_file=self.new_manifest_output(),
                snapshot_id=self._snapshot_id,
            ) as writer:
                for survivor in survivors:
                    writer.add_entry(survivor)
            kept_manifests.append(writer.to_manifest_file())

        return kept_manifests, dropped_entries, rewrite_required