def get_rows_to_update()

in pyiceberg/table/upsert_util.py [0:0]


def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols: list[str]) -> pa.Table:
    """
    Return a table with rows that need to be updated in the target table based on the join columns.

    A source row is matched when all ``join_cols`` values equal those of a target row (a row with a null
    key value never matches). A matched row is returned only when at least one non-key column present in
    both tables differs from the first matching target row, so unchanged rows are not rewritten.

    Args:
        source_table: Incoming rows to upsert.
        target_table: Existing rows to compare against.
        join_cols: Names of the columns that identify a row; they must exist in both tables.

    Returns:
        The changed source rows in their original order, restricted to the columns present in both
        tables (in source column order). When nothing changed, an empty table with those columns.

    Raises:
        ValueError: If ``join_cols`` is empty.
    """
    if not join_cols:
        raise ValueError("join_cols must contain at least one column")

    target_names = set(target_table.column_names)
    join_cols_set = set(join_cols)
    # A list (not a set) so the output column order is deterministic and follows the source table.
    common_columns = [name for name in source_table.column_names if name in target_names]
    # Source-only columns have no counterpart in the target, so they cannot be compared.
    compare_cols = [name for name in common_columns if name not in join_cols_set]

    # Index the target once by key instead of filtering the whole target for every source row
    # (O(len(source) * len(target))). The first target row seen for a key is the one compared against.
    target_key_values = [target_table.column(name).to_pylist() for name in join_cols]
    target_cmp_values = [target_table.column(name).to_pylist() for name in compare_cols]
    target_index: dict[tuple, tuple] = {}
    for row in range(target_table.num_rows):
        key = tuple(column[row] for column in target_key_values)
        # Arrow's `==` yields null for a null operand and a filter drops nulls, so a null key never matches.
        if any(value is None for value in key):
            continue
        target_index.setdefault(key, tuple(column[row] for column in target_cmp_values))

    source_key_values = [source_table.column(name).to_pylist() for name in join_cols]
    source_cmp_values = [source_table.column(name).to_pylist() for name in compare_cols]
    needs_update = []
    for row in range(source_table.num_rows):
        target_row = target_index.get(tuple(column[row] for column in source_key_values))
        # Compare each value with `!=` rather than comparing tuples, whose equality short-circuits on
        # identity; this keeps e.g. NaN != NaN counting as a change.
        needs_update.append(
            target_row is not None
            and any(column[row] != target_value for column, target_value in zip(source_cmp_values, target_row))
        )

    # Filtering with a boolean mask keeps the source schema, so "nothing changed" yields a typed empty table.
    rows_to_update_table = source_table.filter(pa.array(needs_update, type=pa.bool_()))
    return rows_to_update_table.select(common_columns)