in velox/functions/prestosql/Map.cpp [25:159]
void apply(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
const TypePtr& outputType,
exec::EvalCtx* context,
VectorPtr* result) const override {
VELOX_CHECK_EQ(args.size(), 2);
auto keys = args[0];
auto values = args[1];
exec::DecodedArgs decodedArgs(rows, args, context);
auto decodedKeys = decodedArgs.at(0);
auto decodedValues = decodedArgs.at(1);
static const char* kArrayLengthsMismatch =
"Key and value arrays must be the same length";
static const char* kDuplicateKey = "Duplicate map keys are not allowed";
MapVectorPtr mapVector;
if (decodedKeys->isIdentityMapping() &&
decodedValues->isIdentityMapping()) {
auto keysArray = keys->as<ArrayVector>();
auto valuesArray = values->as<ArrayVector>();
// Check array lengths
rows.applyToSelected([&](vector_size_t row) {
VELOX_USER_CHECK_EQ(
keysArray->sizeAt(row),
valuesArray->sizeAt(row),
"{}",
kArrayLengthsMismatch);
});
mapVector = std::make_shared<MapVector>(
context->pool(),
outputType,
BufferPtr(nullptr),
rows.size(),
keysArray->offsets(),
keysArray->sizes(),
keysArray->elements(),
valuesArray->elements());
} else {
auto keyIndices = decodedKeys->indices();
auto valueIndices = decodedValues->indices();
auto keysArray = decodedKeys->base()->as<ArrayVector>();
auto valuesArray = decodedValues->base()->as<ArrayVector>();
// Check array lengths
rows.applyToSelected([&](vector_size_t row) {
VELOX_USER_CHECK_EQ(
keysArray->sizeAt(keyIndices[row]),
valuesArray->sizeAt(valueIndices[row]),
"{}",
kArrayLengthsMismatch);
});
vector_size_t totalElements = 0;
rows.applyToSelected([&](auto row) {
totalElements += keysArray->sizeAt(keyIndices[row]);
});
BufferPtr offsets = allocateOffsets(rows.size(), context->pool());
auto rawOffsets = offsets->asMutable<vector_size_t>();
BufferPtr sizes = allocateSizes(rows.size(), context->pool());
auto rawSizes = sizes->asMutable<vector_size_t>();
BufferPtr valuesIndices = allocateIndices(totalElements, context->pool());
auto rawValuesIndices = valuesIndices->asMutable<vector_size_t>();
BufferPtr keysIndices = allocateIndices(totalElements, context->pool());
auto rawKeysIndices = keysIndices->asMutable<vector_size_t>();
vector_size_t offset = 0;
rows.applyToSelected([&](vector_size_t row) {
auto size = keysArray->sizeAt(keyIndices[row]);
rawOffsets[row] = offset;
rawSizes[row] = size;
auto keysOffset = keysArray->offsetAt(keyIndices[row]);
auto valuesOffset = valuesArray->offsetAt(valueIndices[row]);
for (vector_size_t i = 0; i < size; i++) {
rawKeysIndices[offset + i] = keysOffset + i;
rawValuesIndices[offset + i] = valuesOffset + i;
}
offset += size;
});
auto wrappedKeys = BaseVector::wrapInDictionary(
BufferPtr(nullptr),
keysIndices,
totalElements,
keysArray->elements());
auto wrappedValues = BaseVector::wrapInDictionary(
BufferPtr(nullptr),
valuesIndices,
totalElements,
valuesArray->elements());
mapVector = std::make_shared<MapVector>(
context->pool(),
outputType,
BufferPtr(nullptr),
rows.size(),
offsets,
sizes,
wrappedKeys,
wrappedValues);
}
if constexpr (!AllowDuplicateKeys) {
// Check for duplicate keys
MapVector::canonicalize(mapVector);
auto offsets = mapVector->rawOffsets();
auto sizes = mapVector->rawSizes();
auto mapKeys = mapVector->mapKeys();
rows.applyToSelected([&](vector_size_t row) {
auto offset = offsets[row];
auto size = sizes[row];
for (vector_size_t i = 1; i < size; i++) {
if (mapKeys->equalValueAt(
mapKeys.get(), offset + i, offset + i - 1)) {
VELOX_USER_FAIL("{}", kDuplicateKey);
}
}
});
}
context->moveOrCopyResult(mapVector, rows, result);
}