in be/src/exec/hdfs-scan-node-base.cc [245:410]
Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* state) {
// Initialize the template tuple pool.
using namespace org::apache::impala::fb;
shared_state_.template_pool_.reset(new MemPool(state->query_mem_tracker()));
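// Maps partition id to a template tuple holding that partition's key values,
// materialized once here so every scanner of the partition can reuse it.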
auto& template_tuple_map_ = shared_state_.partition_template_tuple_map_;
ObjectPool* obj_pool = shared_state_.obj_pool();
auto& file_descs = shared_state_.file_descs_;
HdfsFsCache::HdfsFsMap fs_cache;
int num_ranges_missing_volume_id = 0;
int64_t total_splits = 0;
const vector<const PlanFragmentInstanceCtxPB*>& instance_ctx_pbs =
state->instance_ctx_pbs();
auto instance_ctxs = state->instance_ctxs();
DCHECK_EQ(instance_ctxs.size(), instance_ctx_pbs.size());
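// Collect scan ranges from every fragment instance scheduled on this backend;
// each instance's per_node_scan_ranges map is keyed by plan node id.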
for (int i = 0; i < instance_ctxs.size(); ++i) {
auto ctx = instance_ctx_pbs[i];
auto instance_ctx = instance_ctxs[i];
auto ranges = ctx->per_node_scan_ranges().find(tnode_->node_id);
if (ranges == ctx->per_node_scan_ranges().end()) continue;
for (const ScanRangeParamsPB& params : ranges->second.scan_ranges()) {
DCHECK(params.scan_range().has_hdfs_file_split());
const HdfsFileSplitPB& split = params.scan_range().hdfs_file_split();
const org::apache::impala::fb::FbFileMetadata* file_metadata = nullptr;
if (params.scan_range().has_file_metadata()) {
file_metadata = flatbuffers::GetRoot<org::apache::impala::fb::FbFileMetadata>(
params.scan_range().file_metadata().c_str());
}
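// If present, 'file_metadata' is a serialized flatbuffer carrying extra
// per-file information; it is used below to determine the file format of
// Iceberg data files.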
HdfsPartitionDescriptor* partition_desc =
hdfs_table_->GetPartition(split.partition_id());
// Check for a missing partition descriptor before it is dereferenced by the
// template tuple initialization below.
// TODO: this should be a DCHECK but we sometimes hit it. It's likely IMPALA-1702.
if (partition_desc == nullptr) {
LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id()
<< " partition_id=" << split.partition_id() << "\n"
<< PrintThrift(state->fragment())
<< state->fragment_ctx().DebugString();
return Status("Query encountered invalid metadata, likely due to IMPALA-1702."
" Try rerunning the query.");
}
if (template_tuple_map_.find(split.partition_id()) == template_tuple_map_.end()) {
template_tuple_map_[split.partition_id()] =
InitTemplateTuple(partition_desc->partition_key_value_evals(),
shared_state_.template_pool_.get());
}
// Convert the ScanRangeParamsPB into per-file DiskIO::ScanRange objects and
// populate partition_ids_, file_descs_, and per_type_files_.
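// Iceberg data files may live outside the table's partition directories, in
// which case the planner sends an absolute path instead of a path relative to
// the partition location.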
filesystem::path file_path;
if (hdfs_table_->IsIcebergTable() && split.relative_path().empty()) {
file_path.append(split.absolute_path(), filesystem::path::codecvt());
} else {
file_path.append(partition_desc->location(), filesystem::path::codecvt())
.append(split.relative_path(), filesystem::path::codecvt());
}
const string& native_file_path = file_path.native();
auto file_desc_map_key = make_pair(partition_desc->id(), native_file_path);
HdfsFileDesc* file_desc = nullptr;
auto file_desc_it = file_descs.find(file_desc_map_key);
if (file_desc_it == file_descs.end()) {
// Add new file_desc to file_descs_ and per_type_files_
file_desc = obj_pool->Add(new HdfsFileDesc(native_file_path));
file_descs[file_desc_map_key] = file_desc;
file_desc->file_length = split.file_length();
file_desc->mtime = split.mtime();
file_desc->file_compression = CompressionTypePBToThrift(split.file_compression());
file_desc->is_encrypted = split.is_encrypted();
file_desc->is_erasure_coded = split.is_erasure_coded();
file_desc->file_metadata = file_metadata;
file_desc->fragment_instance_id = instance_ctx->fragment_instance_id;
if (file_metadata) {
DCHECK(file_metadata->iceberg_metadata() != nullptr);
switch (file_metadata->iceberg_metadata()->file_format()) {
case FbIcebergDataFileFormat::FbIcebergDataFileFormat_PARQUET:
file_desc->file_format = THdfsFileFormat::PARQUET;
break;
case FbIcebergDataFileFormat::FbIcebergDataFileFormat_ORC:
file_desc->file_format = THdfsFileFormat::ORC;
break;
case FbIcebergDataFileFormat::FbIcebergDataFileFormat_AVRO:
file_desc->file_format = THdfsFileFormat::AVRO;
break;
default:
return Status(Substitute(
"Unknown Iceberg file format type: $0",
file_metadata->iceberg_metadata()->file_format()));
}
} else {
file_desc->file_format = partition_desc->file_format();
}
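// Resolve the filesystem connection for this path once per file; 'fs_cache'
// memoizes connections within this call so repeated lookups are cheap.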
RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
native_file_path, &file_desc->fs, &fs_cache));
// Key by the resolved per-file format: for Iceberg tables it can differ from
// the partition descriptor's format.
shared_state_.per_type_files_[file_desc->file_format].push_back(file_desc);
} else {
// File already seen: a previous split created this descriptor, so reuse it.
file_desc = file_desc_it->second;
}
bool expected_local = params.has_is_remote() && !params.is_remote();
if (expected_local && params.volume_id() == -1) {
// IMPALA-11541 TODO: Ozone returns a list of null volume IDs, so we know the
// number of volumes but each ID is -1. Skip this metric for Ozone paths
// because it doesn't convey useful feedback.
if (!IsOzonePath(partition_desc->location().c_str())) {
++num_ranges_missing_volume_id;
}
}
int cache_options = BufferOpts::NO_CACHING;
if (params.has_try_hdfs_cache() && params.try_hdfs_cache()) {
cache_options |= BufferOpts::USE_HDFS_CACHE;
}
if ((!expected_local || FLAGS_always_use_data_cache)
&& !state->query_options().disable_data_cache) {
cache_options |= BufferOpts::USE_DATA_CACHE;
}
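// Example: a remote range with try_hdfs_cache set and the data cache enabled
// ends up with USE_HDFS_CACHE | USE_DATA_CACHE; a local range defaults to
// NO_CACHING unless one of the conditions above applies.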
ScanRangeMetadata* metadata =
obj_pool->Add(new ScanRangeMetadata(split.partition_id(), nullptr));
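// Allocate the IO-layer scan range for this split; 'metadata' links the range
// back to its partition.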
file_desc->splits.push_back(ScanRange::AllocateScanRange(obj_pool,
file_desc->GetFileInfo(), split.length(), split.offset(), {}, metadata,
params.volume_id(), expected_local, BufferOpts(cache_options)));
total_splits++;
}
// Update the server wide metric for the number of scan ranges processed by
// this instance.
ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(ranges->second.scan_ranges().size());
}
// 'num_ranges_missing_volume_id' accumulates across all instances, so increment the
// metric once after the loop to avoid double counting.
ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id);
// Set up the rest of the shared state.
shared_state_.remaining_scan_range_submissions_.Store(instance_ctx_pbs.size());
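// Each fragment instance decrements this once it has submitted its initial
// scan ranges, letting scanner threads detect when no more ranges will arrive.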
shared_state_.progress().Init(
Substitute("Splits complete (node=$0)", tnode_->node_id), total_splits);
shared_state_.use_mt_scan_node_ = tnode_->hdfs_scan_node.use_mt_scan_node;
shared_state_.scan_range_queue_.Reserve(total_splits);
// Distribute the work evenly for issuing initial scan ranges.
DCHECK(shared_state_.use_mt_scan_node_ || instance_ctx_pbs.size() == 1)
<< "Non MT scan node should only have a single instance.";
if (tnode_->hdfs_scan_node.deterministic_scanrange_assignment) {
// If using deterministic scan range assignment, there is no need to rebalance
// the scan ranges. The scan ranges stay with their original fragment instance.
for (auto& fd : file_descs) {
const TUniqueId& instance_id = fd.second->fragment_instance_id;
shared_state_.file_assignment_per_instance_[instance_id].push_back(fd.second);
}
} else {
// When not using the deterministic scan range assignment, the scan ranges are
// balanced round robin across fragment instances for the purpose of issuing
// initial scan ranges.
int files_per_instance = file_descs.size() / instance_ctxs.size();
int remainder = file_descs.size() % instance_ctxs.size();
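// Example: 8 files across 3 instances gives files_per_instance = 2 and
// remainder = 2, so the instances receive 3, 3, and 2 files respectively.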
int num_lists = min(file_descs.size(), instance_ctxs.size());
auto fd_it = file_descs.begin();
for (int i = 0; i < num_lists; ++i) {
vector<HdfsFileDesc*>* curr_file_list =
&shared_state_
.file_assignment_per_instance_[instance_ctxs[i]->fragment_instance_id];
for (int j = 0; j < files_per_instance + (i < remainder); ++j) {
curr_file_list->push_back(fd_it->second);
++fd_it;
}
}
DCHECK(fd_it == file_descs.end());
}
return Status::OK();
}