void FlatMapColumnReader::next()

in velox/dwio/dwrf/reader/FlatMapColumnReader.cpp [320:466]


// Reads the next `numValues` rows of a flat-map column and stores them in
// `result` as a freshly constructed MapVector.
//
// Each key of the flat map is backed by its own KeyNode. For every non-null
// row, the per-key "in map" bits are consulted and the entries that are present
// are concatenated, key node by key node, into the map's keys/values children.
//
// @param numValues      Number of top-level rows to produce.
// @param result         In/out vector. If it already holds a MapVector, its
//                       keys (and, when returnFlatVector_ is set, values) as
//                       well as its offsets/sizes buffers are picked up for
//                       reuse; it is always overwritten with a new MapVector.
// @param incomingNulls  Null bitmap forwarded to readNulls(); may be nullptr
//                       (NOTE(review): exact semantics live in readNulls,
//                       which is not visible here).
//
// When returnFlatVector_ is false, the values child is returned as a
// dictionary over the appended node batches instead of being copied
// element by element.
void FlatMapColumnReader<T>::next(
    uint64_t numValues,
    VectorPtr& result,
    const uint64_t* incomingNulls) {
  auto mapVector = detail::resetIfWrongVectorType<MapVector>(result);
  VectorPtr keysVector;
  VectorPtr valuesVector;
  BufferPtr offsets;
  BufferPtr lengths;
  if (mapVector) {
    // Pick up the children and buffers of the incoming map for reuse.
    keysVector = mapVector->mapKeys();
    if (returnFlatVector_) {
      // Only the flat (copying) path writes into an existing values vector;
      // the dictionary path builds its values vector from scratch below.
      valuesVector = mapVector->mapValues();
    }
    offsets = mapVector->mutableOffsets(numValues);
    lengths = mapVector->mutableSizes(numValues);
  }

  BufferPtr nulls = readNulls(numValues, result, incomingNulls);
  const auto* nullsPtr = nulls ? nulls->as<uint64_t>() : nullptr;
  uint64_t nullCount = nullsPtr ? bits::countNulls(nullsPtr, 0, numValues) : 0;

  if (mapVector) {
    // NOTE(review): presumably clears `offsets`/`lengths` when they cannot be
    // written in place - confirm against detail::resetIfNotWritable.
    detail::resetIfNotWritable(result, offsets, lengths);
  }

  // Fall back to fresh buffers when there was nothing to reuse.
  if (!offsets) {
    offsets = AlignedBuffer::allocate<vector_size_t>(numValues, &memoryPool_);
  }
  if (!lengths) {
    lengths = AlignedBuffer::allocate<vector_size_t>(numValues, &memoryPool_);
  }

  auto nonNullMaps = numValues - nullCount;

  // Only key nodes that actually produced a batch take part in the merge
  // below; `nodes`, `nodeBatches` and the in-map iterator share the same
  // indexing.
  std::vector<KeyNode<T>*> nodes;
  utils::BulkBitIterator<char> bulkInMapIter{};
  std::vector<const BaseVector*> nodeBatches;
  size_t totalChildren = 0;
  if (nonNullMaps > 0) {
    for (auto& node : keyNodes_) {
      // if the node has value filled into key-value batch
      // future optimization - enable batch to be sortable on row index
      // and below next can be updated to next(keys, values, numValues)
      // which writes row index into batch and offsets can be generated
      auto batch = node->load(nonNullMaps);
      if (batch) {
        nodes.emplace_back(node.get());
        node->addToBulkInMapBitIterator(bulkInMapIter);
        nodeBatches.push_back(batch);
        totalChildren += batch->size();
      }
    }
  }

  // startIndices[j]: position of batch j's first element within the
  //                  concatenation of all batches (used by the dictionary
  //                  path).
  // nodeIndices[j]:  number of elements of batch j consumed so far.
  // Heap-backed and zero-initialized: standard C++ has no variable-length
  // arrays, and the counters must be well defined even when every batch is
  // empty.
  std::vector<size_t> startIndices(nodeBatches.size(), 0);
  std::vector<size_t> nodeIndices(nodeBatches.size(), 0);

  auto& mapValueType = requestedType_->type->asMap().valueType();
  if (totalChildren > 0) {
    size_t childOffset = 0;
    for (size_t i = 0; i < nodeBatches.size(); i++) {
      startIndices[i] = childOffset;
      childOffset += nodeBatches[i]->size();
    }

    initKeysVector(keysVector, totalChildren);
    flatmap_helper::initializeVector(
        valuesVector, mapValueType, memoryPool_, nodeBatches);

    if (!returnFlatVector_) {
      // Dictionary path: lay the batches out back to back; rows are stitched
      // together later through `indices`.
      for (const auto* batch : nodeBatches) {
        valuesVector->append(batch);
      }
    }
  }

  BufferPtr indices;
  vector_size_t* indicesPtr = nullptr;
  if (!returnFlatVector_) {
    // Allocate with the same element type that is used to size and access
    // the buffer.
    indices =
        AlignedBuffer::allocate<vector_size_t>(totalChildren, &memoryPool_);
    indices->setSize(totalChildren * sizeof(vector_size_t));
    indicesPtr = indices->asMutable<vector_size_t>();
  }

  // Walk the rows and, for every non-null row, emit the entries of all key
  // nodes that have a value in that row, in node order.
  vector_size_t offset = 0;
  auto* offsetsPtr = offsets->asMutable<vector_size_t>();
  auto* lengthsPtr = lengths->asMutable<vector_size_t>();
  for (uint64_t i = 0; i < numValues; ++i) {
    offsetsPtr[i] = offset;
    // The in-map iterator only advances for rows that hold a map; null rows
    // keep a length of zero.
    if (!nullsPtr || !bits::isBitNull(nullsPtr, i)) {
      bulkInMapIter.loadNext();
      for (size_t j = 0; j < nodes.size(); j++) {
        if (bulkInMapIter.hasValueAt(j)) {
          nodes[j]->fillKeysVector(keysVector, offset, stringKeyBuffer_.get());
          if (returnFlatVector_) {
            flatmap_helper::copyOne(
                mapValueType,
                *valuesVector,
                offset,
                *nodeBatches[j],
                nodeIndices[j]);
          } else {
            indicesPtr[offset] = startIndices[j] + nodeIndices[j];
          }
          offset++;
          nodeIndices[j]++;
        }
      }
    }

    lengthsPtr[i] = offset - offsetsPtr[i];
  }

  // Every element loaded from the key nodes must have been placed in a row.
  DWIO_ENSURE_EQ(totalChildren, offset, "fill the same amount of items");

  VLOG(1) << "[Flat-Map] num elements: " << numValues
          << ", total children: " << totalChildren;

  if (totalChildren > 0 && !returnFlatVector_) {
    valuesVector = BaseVector::wrapInDictionary(
        nullptr, indices, totalChildren, std::move(valuesVector));
  }

  // When read-string-as-row flag is on, string readers produce ROW(BIGINT,
  // BIGINT) type instead of VARCHAR or VARBINARY. In these cases,
  // requestedType_->type is not the right type of the final vector.
  auto mapType = (keysVector == nullptr || valuesVector == nullptr)
      ? requestedType_->type
      : MAP(keysVector->type(), valuesVector->type());

  // TODO Reuse
  result = std::make_shared<MapVector>(
      &memoryPool_,
      mapType,
      nulls,
      numValues,
      offsets,
      lengths,
      keysVector,
      valuesVector,
      nullCount);
}