fn generic_set_lists()

in datafusion/functions-nested/src/set_ops.rs [350:429]


/// Computes a row-wise set operation (`SetOp::Union` or `SetOp::Intersect`)
/// over two list arrays.
///
/// For each row index `i`, the elements of `l[i]` and `r[i]` are encoded with a
/// single-column [`RowConverter`], deduplicated, and combined:
/// - `SetOp::Union`: the distinct elements of `l[i]` (ordered by their row
///   encoding), followed by the distinct elements of `r[i]` (same ordering)
///   that do not occur in `l[i]`.
/// - `SetOp::Intersect`: the distinct elements of `r[i]` (ordered by their row
///   encoding) that also occur in `l[i]`.
///
/// A null list element on either side is treated as an empty list. The
/// resulting list array is built without a null buffer.
///
/// If `l` has no rows or a `Null` value type, the distinct elements of `r` are
/// returned. Symmetrically, the distinct elements of `l` are returned when `r`
/// has no rows or a `Null` value type. This applies for either `set_op`.
///
/// # Errors
/// Returns an internal error if the two inputs have different value types.
/// Also propagates errors from row conversion, `compute::concat`, and
/// `GenericListArray::try_new`.
fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
    l: &GenericListArray<OffsetSize>,
    r: &GenericListArray<OffsetSize>,
    field: Arc<Field>,
    set_op: SetOp,
) -> Result<ArrayRef> {
    // Short-circuit: if one side is empty or only holds `Null`-typed elements,
    // answer with the distinct elements of the other side.
    // NOTE(review): this is also taken for `Intersect`, which is only right if
    // callers coerce/handle `Null`-typed inputs before reaching here — confirm.
    if l.is_empty() || l.value_type().is_null() {
        let field = Arc::new(Field::new_list_field(r.value_type(), true));
        return general_array_distinct::<OffsetSize>(r, &field);
    } else if r.is_empty() || r.value_type().is_null() {
        let field = Arc::new(Field::new_list_field(l.value_type(), true));
        return general_array_distinct::<OffsetSize>(l, &field);
    }

    if l.value_type() != r.value_type() {
        return internal_err!("{set_op:?} is not implemented for '{l:?}' and '{r:?}'");
    }

    let mut offsets = vec![OffsetSize::usize_as(0)];
    let mut new_arrays = vec![];
    let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
    // NOTE(review): `zip` stops at the shorter input; this assumes `l` and `r`
    // have the same number of rows — confirm with callers.
    for (first_arr, second_arr) in l.iter().zip(r.iter()) {
        // A null list element is treated as an empty list. The converter was
        // built with exactly one sort field, so it must be given exactly one
        // column; `convert_columns(&[])` errors out. `empty_rows` produces an
        // empty `Rows` without going through column conversion.
        let l_values = match first_arr {
            Some(first_arr) => converter.convert_columns(&[first_arr])?,
            None => converter.empty_rows(0, 0),
        };
        let r_values = match second_arr {
            Some(second_arr) => converter.convert_columns(&[second_arr])?,
            None => converter.empty_rows(0, 0),
        };

        // Distinct left-hand elements, ordered by their row encoding.
        let l_iter = l_values.iter().sorted().dedup();
        let values_set: HashSet<_> = l_iter.clone().collect();
        // Union starts from all distinct left elements; intersect starts empty
        // and only admits right elements that are also on the left.
        let mut rows = if set_op == SetOp::Union {
            l_iter.collect()
        } else {
            vec![]
        };

        for r_val in r_values.iter().sorted().dedup() {
            match set_op {
                SetOp::Union => {
                    if !values_set.contains(&r_val) {
                        rows.push(r_val);
                    }
                }
                SetOp::Intersect => {
                    if values_set.contains(&r_val) {
                        rows.push(r_val);
                    }
                }
            }
        }

        let Some(&last_offset) = offsets.last() else {
            return internal_err!("offsets should not be empty");
        };
        offsets.push(last_offset + OffsetSize::usize_as(rows.len()));

        // Decode this row's result back into an array; it is concatenated with
        // the other rows' results after the loop.
        let arrays = converter.convert_rows(rows)?;
        let Some(array) = arrays.first() else {
            return internal_err!("{set_op}: failed to get array from rows");
        };
        new_arrays.push(Arc::clone(array));
    }

    let offsets = OffsetBuffer::new(offsets.into());
    let new_arrays_ref: Vec<_> = new_arrays.iter().map(|v| v.as_ref()).collect();
    let values = compute::concat(&new_arrays_ref)?;
    let arr = GenericListArray::<OffsetSize>::try_new(field, offsets, values, None)?;
    Ok(Arc::new(arr))
}