in pyiceberg/table/update/schema.py [0:0]
def rename_column(self, path_from: Union[str, Tuple[str, ...]], new_name: str) -> UpdateSchema:
    """Stage a rename of an existing column.

    Args:
        path_from: Path of the column to rename, given either as a single
            string or as a tuple of path segments (joined with ".").
        new_name: The name the column should carry after the rename.

    Returns:
        This same UpdateSchema instance, with the rename staged.

    Raises:
        ValueError: If the column's field id is among the staged deletes.
    """
    if isinstance(path_from, tuple):
        path_from = ".".join(path_from)

    original = self._schema.find_field(path_from, self._case_sensitive)
    column_id = original.field_id

    if column_id in self._deletes:
        raise ValueError(f"Cannot rename a column that will be deleted: {path_from}")

    # Start from an update already staged for this field id when there is one,
    # so previously staged attributes are carried over; otherwise copy the
    # field as it exists in the current schema. Only the name is replaced.
    staged = self._updates.get(column_id)
    template = staged if staged else original
    self._updates[column_id] = NestedField(
        field_id=template.field_id,
        name=new_name,
        field_type=template.field_type,
        doc=template.doc,
        required=template.required,
        initial_default=template.initial_default,
        write_default=template.write_default,
    )

    # Resolve the column name through the field id rather than reusing
    # path_from, since the caller-supplied path may differ in casing.
    canonical_path = self._schema.find_column_name(column_id)
    if canonical_path not in self._identifier_field_names:
        return self

    # Swap the identifier entry: keep everything before the old leaf name
    # and append the new name in its place.
    self._identifier_field_names.remove(canonical_path)
    parent_prefix = canonical_path[: -len(original.name)]
    self._identifier_field_names.add(f"{parent_prefix}{new_name}")
    return self