in velox/functions/prestosql/Zip.cpp [63:181]
void apply(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
const TypePtr& outputType,
exec::EvalCtx* context,
VectorPtr* result) const override {
validateInputTypes(args);
const vector_size_t numInputArrays = args.size();
exec::DecodedArgs decodedArgs(rows, args, context);
std::vector<const ArrayVector*> baseVectors(numInputArrays);
std::vector<const vector_size_t*> rawSizes(numInputArrays);
std::vector<const vector_size_t*> rawOffsets(numInputArrays);
std::vector<const vector_size_t*> indices(numInputArrays);
for (int i = 0; i < numInputArrays; i++) {
baseVectors[i] = decodedArgs.at(i)->base()->as<ArrayVector>();
rawSizes[i] = baseVectors[i]->rawSizes();
rawOffsets[i] = baseVectors[i]->rawOffsets();
indices[i] = decodedArgs.at(i)->indices();
}
// Size of elements in result vector.
vector_size_t resultElementsSize = 0;
auto pool = context->pool();
// Determine what the size of the resultant elements will be so we can
// reserve enough space.
auto getMaxArraySize = [&](vector_size_t row) -> vector_size_t {
vector_size_t maxSize = 0;
for (int i = 0; i < numInputArrays; i++) {
maxSize = std::max(maxSize, rawSizes[i][indices[i][row]]);
}
return maxSize;
};
BufferPtr resultArraySizesBuffer = allocateSizes(rows.end(), pool);
auto rawResultArraySizes =
resultArraySizesBuffer->asMutable<vector_size_t>();
rows.applyToSelected([&](auto row) {
auto maxSize = getMaxArraySize(row);
resultElementsSize += maxSize;
rawResultArraySizes[row] = maxSize;
});
// Create individual result vectors for each input Array vector.
std::vector<BufferPtr> nestedResultIndices(numInputArrays);
std::vector<BufferPtr> nestedResultNulls(numInputArrays);
std::vector<vector_size_t*> rawNestedResultIndices(numInputArrays);
std::vector<uint64_t*> rawNestedResultNulls(numInputArrays);
for (int i = 0; i < numInputArrays; i++) {
nestedResultIndices[i] = allocateIndices(resultElementsSize, pool);
nestedResultNulls[i] = AlignedBuffer::allocate<bool>(
resultElementsSize, pool, bits::kNotNull);
rawNestedResultIndices[i] =
nestedResultIndices[i]->asMutable<vector_size_t>();
rawNestedResultNulls[i] = nestedResultNulls[i]->asMutable<uint64_t>();
}
const auto resultArraySize = rows.end();
BufferPtr resultArrayOffsets = allocateOffsets(resultArraySize, pool);
auto rawResultArrayOffsets = resultArrayOffsets->asMutable<vector_size_t>();
// Create right offsets/indexes for the individual and final result arrays.
int elementRow = 0;
rows.applyToSelected([&](auto row) {
// Get the max size for that row.
auto maxArraySize = rawResultArraySizes[row];
rawResultArrayOffsets[row] = elementRow;
for (int i = 0; i < numInputArrays; i++) {
auto offset = rawOffsets[i][indices[i][row]];
auto size = rawSizes[i][indices[i][row]];
std::iota(
rawNestedResultIndices[i] + elementRow,
rawNestedResultIndices[i] + elementRow + size,
offset);
bits::fillBits(
rawNestedResultNulls[i],
elementRow + size,
elementRow + maxArraySize,
bits::kNull);
}
elementRow += maxArraySize;
});
// Create result dictionary vectors.
std::vector<VectorPtr> resultDictionaryVectors(numInputArrays);
for (int i = 0; i < numInputArrays; i++) {
resultDictionaryVectors[i] = BaseVector::wrapInDictionary(
nestedResultNulls[i],
nestedResultIndices[i],
resultElementsSize,
baseVectors[i]->elements());
}
auto rowType = outputType->childAt(0);
auto rowVector = std::make_shared<RowVector>(
pool,
rowType,
BufferPtr(nullptr),
resultElementsSize,
resultDictionaryVectors);
// Now convert these to an Array
auto arrayVector = std::make_shared<ArrayVector>(
pool,
outputType,
BufferPtr(nullptr),
rows.end(),
resultArrayOffsets,
resultArraySizesBuffer,
std::move(rowVector));
context->moveOrCopyResult(arrayVector, rows, result);
}