in velox/functions/prestosql/ArrayIntersectExcept.cpp [122:261]
void apply(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
const TypePtr& /* outputType */,
exec::EvalCtx* context,
VectorPtr* result) const override {
memory::MemoryPool* pool = context->pool();
BaseVector* left = args[0].get();
BaseVector* right = args[1].get();
// For array_intersect, if there's a constant input, then require it is on
// the right side. For array_except, the constant optimization only applies
// if the constant is on the rhs, so this swap is not applicable.
if constexpr (isIntersect) {
if (constantSet_.has_value() && isLeftConstant_) {
std::swap(left, right);
}
}
exec::LocalDecodedVector leftHolder(context, *left, rows);
auto decodedLeftArray = leftHolder.get();
auto baseLeftArray = decodedLeftArray->base()->as<ArrayVector>();
// Decode and acquire array elements vector.
exec::LocalDecodedVector leftElementsDecoder(context);
auto decodedLeftElements =
decodeArrayElements(leftHolder, leftElementsDecoder, rows);
auto leftElementsCount =
countElements<ArrayVector>(rows, *decodedLeftArray);
vector_size_t rowCount = left->size();
// Allocate new vectors for indices, nulls, length and offsets.
BufferPtr newIndices = allocateIndices(leftElementsCount, pool);
BufferPtr newElementNulls =
AlignedBuffer::allocate<bool>(leftElementsCount, pool, bits::kNotNull);
BufferPtr newLengths = allocateSizes(rowCount, pool);
BufferPtr newOffsets = allocateOffsets(rowCount, pool);
// Pointers and cursors to the raw data.
auto rawNewIndices = newIndices->asMutable<vector_size_t>();
auto rawNewElementNulls = newElementNulls->asMutable<uint64_t>();
auto rawNewLengths = newLengths->asMutable<vector_size_t>();
auto rawNewOffsets = newOffsets->asMutable<vector_size_t>();
vector_size_t indicesCursor = 0;
// Lambda that process each row. This is detached from the code so we can
// apply it differently based on whether the right-hand side set is constant
// or not.
auto processRow = [&](vector_size_t row,
const SetWithNull<T>& rightSet,
SetWithNull<T>& outputSet) {
auto idx = decodedLeftArray->index(row);
auto size = baseLeftArray->sizeAt(idx);
auto offset = baseLeftArray->offsetAt(idx);
outputSet.reset();
*rawNewOffsets = indicesCursor;
// Scans the array elements on the left-hand side.
for (vector_size_t i = offset; i < (offset + size); ++i) {
if (decodedLeftElements->isNullAt(i)) {
// For a NULL value not added to the output row yet, insert in
// array_intersect if it was found on the rhs (and not found in the
// case of array_except).
if (!outputSet.hasNull) {
bool setNull = false;
if constexpr (isIntersect) {
setNull = rightSet.hasNull;
} else {
setNull = !rightSet.hasNull;
}
if (setNull) {
bits::setNull(rawNewElementNulls, indicesCursor++, true);
outputSet.hasNull = true;
}
}
} else {
auto val = decodedLeftElements->valueAt<T>(i);
// For array_intersect, add the element if it is found (not found
// for array_except) in the right-hand side, and wasn't added already
// (check outputSet).
bool addValue = false;
if constexpr (isIntersect) {
addValue = rightSet.set.count(val) > 0;
} else {
addValue = rightSet.set.count(val) == 0;
}
if (addValue) {
auto it = outputSet.set.insert(val);
if (it.second) {
rawNewIndices[indicesCursor++] = i;
}
}
}
}
*rawNewLengths = indicesCursor - *rawNewOffsets;
++rawNewLengths;
++rawNewOffsets;
};
SetWithNull<T> outputSet;
// Optimized case when the right-hand side array is constant.
if (constantSet_.has_value()) {
rows.applyToSelected([&](vector_size_t row) {
processRow(row, *constantSet_, outputSet);
});
}
// General case when no arrays are constant and both sets need to be
// computed for each row.
else {
exec::LocalDecodedVector rightHolder(context, *right, rows);
// Decode and acquire array elements vector.
exec::LocalDecodedVector rightElementsHolder(context);
auto decodedRightElements =
decodeArrayElements(rightHolder, rightElementsHolder, rows);
SetWithNull<T> rightSet;
auto rightArrayVector = rightHolder.get()->base()->as<ArrayVector>();
rows.applyToSelected([&](vector_size_t row) {
auto idx = rightHolder.get()->index(row);
generateSet<T>(rightArrayVector, decodedRightElements, idx, rightSet);
processRow(row, rightSet, outputSet);
});
}
auto newElements = BaseVector::wrapInDictionary(
newElementNulls, newIndices, indicesCursor, baseLeftArray->elements());
auto resultArray = std::make_shared<ArrayVector>(
pool,
ARRAY(CppToType<T>::create()),
BufferPtr(nullptr),
rowCount,
newOffsets,
newLengths,
newElements,
0);
context->moveOrCopyResult(resultArray, rows, result);
}