in datafusion/functions-nested/src/set_ops.rs [350:429]
/// Computes the per-row set union or intersection of two list arrays.
///
/// For every row `i`, the elements of `l[i]` and `r[i]` are row-encoded with a
/// single [`RowConverter`], sorted by their row encoding, de-duplicated, and
/// combined according to `set_op`:
///
/// * `SetOp::Union`: the distinct elements of `l[i]`, followed by the distinct
///   elements of `r[i]` that do not occur in `l[i]`.
/// * `SetOp::Intersect`: the distinct elements of `r[i]` that also occur in
///   `l[i]`.
///
/// A NULL list row on either side is treated as the empty set; the output list
/// array carries no row-level nulls.
///
/// If either input has zero rows or a `Null` element type, the distinct
/// elements of the other input are returned instead, built with a fresh list
/// field rather than `field`.
///
/// # Errors
///
/// Returns an internal error if the inputs have different element types or a
/// different number of rows, and propagates row-conversion / concat /
/// list-construction errors.
fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
    l: &GenericListArray<OffsetSize>,
    r: &GenericListArray<OffsetSize>,
    field: Arc<Field>,
    set_op: SetOp,
) -> Result<ArrayRef> {
    // NOTE(review): these shortcuts return the distinct elements of the other
    // side for both Union and Intersect. For Intersect with a `Null`-typed side
    // the intersection can hold at most NULLs, so returning the other side looks
    // wrong unless the caller filters that case out first — confirm.
    if l.is_empty() || l.value_type().is_null() {
        let field = Arc::new(Field::new_list_field(r.value_type(), true));
        return general_array_distinct::<OffsetSize>(r, &field);
    } else if r.is_empty() || r.value_type().is_null() {
        let field = Arc::new(Field::new_list_field(l.value_type(), true));
        return general_array_distinct::<OffsetSize>(l, &field);
    }
    if l.value_type() != r.value_type() {
        return internal_err!("{set_op:?} is not implemented for '{l:?}' and '{r:?}'");
    }
    // The row-wise `zip` below would otherwise silently stop at the shorter
    // input and yield a result with the wrong number of rows.
    if l.len() != r.len() {
        return internal_err!(
            "{set_op:?} expects arguments with the same number of rows, got {} and {}",
            l.len(),
            r.len()
        );
    }

    let mut offsets = Vec::with_capacity(l.len() + 1);
    let mut last_offset = OffsetSize::usize_as(0);
    offsets.push(last_offset);
    let mut new_arrays = Vec::with_capacity(l.len());
    let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
    for (first_arr, second_arr) in l.iter().zip(r.iter()) {
        // A NULL list row contributes no elements. `convert_columns(&[])` cannot
        // express that: this converter was built for exactly one column and
        // rejects being handed zero columns, so build an empty `Rows` instead.
        let l_values = match first_arr {
            Some(arr) => converter.convert_columns(&[arr])?,
            None => converter.empty_rows(0, 0),
        };
        let r_values = match second_arr {
            Some(arr) => converter.convert_columns(&[arr])?,
            None => converter.empty_rows(0, 0),
        };

        // Sorting first lets `dedup` (which only drops adjacent repeats)
        // remove every duplicate.
        let l_iter = l_values.iter().sorted().dedup();
        let values_set: HashSet<_> = l_iter.clone().collect();
        let mut rows = match set_op {
            SetOp::Union => l_iter.collect(),
            SetOp::Intersect => vec![],
        };
        for r_val in r_values.iter().sorted().dedup() {
            let in_left = values_set.contains(&r_val);
            let keep = match set_op {
                SetOp::Union => !in_left,
                SetOp::Intersect => in_left,
            };
            if keep {
                rows.push(r_val);
            }
        }

        last_offset = last_offset + OffsetSize::usize_as(rows.len());
        offsets.push(last_offset);

        let arrays = converter.convert_rows(rows)?;
        let array = match arrays.first() {
            Some(array) => Arc::clone(array),
            None => {
                return internal_err!("{set_op}: failed to get array from rows");
            }
        };
        new_arrays.push(array);
    }
    let offsets = OffsetBuffer::new(offsets.into());
    let new_arrays_ref: Vec<_> = new_arrays.iter().map(|v| v.as_ref()).collect();
    let values = compute::concat(&new_arrays_ref)?;
    let arr = GenericListArray::<OffsetSize>::try_new(field, offsets, values, None)?;
    Ok(Arc::new(arr))
}