def _determine_partitions()

in pyiceberg/io/pyarrow.py [0:0]


def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> List[_TablePartition]:
    """Based on the iceberg table partition spec, filter the arrow table into partitions with their keys.

    Example:
    Input:
    An arrow table with partition key of ['n_legs', 'year'] and with data of
    {'year': [2020, 2022, 2022, 2021, 2022, 2022, 2022, 2019, 2021],
     'n_legs': [2, 2, 2, 4, 4, 4, 4, 5, 100],
     'animal': ["Flamingo", "Parrot", "Parrot", "Dog", "Horse", "Horse", "Horse","Brittle stars", "Centipede"]}.
    Output (assuming identity transforms on both fields):
    Six partitions, one per distinct (n_legs, year) combination:
    (2, 2020), (2, 2022), (4, 2021), (4, 2022), (5, 2019) and (100, 2021).

    The algorithm:
    - We append one helper column per partition field holding the transformed source values
    - We determine the set of unique partition keys
    - Then we produce a set of partitions by filtering on each of the combinations
    - We combine the chunks to create a copy to avoid GIL congestion on the original table

    Args:
        spec: Partition spec; each of its fields supplies the source field id, the transform and
            the partition field name.
        schema: Schema used to resolve each partition field's source id to a source field
            (its name and type).
        arrow_table: The data to split. It is not modified; helper columns are added to a derived table.

    Returns:
        One _TablePartition per distinct combination of transformed partition values, each carrying
        the PartitionKey and the matching rows (without the helper columns).
    """
    # Function-scope import: the module-level import block is not guaranteed to provide `math`.
    import math

    def _matches(column_name: str, value: object) -> "pc.Expression":
        """Build the row predicate selecting rows whose helper column holds `value`."""
        column = pc.field(column_name)
        if value is None:
            # Nulls never satisfy an equality comparison, so they need an explicit null check.
            return column.is_null()
        if isinstance(value, float) and math.isnan(value):
            # NaN never compares equal to itself; `column == NaN` would select no rows and
            # the NaN rows would silently be left out of every partition.
            return column.is_nan()
        return column == value

    # Assign unique names to columns where the partition transform has been applied
    # to avoid conflicts
    partition_fields = [f"_partition_{field.name}" for field in spec.fields]

    for partition, name in zip(spec.fields, partition_fields):
        source_field = schema.find_field(partition.source_id)
        arrow_table = arrow_table.append_column(
            name, partition.transform.pyarrow_transform(source_field.field_type)(arrow_table[source_field.name])
        )

    # Grouping without aggregations leaves exactly one row per distinct partition value combination
    unique_partition_fields = arrow_table.select(partition_fields).group_by(partition_fields).aggregate([])

    table_partitions = []
    # TODO: As a next step, we could also play around with yielding instead of materializing the full list
    for unique_partition in unique_partition_fields.to_pylist():
        partition_key = PartitionKey(
            field_values=[
                PartitionFieldValue(field=field, value=unique_partition[name])
                for field, name in zip(spec.fields, partition_fields)
            ],
            partition_spec=spec,
            schema=schema,
        )

        # A row belongs to this partition only if every helper column matches the key
        row_filter = functools.reduce(
            operator.and_,
            [_matches(name, unique_partition[name]) for name in partition_fields],
        )
        # The helper columns only exist to drive the split; they must not end up in the output
        filtered_table = arrow_table.filter(row_filter).drop_columns(partition_fields)

        # The combine_chunks seems to be counter-intuitive to do, but it actually returns
        # fresh buffers that don't interfere with each other when it is written out to file
        table_partitions.append(
            _TablePartition(partition_key=partition_key, arrow_table_partition=filtered_table.combine_chunks())
        )

    return table_partitions