void HiveDataSource::addSplit()

in velox/connectors/hive/HiveConnector.cpp [375:511]


// Attaches 'split' to this data source and prepares it for reading.
//
// Steps, in order:
//  1. Checks that the previous split has been consumed ('split_' is null) and
//     that 'split' is a HiveConnectorSplit.
//  2. Configures 'readerOpts_' for the active cache flavor: AsyncDataCache
//     (installs a CachedBufferedInputFactory), legacy DataCache (installs a
//     DataCacheConfig pointing at 'dataCache_'), or neither (defaults).
//  3. Checks that every split seen by this data source has the same file
//     format; the first split fixes the format.
//  4. Creates a reader for the split's file. If the file reports zero rows, or
//     testFilters() rejects the file, sets 'emptySplit_' and returns without
//     creating 'rowReader_'. A filter-pruned split is counted in
//     'runtimeStats_.skippedSplits' / 'skippedSplitBytes'.
//  5. Sets constant values on 'scanSpec_' children for partition keys, the
//     $path and $bucket synthetic columns, and columns absent from the file.
//  6. Creates 'rowReader_' over the columns that did not receive a constant,
//     limited to the split's start/length range.
//
// Fails a VELOX_CHECK if a split is still pending, if 'split' is not a
// HiveConnectorSplit, if a legacy DataCache is configured while the
// MappedMemory is an AsyncDataCache, or if the file format differs from that
// of earlier splits.
void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
  VELOX_CHECK(
      split_ == nullptr,
      "Previous split has not been processed yet. Call next to process the split.");
  split_ = std::dynamic_pointer_cast<HiveConnectorSplit>(split);
  VELOX_CHECK(split_, "Wrong type of split");

  VLOG(1) << "Adding split " << split_->toString();

  fileHandle_ = fileHandleFactory_->generate(split_->filePath);
  // For DataCache and no cache, the stream keeps track of IO.
  auto asyncCache = dynamic_cast<cache::AsyncDataCache*>(mappedMemory_);
  // Decide between AsyncDataCache, legacy DataCache and no cache. All
  // three are supported to enable comparison.
  if (asyncCache) {
    VELOX_CHECK(
        !dataCache_,
        "DataCache should not be present if the MappedMemory is AsyncDataCache");
    // Make DataCacheConfig to pass the filenum and a null DataCache. An
    // existing config (e.g. from an earlier split) is reused and only its
    // filenum is updated.
    if (!readerOpts_.getDataCacheConfig()) {
      auto dataCacheConfig = std::make_shared<dwio::common::DataCacheConfig>();
      readerOpts_.setDataCacheConfig(std::move(dataCacheConfig));
    }
    readerOpts_.getDataCacheConfig()->filenum = fileHandle_->uuid.id();
    // The stream factory captures the handle factory and path by value so it
    // does not depend on 'split_' staying alive.
    bufferedInputFactory_ = std::make_unique<dwrf::CachedBufferedInputFactory>(
        asyncCache,
        Connector::getTracker(scanId_, readerOpts_.loadQuantum()),
        fileHandle_->groupId.id(),
        [factory = fileHandleFactory_, path = split_->filePath]() {
          return makeStreamHolder(factory, path);
        },
        ioStats_,
        executor_,
        readerOpts_);
    readerOpts_.setBufferedInputFactory(bufferedInputFactory_.get());
  } else if (dataCache_) {
    auto dataCacheConfig = std::make_shared<dwio::common::DataCacheConfig>();
    dataCacheConfig->cache = dataCache_;
    dataCacheConfig->filenum = fileHandle_->uuid.id();
    readerOpts_.setDataCacheConfig(std::move(dataCacheConfig));
  }

  // The first split fixes the file format; later splits must match it.
  if (readerOpts_.getFileFormat() != dwio::common::FileFormat::UNKNOWN) {
    VELOX_CHECK(
        readerOpts_.getFileFormat() == split_->fileFormat,
        "HiveDataSource received splits of different formats: {} and {}",
        toString(readerOpts_.getFileFormat()),
        toString(split_->fileFormat));
  } else {
    readerOpts_.setFileFormat(split_->fileFormat);
  }

  // We run with the default BufferedInputFactory and no DataCacheConfig if
  // there is no DataCache and the MappedMemory is not an AsyncDataCache.
  // With AsyncDataCache, IO stats are not passed to the stream (nullptr).
  reader_ = dwio::common::getReaderFactory(readerOpts_.getFileFormat())
                ->createReader(
                    std::make_unique<dwio::common::ReadFileInputStream>(
                        fileHandle_->file.get(),
                        dwio::common::MetricsLog::voidLog(),
                        asyncCache ? nullptr : ioStats_.get()),
                    readerOpts_);

  emptySplit_ = false;
  if (reader_->numberOfRows() == 0) {
    emptySplit_ = true;
    return;
  }

  // Check filters and see if the whole split can be skipped
  if (!testFilters(scanSpec_.get(), reader_.get(), split_->filePath)) {
    emptySplit_ = true;
    ++runtimeStats_.skippedSplits;
    runtimeStats_.skippedSplitBytes += split_->length;
    return;
  }

  auto fileType = reader_->rowType();

  // 'scanSpec_' is reused across splits, so every output column's spec is
  // assigned on every split: either a split-specific constant or, for columns
  // read from the file, explicitly cleared. Leaving a branch without an
  // assignment would carry the previous split's constant into this one.
  for (uint32_t i = 0; i < readerOutputType_->size(); ++i) {
    const auto& fieldName = readerOutputType_->nameOf(i);
    auto scanChildSpec = scanSpec_->childByName(fieldName);

    auto keyIt = split_->partitionKeys.find(fieldName);
    if (keyIt != split_->partitionKeys.end()) {
      setPartitionValue(scanChildSpec, fieldName, keyIt->second);
    } else if (fieldName == kPath) {
      setConstantValue(scanChildSpec, velox::variant(split_->filePath));
    } else if (fieldName == kBucket) {
      if (split_->tableBucketNumber.has_value()) {
        setConstantValue(
            scanChildSpec, velox::variant(split_->tableBucketNumber.value()));
      } else {
        // No bucket for this split: produce nulls rather than keeping a stale
        // constant from an earlier split or asking the file reader for a
        // synthetic column it does not have.
        setNullConstantValue(scanChildSpec, readerOutputType_->childAt(i));
      }
    } else if (!fileType->containsChild(fieldName)) {
      // Column is missing. Most likely due to schema evolution.
      setNullConstantValue(scanChildSpec, readerOutputType_->childAt(i));
    } else {
      scanChildSpec->setConstantValue(nullptr);
    }
  }

  // Set constant values for partition keys and $path column. If these are
  // used in filters only, the loop above will miss them.
  for (const auto& entry : split_->partitionKeys) {
    auto childSpec = scanSpec_->childByName(entry.first);
    if (childSpec) {
      setPartitionValue(childSpec, entry.first, entry.second);
    }
  }

  auto pathSpec = scanSpec_->childByName(kPath);
  if (pathSpec) {
    setConstantValue(pathSpec, velox::variant(split_->filePath));
  }

  // NOTE(review): for a filter-only $bucket column, a split without a bucket
  // number leaves this spec unchanged; confirm whether it should be nulled
  // like the output-column case above.
  auto bucketSpec = scanSpec_->childByName(kBucket);
  if (bucketSpec && split_->tableBucketNumber.has_value()) {
    setConstantValue(
        bucketSpec, velox::variant(split_->tableBucketNumber.value()));
  }

  // Only columns that did not receive a constant above are selected from the
  // file.
  std::vector<std::string> columnNames;
  for (auto& spec : scanSpec_->children()) {
    if (!spec->isConstant()) {
      columnNames.push_back(spec->fieldName());
    }
  }

  // When no column needs to be read from the file, select against an empty
  // row type instead of the file schema.
  std::shared_ptr<dwio::common::ColumnSelector> cs;
  if (columnNames.empty()) {
    static const std::shared_ptr<const RowType> kEmpty{ROW({}, {})};
    cs = std::make_shared<dwio::common::ColumnSelector>(kEmpty);
  } else {
    cs = std::make_shared<dwio::common::ColumnSelector>(fileType, columnNames);
  }

  rowReader_ = reader_->createRowReader(
      rowReaderOpts_.select(cs).range(split_->start, split_->length));
}