in velox/dwio/dwrf/reader/FlatMapColumnReader.cpp [320:466]
void FlatMapColumnReader<T>::next(
    uint64_t numValues,
    VectorPtr& result,
    const uint64_t* incomingNulls) {
  // Reads `numValues` rows of this flat-map column and stores them in
  // `result` as a newly constructed MapVector.
  //
  // numValues      number of top-level rows (maps) to produce.
  // result         in/out vector; when it already holds a MapVector its keys,
  //                offsets and sizes (and, in flat mode, values) are picked up
  //                for reuse. It is always overwritten at the end.
  // incomingNulls  null mask forwarded unchanged to readNulls().
  //
  // Each key node contributes one batch holding that key's values. The
  // per-row entries are assembled by walking the nodes' in-map bits row by
  // row. In flat mode (returnFlatVector_) values are copied one at a time into
  // the values vector; otherwise the batches are appended back to back and a
  // dictionary of indices is layered on top to put them in row order.
  auto mapVector = detail::resetIfWrongVectorType<MapVector>(result);
  VectorPtr keysVector;
  VectorPtr valuesVector;
  BufferPtr offsets;
  BufferPtr lengths;
  if (mapVector) {
    keysVector = mapVector->mapKeys();
    // The previous values vector is only picked up in flat mode.
    if (returnFlatVector_) {
      valuesVector = mapVector->mapValues();
    }
    offsets = mapVector->mutableOffsets(numValues);
    lengths = mapVector->mutableSizes(numValues);
  }

  BufferPtr nulls = readNulls(numValues, result, incomingNulls);
  const auto* nullsPtr = nulls ? nulls->as<uint64_t>() : nullptr;
  uint64_t nullCount = nullsPtr ? bits::countNulls(nullsPtr, 0, numValues) : 0;

  if (mapVector) {
    // NOTE(review): presumably clears `result`/`offsets`/`lengths` when they
    // cannot be written in place - confirm against the helper's definition.
    detail::resetIfNotWritable(result, offsets, lengths);
  }

  if (!offsets) {
    offsets = AlignedBuffer::allocate<vector_size_t>(numValues, &memoryPool_);
  }
  if (!lengths) {
    lengths = AlignedBuffer::allocate<vector_size_t>(numValues, &memoryPool_);
  }

  auto nonNullMaps = numValues - nullCount;

  // opt - only loop over nodes that have value
  std::vector<KeyNode<T>*> nodes;
  utils::BulkBitIterator<char> bulkInMapIter{};
  std::vector<const BaseVector*> nodeBatches;
  size_t totalChildren = 0;
  if (nonNullMaps > 0) {
    for (auto& node : keyNodes_) {
      // if the node has value filled into key-value batch
      // future optimization - enable batch to be sortable on row index
      // and below next can be updated to next(keys, values, numValues)
      // which writes row index into batch and offsets can be generated
      auto batch = node->load(nonNullMaps);
      if (batch) {
        nodes.emplace_back(node.get());
        node->addToBulkInMapBitIterator(bulkInMapIter);
        nodeBatches.push_back(batch);
        totalChildren += batch->size();
      }
    }
  }

  // Per-node bookkeeping, index-aligned with `nodes` / `nodeBatches`:
  //   startIndices[j] - position of batch j's first element in the
  //                     concatenation of all batches;
  //   nodeIndices[j]  - next unread element inside batch j.
  // Heap-backed and zero-initialized: variable-length arrays are not standard
  // C++, and the cursors must hold a defined value even when the
  // initialization branch below is skipped.
  std::vector<size_t> startIndices(nodeBatches.size(), 0);
  std::vector<size_t> nodeIndices(nodeBatches.size(), 0);

  auto& mapValueType = requestedType_->type->asMap().valueType();
  if (totalChildren > 0) {
    size_t childOffset = 0;
    for (size_t i = 0; i < nodeBatches.size(); i++) {
      nodeIndices[i] = 0;
      startIndices[i] = childOffset;
      childOffset += nodeBatches[i]->size();
    }

    initKeysVector(keysVector, totalChildren);
    flatmap_helper::initializeVector(
        valuesVector, mapValueType, memoryPool_, nodeBatches);

    // Dictionary mode: lay the batches out back to back; the indices computed
    // below reorder them into row order.
    if (!returnFlatVector_) {
      for (auto batch : nodeBatches) {
        valuesVector->append(batch);
      }
    }
  }

  BufferPtr indices;
  vector_size_t* indicesPtr = nullptr;
  if (!returnFlatVector_) {
    // Allocate with the same element type that is used to size and access the
    // buffer.
    indices =
        AlignedBuffer::allocate<vector_size_t>(totalChildren, &memoryPool_);
    indices->setSize(totalChildren * sizeof(vector_size_t));
    indicesPtr = indices->asMutable<vector_size_t>();
  }

  // now we're doing the rotation concat for sure to fill data
  vector_size_t offset = 0;
  auto* offsetsPtr = offsets->asMutable<vector_size_t>();
  auto* lengthsPtr = lengths->asMutable<vector_size_t>();
  for (uint64_t i = 0; i < numValues; ++i) {
    // Every row, null or not, records where its entries start.
    offsetsPtr[i] = offset;
    // case for having map on this row
    if (!nullsPtr || !bits::isBitNull(nullsPtr, i)) {
      // The in-map iterator is advanced once per non-null row only.
      bulkInMapIter.loadNext();
      for (size_t j = 0; j < nodes.size(); j++) {
        if (bulkInMapIter.hasValueAt(j)) {
          nodes[j]->fillKeysVector(keysVector, offset, stringKeyBuffer_.get());
          if (returnFlatVector_) {
            flatmap_helper::copyOne(
                mapValueType,
                *valuesVector,
                offset,
                *nodeBatches[j],
                nodeIndices[j]);
          } else {
            indicesPtr[offset] =
                static_cast<vector_size_t>(startIndices[j] + nodeIndices[j]);
          }
          offset++;
          nodeIndices[j]++;
        }
      }
    }
    // Null rows end up with length 0 because `offset` did not move.
    lengthsPtr[i] = offset - offsetsPtr[i];
  }

  DWIO_ENSURE_EQ(totalChildren, offset, "fill the same amount of items");
  VLOG(1) << "[Flat-Map] num elements: " << numValues
          << ", total children: " << totalChildren;

  if (totalChildren > 0 && !returnFlatVector_) {
    valuesVector = BaseVector::wrapInDictionary(
        nullptr, indices, totalChildren, std::move(valuesVector));
  }

  // When read-string-as-row flag is on, string readers produce ROW(BIGINT,
  // BIGINT) type instead of VARCHAR or VARBINARY. In these cases,
  // requestedType_->type is not the right type of the final vector.
  auto mapType = (keysVector == nullptr || valuesVector == nullptr)
      ? requestedType_->type
      : MAP(keysVector->type(), valuesVector->type());

  // TODO Reuse
  result = std::make_shared<MapVector>(
      &memoryPool_,
      mapType,
      nulls,
      numValues,
      offsets,
      lengths,
      keysVector,
      valuesVector,
      nullCount);
}