in pyiceberg/table/update/snapshot.py [0:0]
def _merge_group(self, first_manifest: ManifestFile, spec_id: int, manifests: List[ManifestFile]) -> List[ManifestFile]:
    """Pack ``manifests`` into size-bounded bins and merge each bin into one manifest.

    Manifests are packed with ``ListPacker.pack_end``. Each manifest's ``manifest_length``
    is its weight, and ``self._target_size_bytes`` is the target bin weight. Each bin is
    then resolved independently by ``merge_bin`` on the shared executor.

    Args:
        first_manifest: The manifest the minimum-count rule applies to. A bin that
            contains it is only merged once it holds at least ``self._min_count_to_merge``
            manifests.
        spec_id: Passed to ``self._create_manifest`` when a bin is merged.
        manifests: The manifests to pack and merge. Presumably they all belong to
            ``spec_id``; confirm against callers.

    Returns:
        The resulting manifests, flattened in bin order.

    Raises:
        Any exception raised while resolving a bin (e.g. by ``self._create_manifest``)
        propagates to the caller.
    """
    packer: ListPacker[ManifestFile] = ListPacker(target_weight=self._target_size_bytes, lookback=1, largest_bin_first=False)
    bins: List[List[ManifestFile]] = packer.pack_end(manifests, lambda m: m.manifest_length)

    def merge_bin(manifest_bin: List[ManifestFile]) -> List[ManifestFile]:
        """Return the manifests that replace ``manifest_bin`` in the output."""
        if len(manifest_bin) == 1:
            # A single manifest has nothing to merge with, so keep it as-is.
            return [manifest_bin[0]]
        if first_manifest in manifest_bin and len(manifest_bin) < self._min_count_to_merge:
            # if the bin has the first manifest (the new data files or an appended manifest file) then only
            # merge it once the number of manifests reaches the minimum count. this is applied only to bins
            # with an in-memory manifest so that large manifests don't prevent merging older groups.
            return list(manifest_bin)
        return [self._create_manifest(spec_id, manifest_bin)]

    executor = ExecutorFactory.get_or_create()
    # Executor.map submits every bin up front and yields the results in submission order.
    # That keeps the output ordering deterministic without re-sorting completed futures.
    return [manifest for bin_result in executor.map(merge_bin, bins) for manifest in bin_result]