# pyiceberg/table/update/snapshot.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import concurrent.futures
import itertools
import uuid
from abc import abstractmethod
from collections import defaultdict
from concurrent.futures import Future
from functools import cached_property
from typing import TYPE_CHECKING, Callable, Dict, Generic, List, Optional, Set, Tuple
from sortedcontainers import SortedList
from pyiceberg.expressions import (
AlwaysFalse,
BooleanExpression,
Or,
)
from pyiceberg.expressions.visitors import (
ROWS_MIGHT_NOT_MATCH,
ROWS_MUST_MATCH,
_InclusiveMetricsEvaluator,
_StrictMetricsEvaluator,
inclusive_projection,
manifest_evaluator,
)
from pyiceberg.io import FileIO, OutputFile
from pyiceberg.manifest import (
DataFile,
DataFileContent,
ManifestContent,
ManifestEntry,
ManifestEntryStatus,
ManifestFile,
ManifestWriter,
write_manifest,
write_manifest_list,
)
from pyiceberg.partitioning import (
PartitionSpec,
)
from pyiceberg.table.snapshots import (
Operation,
Snapshot,
SnapshotSummaryCollector,
Summary,
update_snapshot_summaries,
)
from pyiceberg.table.update import (
AddSnapshotUpdate,
AssertRefSnapshotId,
RemoveSnapshotRefUpdate,
SetSnapshotRefUpdate,
TableRequirement,
TableUpdate,
U,
UpdatesAndRequirements,
UpdateTableMetadata,
)
from pyiceberg.typedef import (
EMPTY_DICT,
KeyDefaultDict,
)
from pyiceberg.utils.bin_packing import ListPacker
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.properties import property_as_bool, property_as_int
if TYPE_CHECKING:
from pyiceberg.table import Transaction
def _new_manifest_file_name(num: int, commit_uuid: uuid.UUID) -> str:
return f"{commit_uuid}-m{num}.avro"
def _new_manifest_list_file_name(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str:
# Mimics the behavior in Java:
# https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
commit_uuid: uuid.UUID
_io: FileIO
_operation: Operation
_snapshot_id: int
_parent_snapshot_id: Optional[int]
_added_data_files: List[DataFile]
_manifest_num_counter: itertools.count[int]
_deleted_data_files: Set[DataFile]
def __init__(
self,
operation: Operation,
transaction: Transaction,
io: FileIO,
commit_uuid: Optional[uuid.UUID] = None,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
) -> None:
super().__init__(transaction)
self.commit_uuid = commit_uuid or uuid.uuid4()
self._io = io
self._operation = operation
self._snapshot_id = self._transaction.table_metadata.new_snapshot_id()
# Since we only support the main branch for now
self._parent_snapshot_id = (
snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None
)
self._added_data_files = []
self._deleted_data_files = set()
self.snapshot_properties = snapshot_properties
self._manifest_num_counter = itertools.count(0)
def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
self._added_data_files.append(data_file)
return self
def delete_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
self._deleted_data_files.add(data_file)
return self
@abstractmethod
def _deleted_entries(self) -> List[ManifestEntry]: ...
@abstractmethod
def _existing_manifests(self) -> List[ManifestFile]: ...
def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]:
"""To perform any post-processing on the manifests before writing them to the new snapshot."""
return manifests
def _manifests(self) -> List[ManifestFile]:
def _write_added_manifest() -> List[ManifestFile]:
if self._added_data_files:
with write_manifest(
format_version=self._transaction.table_metadata.format_version,
spec=self._transaction.table_metadata.spec(),
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
) as writer:
for data_file in self._added_data_files:
writer.add(
ManifestEntry.from_args(
status=ManifestEntryStatus.ADDED,
snapshot_id=self._snapshot_id,
sequence_number=None,
file_sequence_number=None,
data_file=data_file,
)
)
return [writer.to_manifest_file()]
else:
return []
def _write_delete_manifest() -> List[ManifestFile]:
# Check if we need to mark the files as deleted
deleted_entries = self._deleted_entries()
if len(deleted_entries) > 0:
deleted_manifests = []
partition_groups: Dict[int, List[ManifestEntry]] = defaultdict(list)
for deleted_entry in deleted_entries:
partition_groups[deleted_entry.data_file.spec_id].append(deleted_entry)
for spec_id, entries in partition_groups.items():
with write_manifest(
format_version=self._transaction.table_metadata.format_version,
spec=self._transaction.table_metadata.specs()[spec_id],
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
) as writer:
for entry in entries:
writer.add_entry(entry)
deleted_manifests.append(writer.to_manifest_file())
return deleted_manifests
else:
return []
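        # The two manifest writers and the fetch of existing manifests are independent,
        # so they are submitted to the shared executor and joined via .result() below.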
executor = ExecutorFactory.get_or_create()
added_manifests = executor.submit(_write_added_manifest)
delete_manifests = executor.submit(_write_delete_manifest)
existing_manifests = executor.submit(self._existing_manifests)
return self._process_manifests(added_manifests.result() + delete_manifests.result() + existing_manifests.result())
def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
from pyiceberg.table import TableProperties
partition_summary_limit = int(
self._transaction.table_metadata.properties.get(
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
)
)
ssc = SnapshotSummaryCollector(partition_summary_limit=partition_summary_limit)
for data_file in self._added_data_files:
ssc.add_file(
data_file=data_file,
partition_spec=self._transaction.table_metadata.spec(),
schema=self._transaction.table_metadata.schema(),
)
if len(self._deleted_data_files) > 0:
specs = self._transaction.table_metadata.specs()
for data_file in self._deleted_data_files:
ssc.remove_file(
data_file=data_file,
partition_spec=specs[data_file.spec_id],
schema=self._transaction.table_metadata.schema(),
)
previous_snapshot = (
self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
if self._parent_snapshot_id is not None
else None
)
return update_snapshot_summaries(
summary=Summary(operation=self._operation, **ssc.build(), **snapshot_properties),
previous_summary=previous_snapshot.summary if previous_snapshot is not None else None,
)
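    # The Summary built above combines the operation, the per-commit metrics gathered by
    # SnapshotSummaryCollector (e.g. added/removed files and records), and any caller-supplied
    # snapshot properties, merged with the previous snapshot's running totals.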
def _commit(self) -> UpdatesAndRequirements:
new_manifests = self._manifests()
next_sequence_number = self._transaction.table_metadata.next_sequence_number()
summary = self._summary(self.snapshot_properties)
file_name = _new_manifest_list_file_name(
snapshot_id=self._snapshot_id,
attempt=0,
commit_uuid=self.commit_uuid,
)
location_provider = self._transaction._table.location_provider()
manifest_list_file_path = location_provider.new_metadata_location(file_name)
with write_manifest_list(
format_version=self._transaction.table_metadata.format_version,
output_file=self._io.new_output(manifest_list_file_path),
snapshot_id=self._snapshot_id,
parent_snapshot_id=self._parent_snapshot_id,
sequence_number=next_sequence_number,
) as writer:
writer.add_manifests(new_manifests)
snapshot = Snapshot(
snapshot_id=self._snapshot_id,
parent_snapshot_id=self._parent_snapshot_id,
manifest_list=manifest_list_file_path,
sequence_number=next_sequence_number,
summary=summary,
schema_id=self._transaction.table_metadata.current_schema_id,
)
return (
(
AddSnapshotUpdate(snapshot=snapshot),
SetSnapshotRefUpdate(
snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch"
),
),
(AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),),
)
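    # The (updates, requirements) pair implements optimistic concurrency: AddSnapshot and
    # SetSnapshotRef are only applied if AssertRefSnapshotId still holds at commit time,
    # i.e. "main" has not moved since this producer read the table metadata.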
@property
def snapshot_id(self) -> int:
return self._snapshot_id
def spec(self, spec_id: int) -> PartitionSpec:
return self._transaction.table_metadata.specs()[spec_id]
def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
return write_manifest(
format_version=self._transaction.table_metadata.format_version,
spec=spec,
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
)
def new_manifest_output(self) -> OutputFile:
location_provider = self._transaction._table.location_provider()
file_name = _new_manifest_file_name(num=next(self._manifest_num_counter), commit_uuid=self.commit_uuid)
file_path = location_provider.new_metadata_location(file_name)
return self._io.new_output(file_path)
def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> List[ManifestEntry]:
return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted)
class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
"""Will delete manifest entries from the current snapshot based on the predicate.
This will produce a DELETE snapshot:
Data files were removed and their contents logically deleted and/or delete
files were added to delete rows.
From the specification
"""
_predicate: BooleanExpression
_case_sensitive: bool
def __init__(
self,
operation: Operation,
transaction: Transaction,
io: FileIO,
commit_uuid: Optional[uuid.UUID] = None,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
):
super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
self._predicate = AlwaysFalse()
self._case_sensitive = True
def _commit(self) -> UpdatesAndRequirements:
# Only produce a commit when there is something to delete
if self.files_affected:
return super()._commit()
else:
return (), ()
def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
schema = self._transaction.table_metadata.schema()
spec = self._transaction.table_metadata.specs()[spec_id]
project = inclusive_projection(schema, spec, self._case_sensitive)
return project(self._predicate)
@cached_property
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
return KeyDefaultDict(self._build_partition_projection)
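    # KeyDefaultDict lazily invokes _build_partition_projection for each missing spec-id
    # and caches the result, so every partition spec is projected at most once.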
def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
schema = self._transaction.table_metadata.schema()
spec = self._transaction.table_metadata.specs()[spec_id]
return manifest_evaluator(spec, schema, self.partition_filters[spec_id], self._case_sensitive)
def delete_by_predicate(self, predicate: BooleanExpression, case_sensitive: bool = True) -> None:
self._predicate = Or(self._predicate, predicate)
self._case_sensitive = case_sensitive
@cached_property
def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], bool]:
"""Computes all the delete operation and cache it when nothing changes.
Returns:
- List of existing manifests that are not affected by the delete operation.
- The manifest-entries that are deleted based on the metadata.
- Flag indicating that rewrites of data-files are needed.
"""
schema = self._transaction.table_metadata.schema()
def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry:
return ManifestEntry.from_args(
status=status,
snapshot_id=entry.snapshot_id,
sequence_number=entry.sequence_number,
file_sequence_number=entry.file_sequence_number,
data_file=entry.data_file,
)
manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval
inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(
schema, self._predicate, case_sensitive=self._case_sensitive
).eval
existing_manifests = []
total_deleted_entries = []
partial_rewrites_needed = False
self._deleted_data_files = set()
if snapshot := self._transaction.table_metadata.current_snapshot():
for manifest_file in snapshot.manifests(io=self._io):
if manifest_file.content == ManifestContent.DATA:
if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
# If the manifest isn't relevant, we can just keep it in the manifest-list
existing_manifests.append(manifest_file)
else:
# It is relevant, let's check out the content
deleted_entries = []
existing_entries = []
for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH:
# Based on the metadata, it can be dropped right away
deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED))
self._deleted_data_files.add(entry.data_file)
else:
# Based on the metadata, we cannot determine if it can be deleted
existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
if inclusive_metrics_evaluator(entry.data_file) != ROWS_MIGHT_NOT_MATCH:
partial_rewrites_needed = True
if len(deleted_entries) > 0:
total_deleted_entries += deleted_entries
# Rewrite the manifest
if len(existing_entries) > 0:
with write_manifest(
format_version=self._transaction.table_metadata.format_version,
spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
) as writer:
for existing_entry in existing_entries:
writer.add_entry(existing_entry)
existing_manifests.append(writer.to_manifest_file())
else:
existing_manifests.append(manifest_file)
else:
existing_manifests.append(manifest_file)
return existing_manifests, total_deleted_entries, partial_rewrites_needed
def _existing_manifests(self) -> List[ManifestFile]:
return self._compute_deletes[0]
def _deleted_entries(self) -> List[ManifestEntry]:
return self._compute_deletes[1]
@property
def rewrites_needed(self) -> bool:
"""Indicate if data files need to be rewritten."""
return self._compute_deletes[2]
@property
def files_affected(self) -> bool:
"""Indicate if any manifest-entries can be dropped."""
return len(self._deleted_entries()) > 0
class _FastAppendFiles(_SnapshotProducer["_FastAppendFiles"]):
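    # Produces an APPEND snapshot: the added files go into a brand-new manifest, and the
    # parent snapshot's manifests are carried over untouched (no manifest rewriting).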
def _existing_manifests(self) -> List[ManifestFile]:
"""To determine if there are any existing manifest files.
A fast append will add another ManifestFile to the ManifestList.
All the existing manifest files are considered existing.
"""
existing_manifests = []
if self._parent_snapshot_id is not None:
previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
if previous_snapshot is None:
raise ValueError(f"Snapshot could not be found: {self._parent_snapshot_id}")
for manifest in previous_snapshot.manifests(io=self._io):
if manifest.has_added_files() or manifest.has_existing_files() or manifest.added_snapshot_id == self._snapshot_id:
existing_manifests.append(manifest)
return existing_manifests
def _deleted_entries(self) -> List[ManifestEntry]:
"""To determine if we need to record any deleted manifest entries.
In case of an append, nothing is deleted.
"""
return []
class _MergeAppendFiles(_FastAppendFiles):
_target_size_bytes: int
_min_count_to_merge: int
_merge_enabled: bool
def __init__(
self,
operation: Operation,
transaction: Transaction,
io: FileIO,
commit_uuid: Optional[uuid.UUID] = None,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
) -> None:
from pyiceberg.table import TableProperties
super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
self._target_size_bytes = property_as_int(
self._transaction.table_metadata.properties,
TableProperties.MANIFEST_TARGET_SIZE_BYTES,
TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
) # type: ignore
self._min_count_to_merge = property_as_int(
self._transaction.table_metadata.properties,
TableProperties.MANIFEST_MIN_MERGE_COUNT,
TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT,
) # type: ignore
self._merge_enabled = property_as_bool(
self._transaction.table_metadata.properties,
TableProperties.MANIFEST_MERGE_ENABLED,
TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
)
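        # These come from the commit.manifest.* table properties (mirroring the Java
        # implementation); when a property is unset, the defaults defined in
        # TableProperties apply.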
def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]:
"""To perform any post-processing on the manifests before writing them to the new snapshot.
In _MergeAppendFiles, we merge manifests based on the target size and the minimum count to merge
if automatic merge is enabled.
"""
unmerged_data_manifests = [manifest for manifest in manifests if manifest.content == ManifestContent.DATA]
unmerged_deletes_manifests = [manifest for manifest in manifests if manifest.content == ManifestContent.DELETES]
data_manifest_merge_manager = _ManifestMergeManager(
target_size_bytes=self._target_size_bytes,
min_count_to_merge=self._min_count_to_merge,
merge_enabled=self._merge_enabled,
snapshot_producer=self,
)
return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests
class _OverwriteFiles(_SnapshotProducer["_OverwriteFiles"]):
"""Overwrites data from the table. This will produce an OVERWRITE snapshot.
Data and delete files were added and removed in a logical overwrite operation.
"""
def _existing_manifests(self) -> List[ManifestFile]:
"""Determine if there are any existing manifest files."""
existing_files = []
if snapshot := self._transaction.table_metadata.current_snapshot():
for manifest_file in snapshot.manifests(io=self._io):
entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True)
found_deleted_data_files = [entry.data_file for entry in entries if entry.data_file in self._deleted_data_files]
if len(found_deleted_data_files) == 0:
existing_files.append(manifest_file)
else:
# We have to rewrite the manifest file without the deleted data files
if any(entry.data_file not in found_deleted_data_files for entry in entries):
with write_manifest(
format_version=self._transaction.table_metadata.format_version,
spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
) as writer:
                            for entry in entries:
                                if entry.data_file not in found_deleted_data_files:
                                    writer.add_entry(
                                        ManifestEntry.from_args(
                                            status=ManifestEntryStatus.EXISTING,
                                            snapshot_id=entry.snapshot_id,
                                            sequence_number=entry.sequence_number,
                                            file_sequence_number=entry.file_sequence_number,
                                            data_file=entry.data_file,
                                        )
                                    )
existing_files.append(writer.to_manifest_file())
return existing_files
def _deleted_entries(self) -> List[ManifestEntry]:
"""To determine if we need to record any deleted entries.
With a full overwrite all the entries are considered deleted.
With partial overwrites we have to use the predicate to evaluate
which entries are affected.
"""
if self._parent_snapshot_id is not None:
previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
if previous_snapshot is None:
# This should never happen since you cannot overwrite an empty table
raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}")
executor = ExecutorFactory.get_or_create()
def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
return [
ManifestEntry.from_args(
status=ManifestEntryStatus.DELETED,
snapshot_id=entry.snapshot_id,
sequence_number=entry.sequence_number,
file_sequence_number=entry.file_sequence_number,
data_file=entry.data_file,
)
for entry in manifest.fetch_manifest_entry(self._io, discard_deleted=True)
if entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files
]
list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io))
return list(itertools.chain(*list_of_entries))
else:
return []
class UpdateSnapshot:
_transaction: Transaction
_io: FileIO
_snapshot_properties: Dict[str, str]
def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
self._transaction = transaction
self._io = io
self._snapshot_properties = snapshot_properties
def fast_append(self) -> _FastAppendFiles:
return _FastAppendFiles(
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
)
def merge_append(self) -> _MergeAppendFiles:
return _MergeAppendFiles(
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
)
def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles:
return _OverwriteFiles(
commit_uuid=commit_uuid,
operation=Operation.OVERWRITE
if self._transaction.table_metadata.current_snapshot() is not None
else Operation.APPEND,
transaction=self._transaction,
io=self._io,
snapshot_properties=self._snapshot_properties,
)
def delete(self) -> _DeleteFiles:
return _DeleteFiles(
operation=Operation.DELETE,
transaction=self._transaction,
io=self._io,
snapshot_properties=self._snapshot_properties,
)
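# A minimal sketch of driving a producer directly (hypothetical `table` and `data_files`;
# the public Table.append()/overwrite() APIs normally wrap this flow):
#
#     with table.transaction() as txn:
#         with txn.update_snapshot().fast_append() as append:
#             for data_file in data_files:
#                 append.append_data_file(data_file)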
class _ManifestMergeManager(Generic[U]):
_target_size_bytes: int
_min_count_to_merge: int
_merge_enabled: bool
_snapshot_producer: _SnapshotProducer[U]
def __init__(
self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: bool, snapshot_producer: _SnapshotProducer[U]
) -> None:
self._target_size_bytes = target_size_bytes
self._min_count_to_merge = min_count_to_merge
self._merge_enabled = merge_enabled
self._snapshot_producer = snapshot_producer
def _group_by_spec(self, manifests: List[ManifestFile]) -> Dict[int, List[ManifestFile]]:
groups = defaultdict(list)
for manifest in manifests:
groups[manifest.partition_spec_id].append(manifest)
return groups
def _create_manifest(self, spec_id: int, manifest_bin: List[ManifestFile]) -> ManifestFile:
with self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id)) as writer:
for manifest in manifest_bin:
for entry in self._snapshot_producer.fetch_manifest_entry(manifest=manifest, discard_deleted=False):
if entry.status == ManifestEntryStatus.DELETED and entry.snapshot_id == self._snapshot_producer.snapshot_id:
# only files deleted by this snapshot should be added to the new manifest
writer.delete(entry)
elif entry.status == ManifestEntryStatus.ADDED and entry.snapshot_id == self._snapshot_producer.snapshot_id:
# added entries from this snapshot are still added, otherwise they should be existing
writer.add(entry)
elif entry.status != ManifestEntryStatus.DELETED:
# add all non-deleted files from the old manifest as existing files
writer.existing(entry)
return writer.to_manifest_file()
def _merge_group(self, first_manifest: ManifestFile, spec_id: int, manifests: List[ManifestFile]) -> List[ManifestFile]:
packer: ListPacker[ManifestFile] = ListPacker(target_weight=self._target_size_bytes, lookback=1, largest_bin_first=False)
bins: List[List[ManifestFile]] = packer.pack_end(manifests, lambda m: m.manifest_length)
def merge_bin(manifest_bin: List[ManifestFile]) -> List[ManifestFile]:
output_manifests = []
if len(manifest_bin) == 1:
output_manifests.append(manifest_bin[0])
elif first_manifest in manifest_bin and len(manifest_bin) < self._min_count_to_merge:
# if the bin has the first manifest (the new data files or an appended manifest file) then only
# merge it if the number of manifests is above the minimum count. this is applied only to bins
# with an in-memory manifest so that large manifests don't prevent merging older groups.
output_manifests.extend(manifest_bin)
else:
output_manifests.append(self._create_manifest(spec_id, manifest_bin))
return output_manifests
executor = ExecutorFactory.get_or_create()
futures = [executor.submit(merge_bin, b) for b in bins]
# for consistent ordering, we need to maintain future order
futures_index = {f: i for i, f in enumerate(futures)}
completed_futures: SortedList[Future[List[ManifestFile]]] = SortedList(iterable=[], key=lambda f: futures_index[f])
for future in concurrent.futures.as_completed(futures):
completed_futures.add(future)
bin_results: List[List[ManifestFile]] = [f.result() for f in completed_futures if f.result()]
return [manifest for bin_result in bin_results for manifest in bin_result]
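    # Rough illustration (hypothetical sizes, target_size_bytes of 8 MB): manifests of
    # 6 MB, 6 MB and 3 MB could be packed into bins such as [6 MB] and [6 MB, 3 MB],
    # with each multi-manifest bin rewritten into a single merged manifest. The exact
    # binning depends on ListPacker's lookback and packing order.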
def merge_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]:
if not self._merge_enabled or len(manifests) == 0:
return manifests
first_manifest = manifests[0]
groups = self._group_by_spec(manifests)
merged_manifests = []
for spec_id in reversed(groups.keys()):
merged_manifests.extend(self._merge_group(first_manifest, spec_id, groups[spec_id]))
return merged_manifests
class ManageSnapshots(UpdateTableMetadata["ManageSnapshots"]):
"""
    Run snapshot management operations, such as creating a branch or a tag.
Use table.manage_snapshots().<operation>().commit() to run a specific operation.
Use table.manage_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
Pending changes are applied on commit.
We can also use context managers to make more changes. For example,
with table.manage_snapshots() as ms:
ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
"""
_updates: Tuple[TableUpdate, ...] = ()
_requirements: Tuple[TableRequirement, ...] = ()
def _commit(self) -> UpdatesAndRequirements:
"""Apply the pending changes and commit."""
return self._updates, self._requirements
def _remove_ref_snapshot(self, ref_name: str) -> ManageSnapshots:
"""Remove a snapshot ref.
Args:
ref_name: branch / tag name to remove
Stages the updates and requirements for the remove-snapshot-ref.
Returns
This method for chaining
"""
updates = (RemoveSnapshotRefUpdate(ref_name=ref_name),)
requirements = (
AssertRefSnapshotId(
snapshot_id=self._transaction.table_metadata.refs[ref_name].snapshot_id
if ref_name in self._transaction.table_metadata.refs
else None,
ref=ref_name,
),
)
self._updates += updates
self._requirements += requirements
return self
def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: Optional[int] = None) -> ManageSnapshots:
"""
Create a new tag pointing to the given snapshot id.
Args:
snapshot_id (int): snapshot id of the existing snapshot to tag
tag_name (str): name of the tag
max_ref_age_ms (Optional[int]): max ref age in milliseconds
Returns:
This for method chaining
"""
update, requirement = self._transaction._set_ref_snapshot(
snapshot_id=snapshot_id,
ref_name=tag_name,
type="tag",
max_ref_age_ms=max_ref_age_ms,
)
self._updates += update
self._requirements += requirement
return self
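    # For example (hypothetical snapshot_id):
    #     table.manage_snapshots().create_tag(snapshot_id, "EOY-2023").commit()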
def remove_tag(self, tag_name: str) -> ManageSnapshots:
"""
Remove a tag.
Args:
tag_name (str): name of tag to remove
Returns:
This for method chaining
"""
return self._remove_ref_snapshot(ref_name=tag_name)
def create_branch(
self,
snapshot_id: int,
branch_name: str,
max_ref_age_ms: Optional[int] = None,
max_snapshot_age_ms: Optional[int] = None,
min_snapshots_to_keep: Optional[int] = None,
) -> ManageSnapshots:
"""
Create a new branch pointing to the given snapshot id.
Args:
snapshot_id (int): snapshot id of existing snapshot at which the branch is created.
branch_name (str): name of the new branch
max_ref_age_ms (Optional[int]): max ref age in milliseconds
max_snapshot_age_ms (Optional[int]): max age of snapshots to keep in milliseconds
min_snapshots_to_keep (Optional[int]): min number of snapshots to keep for the branch
Returns:
This for method chaining
"""
update, requirement = self._transaction._set_ref_snapshot(
snapshot_id=snapshot_id,
ref_name=branch_name,
type="branch",
max_ref_age_ms=max_ref_age_ms,
max_snapshot_age_ms=max_snapshot_age_ms,
min_snapshots_to_keep=min_snapshots_to_keep,
)
self._updates += update
self._requirements += requirement
return self
def remove_branch(self, branch_name: str) -> ManageSnapshots:
"""
Remove a branch.
Args:
branch_name (str): name of branch to remove
Returns:
This for method chaining
"""
return self._remove_ref_snapshot(ref_name=branch_name)