def _partition_value()

in pyiceberg/io/pyarrow.py [0:0]


    def _partition_value(self, partition_field: PartitionField, schema: Schema) -> Any:
        if partition_field.source_id not in self.column_aggregates:
            return None

        source_field = schema.find_field(partition_field.source_id)
        iceberg_transform = partition_field.transform

        if not iceberg_transform.preserves_order:
            raise ValueError(
                f"Cannot infer partition value from parquet metadata for a non-linear Partition Field: {partition_field.name} with transform {partition_field.transform}"
            )

        transform_func = iceberg_transform.transform(source_field.field_type)

        lower_value = transform_func(
            partition_record_value(
                partition_field=partition_field,
                value=self.column_aggregates[partition_field.source_id].current_min,
                schema=schema,
            )
        )
        upper_value = transform_func(
            partition_record_value(
                partition_field=partition_field,
                value=self.column_aggregates[partition_field.source_id].current_max,
                schema=schema,
            )
        )
        if lower_value != upper_value:
            raise ValueError(
                f"Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: {partition_field.name}. {lower_value=}, {upper_value=}"
            )

        return lower_value