in velox/dwio/dwrf/writer/ColumnWriter.cpp [1184:1292]
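// Builds the final dictionary (and per-stride dictionaries for keys that fall
// out of it), then populates the dictionary encoding streams from the rows
// buffered so far.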
void StringColumnWriter::populateDictionaryEncodingStreams() {
ensureValidStreamWriters(true);
MemoryPool& pool{getMemoryPool(MemoryUsageCategory::GENERAL)};
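// lookupTable maps each original dictionary index to its index in the sorted
// final (or stride) dictionary; inDict flags the keys that are retained in
// the final dictionary.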
DataBuffer<uint32_t> lookupTable{pool};
DataBuffer<bool> inDict{pool};
// All elements are initialized to 0.
DataBuffer<uint32_t> strideDictCounts{
pool,
strideOffsets_.size(),
};
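// Build the lookup table (ordering the dictionary by key frequency when
// sort_ is set) and write the final dictionary data and lengths through the
// two callbacks.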
finalDictionarySize_ = DictionaryEncodingUtils::getSortedIndexLookupTable(
dictEncoder_,
pool,
sort_,
DictionaryEncodingUtils::frequencyOrdering,
true,
lookupTable,
inDict,
strideDictCounts,
[&](auto buf, auto size) { dictionaryData_->write(buf, size); },
[&](auto buf, auto size) {
dictionaryDataLength_->add(buf, Ranges::of(0, size), nullptr);
});
// When all the keys are in the dictionary, the inDictionary stream is
// omitted.
bool writeInDictionaryStream = finalDictionarySize_ != dictEncoder_.size();
// Record starting positions of the dictionary encoding streams.
recordDictionaryEncodingStreamPositions(
writeInDictionaryStream, 0, strideDictCounts);
// Adds and populates the index entry for rows_[start:end).
// TODO: T45260340 Unfortunately, standard usage of the index builder would
// need to segment the writes in the data buffer again. Should we manually
// call add on the index builder and still batch the entire write?
auto populateDictionaryStreams =
[&, this](size_t start, size_t end, size_t strideIndex) {
// Do this once per stride as opposed to per row to reduce branching
// cost.
if (writeInDictionaryStream) {
auto strideDictKeyCount = strideDictCounts[strideIndex];
DataBuffer<uint32_t> sortedStrideDictKeyIndexBuffer{pool};
sortedStrideDictKeyIndexBuffer.reserve(strideDictKeyCount);
{
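// Scoped so that the buffered in-dictionary flags are flushed when the
// writer is destroyed, before the stride dictionary streams are written.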
auto inDictWriter = createBufferedWriter<char>(
getMemoryPool(MemoryUsageCategory::GENERAL),
64 * 1024,
[&](auto buf, auto size) {
inDictionary_->add(buf, Ranges::of(0, size), nullptr);
});
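// For each row, flag whether its key made the final dictionary; keys that
// did not are gathered into this stride's dictionary at their sorted
// position.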
uint32_t strideDictSize = 0;
for (size_t i = start; i != end; ++i) {
auto origIndex = rows_[i];
bool valInDict = inDict[origIndex];
inDictWriter.add(valInDict ? 1 : 0);
// TODO: optimize this branching, either by restoring the visitor pattern
// or by separating the index backfill.
if (!valInDict) {
auto strideDictIndex = lookupTable[origIndex];
sortedStrideDictKeyIndexBuffer[strideDictIndex] = origIndex;
++strideDictSize;
}
}
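// Every key flagged as not in the final dictionary must be accounted for
// in the precomputed per-stride count.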
DWIO_ENSURE_EQ(strideDictSize, strideDictKeyCount);
}
// The stride dictionary can be empty when all keys for the stride are in
// the final dictionary.
if (strideDictKeyCount > 0) {
auto strideLengthWriter = createBufferedWriter<uint32_t>(
getMemoryPool(MemoryUsageCategory::GENERAL),
64 * 1024,
[&](auto buf, auto size) {
strideDictionaryDataLength_->add(
buf, Ranges::of(0, size), nullptr);
});
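// Write the stride dictionary keys in sorted order: bytes go to the stride
// dictionary data stream, lengths to the stride length stream.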
for (size_t i = 0; i < strideDictKeyCount; ++i) {
auto val = dictEncoder_.getKey(sortedStrideDictKeyIndexBuffer[i]);
strideDictionaryData_->write(val.data(), val.size());
strideLengthWriter.add(val.size());
}
}
}
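// Remap each row to its index in the final (or stride) dictionary and write
// the indices to the data stream in 64KB batches.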
bufferedWrite<uint32_t>(
pool,
64 * 1024,
start,
end,
[&](auto index) { return lookupTable[rows_[index]]; },
[&](auto buf, auto size) {
data_->add(buf, Ranges::of(0, size), nullptr);
});
};
// The writer always calls createIndexEntry before flush, so the last stride
// offset must cover all buffered rows.
size_t lastIndexStrideOffset = strideOffsets_[strideOffsets_.size() - 1];
DWIO_ENSURE_EQ(lastIndexStrideOffset, rows_.size());
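// Populate the streams one stride at a time, recording the dictionary
// encoding stream positions at each stride boundary.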
populateStrides(
populateDictionaryStreams,
std::bind(
&StringColumnWriter::recordDictionaryEncodingStreamPositions,
this,
writeInDictionaryStream,
std::placeholders::_1,
std::cref(strideDictCounts)));
}