void StringColumnWriter::populateDictionaryEncodingStreams()

in velox/dwio/dwrf/writer/ColumnWriter.cpp [1184:1292]


void StringColumnWriter::populateDictionaryEncodingStreams() {
  ensureValidStreamWriters(true);
  MemoryPool& pool{getMemoryPool(MemoryUsageCategory::GENERAL)};
  DataBuffer<uint32_t> lookupTable{pool};
  DataBuffer<bool> inDict{pool};
  // All elements are initialized as 0.
  DataBuffer<uint32_t> strideDictCounts{
      pool,
      strideOffsets_.size(),
  };
  finalDictionarySize_ = DictionaryEncodingUtils::getSortedIndexLookupTable(
      dictEncoder_,
      pool,
      sort_,
      DictionaryEncodingUtils::frequencyOrdering,
      true,
      lookupTable,
      inDict,
      strideDictCounts,
      [&](auto buf, auto size) { dictionaryData_->write(buf, size); },
      [&](auto buf, auto size) {
        dictionaryDataLength_->add(buf, Ranges::of(0, size), nullptr);
      });

  // When all the Keys are in Dictionary, inDictionaryStream is omitted.
  bool writeInDictionaryStream = finalDictionarySize_ != dictEncoder_.size();

  // Record starting positions of the dictionary encoding streams.
  recordDictionaryEncodingStreamPositions(
      writeInDictionaryStream, 0, strideDictCounts);

  // Add and populate index entry for rows_[start:end).
  // TODO: T45260340 Unfortunately standard usage of the index
  // builder would need to segment the writes in the data buffer again. Should
  // we manually call add on index builder and still batch the entire write?
  auto populateDictionaryStreams =
      [&, this](size_t start, size_t end, size_t strideIndex) {
        // Do this once per stride as opposed to per row to reduce branching
        // cost.

        if (writeInDictionaryStream) {
          auto strideDictKeyCount = strideDictCounts[strideIndex];
          DataBuffer<uint32_t> sortedStrideDictKeyIndexBuffer{pool};
          sortedStrideDictKeyIndexBuffer.reserve(strideDictKeyCount);
          {
            auto inDictWriter = createBufferedWriter<char>(
                getMemoryPool(MemoryUsageCategory::GENERAL),
                64 * 1024,
                [&](auto buf, auto size) {
                  inDictionary_->add(buf, Ranges::of(0, size), nullptr);
                });

            uint32_t strideDictSize = 0;
            for (size_t i = start; i != end; ++i) {
              auto origIndex = rows_[i];
              bool valInDict = inDict[origIndex];
              inDictWriter.add(valInDict ? 1 : 0);
              // TODO: optimize this branching either through restoring visitor
              // pattern, or through separating the index backfill.
              if (!valInDict) {
                auto strideDictIndex = lookupTable[origIndex];
                sortedStrideDictKeyIndexBuffer[strideDictIndex] = origIndex;
                ++strideDictSize;
              }
            }
            DWIO_ENSURE_EQ(strideDictSize, strideDictKeyCount);
          }

          // StrideDictKey can be empty, when all keys for stride are in
          // dictionary.
          if (strideDictKeyCount > 0) {
            auto strideLengthWriter = createBufferedWriter<uint32_t>(
                getMemoryPool(MemoryUsageCategory::GENERAL),
                64 * 1024,
                [&](auto buf, auto size) {
                  strideDictionaryDataLength_->add(
                      buf, Ranges::of(0, size), nullptr);
                });
            for (size_t i = 0; i < strideDictKeyCount; ++i) {
              auto val = dictEncoder_.getKey(sortedStrideDictKeyIndexBuffer[i]);
              strideDictionaryData_->write(val.data(), val.size());
              strideLengthWriter.add(val.size());
            }
          }
        }

        bufferedWrite<uint32_t>(
            pool,
            64 * 1024,
            start,
            end,
            [&](auto index) { return lookupTable[rows_[index]]; },
            [&](auto buf, auto size) {
              data_->add(buf, Ranges::of(0, size), nullptr);
            });
      };

  // The writer always calls createIndexEntry before flush.
  size_t lastIndexStrideOffset = strideOffsets_[strideOffsets_.size() - 1];
  DWIO_ENSURE_EQ(lastIndexStrideOffset, rows_.size());
  populateStrides(
      populateDictionaryStreams,
      std::bind(
          &StringColumnWriter::recordDictionaryEncodingStreamPositions,
          this,
          writeInDictionaryStream,
          std::placeholders::_1,
          std::cref(strideDictCounts)));
}