in velox/connectors/hive/HiveConnector.cpp [375:511]
// Binds `split` to this data source and prepares `rowReader_` for it.
//
// Steps, in order:
//  1. Checks that no previous split is still pending and that `split` is a
//     HiveConnectorSplit (both via VELOX_CHECK).
//  2. Obtains the file handle from `fileHandleFactory_` and configures caching:
//     an AsyncDataCache-backed `mappedMemory_` gets a
//     CachedBufferedInputFactory, a legacy `dataCache_` gets a
//     DataCacheConfig, and otherwise the reader runs with its defaults.
//  3. Sets the file format from the split if none is set yet; otherwise
//     requires the split to have the same format as earlier splits.
//  4. Creates the file reader. If the file has no rows, or `testFilters`
//     reports that the split can be skipped, sets `emptySplit_` and returns
//     without creating a row reader (skips are counted in `runtimeStats_`).
//  5. Assigns constant values in `scanSpec_` for partition keys, `$path`,
//     `$bucket` and columns missing from the file, then creates a row reader
//     over the remaining non-constant columns, limited to the split's
//     start/length range.
void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
  VELOX_CHECK(
      split_ == nullptr,
      "Previous split has not been processed yet. Call next to process the split.");
  split_ = std::dynamic_pointer_cast<HiveConnectorSplit>(split);
  VELOX_CHECK(split_, "Wrong type of split");

  VLOG(1) << "Adding split " << split_->toString();

  fileHandle_ = fileHandleFactory_->generate(split_->filePath);

  // Decide between AsyncDataCache, legacy DataCache and no cache. All three
  // are supported to enable comparison.
  auto* asyncCache = dynamic_cast<cache::AsyncDataCache*>(mappedMemory_);
  if (asyncCache) {
    VELOX_CHECK(
        !dataCache_,
        "DataCache should not be present if the MappedMemory is AsyncDataCache");
    // Make a DataCacheConfig to pass the filenum and a null DataCache. The
    // config is created on the first split and its filenum updated afterwards.
    if (!readerOpts_.getDataCacheConfig()) {
      readerOpts_.setDataCacheConfig(
          std::make_shared<dwio::common::DataCacheConfig>());
    }
    readerOpts_.getDataCacheConfig()->filenum = fileHandle_->uuid.id();
    bufferedInputFactory_ = std::make_unique<dwrf::CachedBufferedInputFactory>(
        asyncCache,
        Connector::getTracker(scanId_, readerOpts_.loadQuantum()),
        fileHandle_->groupId.id(),
        [factory = fileHandleFactory_, path = split_->filePath]() {
          return makeStreamHolder(factory, path);
        },
        ioStats_,
        executor_,
        readerOpts_);
    readerOpts_.setBufferedInputFactory(bufferedInputFactory_.get());
  } else if (dataCache_) {
    auto dataCacheConfig = std::make_shared<dwio::common::DataCacheConfig>();
    dataCacheConfig->cache = dataCache_;
    dataCacheConfig->filenum = fileHandle_->uuid.id();
    readerOpts_.setDataCacheConfig(std::move(dataCacheConfig));
  }
  // Otherwise (no DataCache and MappedMemory is not an AsyncDataCache) we run
  // with the default BufferedInputFactory and no DataCacheConfig.

  // All splits handled by this data source must share one file format.
  if (readerOpts_.getFileFormat() != dwio::common::FileFormat::UNKNOWN) {
    VELOX_CHECK(
        readerOpts_.getFileFormat() == split_->fileFormat,
        "HiveDataSource received splits of different formats: {} and {}",
        toString(readerOpts_.getFileFormat()),
        toString(split_->fileFormat));
  } else {
    readerOpts_.setFileFormat(split_->fileFormat);
  }

  // With DataCache or no cache, the input stream keeps track of IO via
  // ioStats_; with AsyncDataCache, ioStats_ was handed to the
  // CachedBufferedInputFactory above instead.
  reader_ = dwio::common::getReaderFactory(readerOpts_.getFileFormat())
                ->createReader(
                    std::make_unique<dwio::common::ReadFileInputStream>(
                        fileHandle_->file.get(),
                        dwio::common::MetricsLog::voidLog(),
                        asyncCache ? nullptr : ioStats_.get()),
                    readerOpts_);

  emptySplit_ = false;
  if (reader_->numberOfRows() == 0) {
    emptySplit_ = true;
    return;
  }

  // Check filters and see if the whole split can be skipped.
  if (!testFilters(scanSpec_.get(), reader_.get(), split_->filePath)) {
    emptySplit_ = true;
    ++runtimeStats_.skippedSplits;
    runtimeStats_.skippedSplitBytes += split_->length;
    return;
  }

  auto fileType = reader_->rowType();
  for (uint32_t i = 0; i < readerOutputType_->size(); ++i) {
    const auto& fieldName = readerOutputType_->nameOf(i);
    auto scanChildSpec = scanSpec_->childByName(fieldName);
    auto keyIt = split_->partitionKeys.find(fieldName);
    if (keyIt != split_->partitionKeys.end()) {
      setPartitionValue(scanChildSpec, fieldName, keyIt->second);
    } else if (fieldName == kPath) {
      setConstantValue(scanChildSpec, velox::variant(split_->filePath));
    } else if (fieldName == kBucket) {
      // NOTE(review): when the split has no bucket number the spec is left
      // as-is, so a constant set for a previous split is not cleared here.
      // Confirm this is intended.
      if (split_->tableBucketNumber.has_value()) {
        setConstantValue(
            scanChildSpec, velox::variant(split_->tableBucketNumber.value()));
      }
    } else if (!fileType->containsChild(fieldName)) {
      // Column is missing. Most likely due to schema evolution.
      setNullConstantValue(scanChildSpec, readerOutputType_->childAt(i));
    } else {
      // Regular file column: clear any constant left from a previous split.
      scanChildSpec->setConstantValue(nullptr);
    }
  }

  // Set constant values for partition keys, $path and $bucket. If these
  // columns are used in filters only, the loop above will miss them.
  for (const auto& [keyName, keyValue] : split_->partitionKeys) {
    if (auto childSpec = scanSpec_->childByName(keyName)) {
      setPartitionValue(childSpec, keyName, keyValue);
    }
  }
  if (auto pathSpec = scanSpec_->childByName(kPath)) {
    setConstantValue(pathSpec, velox::variant(split_->filePath));
  }
  auto bucketSpec = scanSpec_->childByName(kBucket);
  if (bucketSpec && split_->tableBucketNumber.has_value()) {
    setConstantValue(
        bucketSpec, velox::variant(split_->tableBucketNumber.value()));
  }

  // Only non-constant columns are read from the file.
  std::vector<std::string> columnNames;
  for (const auto& spec : scanSpec_->children()) {
    if (!spec->isConstant()) {
      columnNames.push_back(spec->fieldName());
    }
  }

  std::shared_ptr<dwio::common::ColumnSelector> cs;
  if (columnNames.empty()) {
    // Every scan spec child is constant (or there are none): select against
    // an empty row type.
    static const std::shared_ptr<const RowType> kEmpty{ROW({}, {})};
    cs = std::make_shared<dwio::common::ColumnSelector>(kEmpty);
  } else {
    cs = std::make_shared<dwio::common::ColumnSelector>(fileType, columnNames);
  }
  rowReader_ = reader_->createRowReader(
      rowReaderOpts_.select(cs).range(split_->start, split_->length));
}