in pyiceberg/table/update/spec.py [0:0]
def _apply(self) -> PartitionSpec:
    """Build the partition spec that results from the pending changes.

    Fields of the current spec are carried over first, using the renamed
    name when a (non-empty) rename is registered for them. A field whose id
    is in the pending deletes is dropped, except on format version 1 tables
    where it is kept with a ``VoidTransform``. Pending additions are appended
    after the carried-over fields, without re-running the name checks here.

    The spec id is taken from the first existing spec that the new spec is
    compatible with; otherwise it ends up one above the largest spec id seen,
    and never below ``INITIAL_PARTITION_SPEC_ID``.

    Returns:
        The resulting PartitionSpec, with its spec id assigned.

    Raises:
        ValueError: If a carried-over field's name is empty, is already used
            by another carried-over field, or resolves to a schema field
            whose id differs from the partition field's source id.
    """
    # Names claimed so far by carried-over fields; shared with the closure below.
    used_names: Set[str] = set()

    def _reserve_name(schema: Schema, name: str, source_id: int) -> None:
        # Validate a partition field name against the schema and the names
        # already claimed, then claim it.
        try:
            schema_field = schema.find_field(name)
        except ValueError:
            # The name does not resolve to a schema field: nothing to clash with.
            schema_field = None

        if schema_field is not None and schema_field.field_id != source_id:
            # The name resolves to a schema field other than the partition source.
            # The message depends on whether a source id was supplied at all.
            if source_id is not None:
                raise ValueError(f"Cannot create identity partition from a different field in the schema {name}")
            raise ValueError(f"Cannot create partition from name that exists in schema {name}")
        if not name:
            raise ValueError("Undefined name")
        if name in used_names:
            raise ValueError(f"Partition name has to be unique: {name}")
        used_names.add(name)

    result_fields = []
    for existing in self._transaction.table_metadata.spec().fields:
        if existing.field_id not in self._deletes:
            transform = existing.transform
        elif self._transaction.table_metadata.format_version == 1:
            # On format version 1 a deleted field keeps its slot with a void transform.
            transform = VoidTransform()
        else:
            # Deleted on any other format version: leave the field out.
            continue

        # A missing or empty rename falls back to the current name.
        target_name = self._renames.get(existing.name) or existing.name
        _reserve_name(self._transaction.table_metadata.schema(), target_name, existing.source_id)
        result_fields.append(PartitionField(existing.source_id, existing.field_id, transform, target_name))

    result_fields.extend(
        PartitionField(
            source_id=pending.source_id,
            field_id=pending.field_id,
            transform=pending.transform,
            name=pending.name,
        )
        for pending in self._adds
    )

    # Reuse spec id or create a new one.
    candidate = PartitionSpec(*result_fields)
    chosen_id = INITIAL_PARTITION_SPEC_ID
    for known in self._transaction.table_metadata.specs().values():
        if candidate.compatible_with(known):
            chosen_id = known.spec_id
            break
        if chosen_id <= known.spec_id:
            chosen_id = known.spec_id + 1
    return PartitionSpec(*result_fields, spec_id=chosen_id)