in pyiceberg/table/update/snapshot.py [0:0]
def _manifests(self) -> List[ManifestFile]:
def _write_added_manifest() -> List[ManifestFile]:
if self._added_data_files:
with write_manifest(
format_version=self._transaction.table_metadata.format_version,
spec=self._transaction.table_metadata.spec(),
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
) as writer:
for data_file in self._added_data_files:
writer.add(
ManifestEntry.from_args(
status=ManifestEntryStatus.ADDED,
snapshot_id=self._snapshot_id,
sequence_number=None,
file_sequence_number=None,
data_file=data_file,
)
)
return [writer.to_manifest_file()]
else:
return []
def _write_delete_manifest() -> List[ManifestFile]:
# Check if we need to mark the files as deleted
deleted_entries = self._deleted_entries()
if len(deleted_entries) > 0:
deleted_manifests = []
partition_groups: Dict[int, List[ManifestEntry]] = defaultdict(list)
for deleted_entry in deleted_entries:
partition_groups[deleted_entry.data_file.spec_id].append(deleted_entry)
for spec_id, entries in partition_groups.items():
with write_manifest(
format_version=self._transaction.table_metadata.format_version,
spec=self._transaction.table_metadata.specs()[spec_id],
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
) as writer:
for entry in entries:
writer.add_entry(entry)
deleted_manifests.append(writer.to_manifest_file())
return deleted_manifests
else:
return []
executor = ExecutorFactory.get_or_create()
added_manifests = executor.submit(_write_added_manifest)
delete_manifests = executor.submit(_write_delete_manifest)
existing_manifests = executor.submit(self._existing_manifests)
return self._process_manifests(added_manifests.result() + delete_manifests.result() + existing_manifests.result())