Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState()

in be/src/exec/hdfs-scan-node-base.cc [245:410]
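Converts each fragment instance's scan range assignment for this node into the scan
node's shared state: per-partition template tuples, deduplicated per-file descriptors
grouped by file format, IO-manager scan ranges for every split, and the per-instance
file assignments used to issue the initial scan ranges.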


Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* state) {
  // Initialize the template tuple pool.
  using namespace org::apache::impala::fb;
  shared_state_.template_pool_.reset(new MemPool(state->query_mem_tracker()));
  auto& template_tuple_map_ = shared_state_.partition_template_tuple_map_;
  ObjectPool* obj_pool = shared_state_.obj_pool();
  auto& file_descs = shared_state_.file_descs_;
  HdfsFsCache::HdfsFsMap fs_cache;
  int num_ranges_missing_volume_id = 0;
  int64_t total_splits = 0;
  const vector<const PlanFragmentInstanceCtxPB*>& instance_ctx_pbs =
      state->instance_ctx_pbs();
  auto instance_ctxs = state->instance_ctxs();
  DCHECK_EQ(instance_ctxs.size(), instance_ctx_pbs.size());
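  // Walk each fragment instance's scan range assignment for this plan node and build up
  // the shared state: template tuples, file descriptors and IO-manager scan ranges.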
  for (int i = 0; i < instance_ctxs.size(); ++i) {
    auto ctx = instance_ctx_pbs[i];
    auto instance_ctx = instance_ctxs[i];
    auto ranges = ctx->per_node_scan_ranges().find(tnode_->node_id);
    if (ranges == ctx->per_node_scan_ranges().end()) continue;
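    // Every ScanRangeParamsPB assigned to this node is expected to carry an HDFS file
    // split (see the DCHECK below).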
    for (const ScanRangeParamsPB& params : ranges->second.scan_ranges()) {
      DCHECK(params.scan_range().has_hdfs_file_split());
      const HdfsFileSplitPB& split = params.scan_range().hdfs_file_split();
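      // Optional flatbuffer-encoded file metadata; when present it is used below to
      // determine the file format of Iceberg data files.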
      const org::apache::impala::fb::FbFileMetadata* file_metadata = nullptr;
      if (params.scan_range().has_file_metadata()) {
        file_metadata = flatbuffers::GetRoot<org::apache::impala::fb::FbFileMetadata>(
            params.scan_range().file_metadata().c_str());
      }
      HdfsPartitionDescriptor* partition_desc =
          hdfs_table_->GetPartition(split.partition_id());
      if (partition_desc == nullptr) {
        // TODO: this should be a DCHECK but we sometimes hit it. It's likely IMPALA-1702.
        LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id()
                   << " partition_id=" << split.partition_id() << "\n"
                   << state->fragment()
                   << state->fragment_ctx().DebugString();
        return Status("Query encountered invalid metadata, likely due to IMPALA-1702."
                      " Try rerunning the query.");
      }
      if (template_tuple_map_.find(split.partition_id()) == template_tuple_map_.end()) {
        template_tuple_map_[split.partition_id()] =
            InitTemplateTuple(partition_desc->partition_key_value_evals(),
                shared_state_.template_pool_.get());
      }
      // Convert the ScanRangeParamsPB into per-file DiskIO::ScanRange objects and
      // populate partition_ids_, file_descs_, and per_type_files_.

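      // Build the full file path. Iceberg data files may live outside the partition
      // directory, in which case the split carries an absolute path instead of a path
      // relative to the partition location.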
      filesystem::path file_path;
      if (hdfs_table_->IsIcebergTable() && split.relative_path().empty()) {
        file_path.append(split.absolute_path(), filesystem::path::codecvt());
      } else {
        file_path.append(partition_desc->location(), filesystem::path::codecvt())
            .append(split.relative_path(), filesystem::path::codecvt());
      }

      const string& native_file_path = file_path.native();

      auto file_desc_map_key = make_pair(partition_desc->id(), native_file_path);
      HdfsFileDesc* file_desc = nullptr;
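      // Splits of the same file share a single HdfsFileDesc, keyed by partition id and
      // file path.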
      auto file_desc_it = file_descs.find(file_desc_map_key);
      if (file_desc_it == file_descs.end()) {
        // Add new file_desc to file_descs_ and per_type_files_
        file_desc = obj_pool->Add(new HdfsFileDesc(native_file_path));
        file_descs[file_desc_map_key] = file_desc;
        file_desc->file_length = split.file_length();
        file_desc->mtime = split.mtime();
        file_desc->file_compression = CompressionTypePBToThrift(split.file_compression());
        file_desc->is_encrypted = split.is_encrypted();
        file_desc->is_erasure_coded = split.is_erasure_coded();
        file_desc->file_metadata = file_metadata;
        file_desc->fragment_instance_id = instance_ctx->fragment_instance_id;
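        // Determine the file format: Iceberg data files carry it in their file
        // metadata, otherwise fall back to the partition's file format.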
        if (file_metadata) {
          DCHECK(file_metadata->iceberg_metadata() != nullptr);
          switch (file_metadata->iceberg_metadata()->file_format()) {
            case FbIcebergDataFileFormat::FbIcebergDataFileFormat_PARQUET:
              file_desc->file_format = THdfsFileFormat::PARQUET;
              break;
            case FbIcebergDataFileFormat::FbIcebergDataFileFormat_ORC:
              file_desc->file_format = THdfsFileFormat::ORC;
              break;
            case FbIcebergDataFileFormat::FbIcebergDataFileFormat_AVRO:
              file_desc->file_format = THdfsFileFormat::AVRO;
              break;
            default:
              return Status(Substitute(
                  "Unknown Iceberg file format type: $0",
                  file_metadata->iceberg_metadata()->file_format()));
          }
        } else {
          file_desc->file_format = partition_desc->file_format();
        }
        RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
            native_file_path, &file_desc->fs, &fs_cache));
        shared_state_.per_type_files_[partition_desc->file_format()].push_back(file_desc);
      } else {
        // File already processed; reuse the existing descriptor.
        file_desc = file_desc_it->second;
      }

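      // The scheduler marks a range as non-remote when it expects the read to be local;
      // count expected-local ranges that are still missing a volume id.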
      bool expected_local = params.has_is_remote() && !params.is_remote();
      if (expected_local && params.volume_id() == -1) {
        // IMPALA-11541 TODO: Ozone returns a list of null volume IDs, so we know the
        // number of volumes but each ID is -1. Skip this metric for Ozone paths because
        // it doesn't convey useful feedback there.
        if (!IsOzonePath(partition_desc->location().c_str())) {
          ++num_ranges_missing_volume_id;
        }
      }

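      // Pick the caching options for this range: optionally try the HDFS cache, and use
      // the data cache for reads that are not expected to be local (or always, when
      // --always_use_data_cache is set) unless the query disables the data cache.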
      int cache_options = BufferOpts::NO_CACHING;
      if (params.has_try_hdfs_cache() && params.try_hdfs_cache()) {
        cache_options |= BufferOpts::USE_HDFS_CACHE;
      }
      if ((!expected_local || FLAGS_always_use_data_cache)
          && !state->query_options().disable_data_cache) {
        cache_options |= BufferOpts::USE_DATA_CACHE;
      }
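      // Wrap the split into an IO-manager ScanRange and attach it to the file
      // descriptor; the metadata keeps track of the originating partition.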
      ScanRangeMetadata* metadata =
          obj_pool->Add(new ScanRangeMetadata(split.partition_id(), nullptr));
      file_desc->splits.push_back(ScanRange::AllocateScanRange(obj_pool,
          file_desc->GetFileInfo(), split.length(), split.offset(), {}, metadata,
          params.volume_id(), expected_local, BufferOpts(cache_options)));
      total_splits++;
    }
    // Update the server-wide metric for the number of scan ranges processed.
    ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(ranges->second.scan_ranges().size());
  }
  // 'num_ranges_missing_volume_id' accumulates across all fragment instances, so update
  // the corresponding metric once, outside the loop, to avoid double counting.
  ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id);
  // Set up the rest of the shared state.
  shared_state_.remaining_scan_range_submissions_.Store(instance_ctx_pbs.size());
  shared_state_.progress().Init(
      Substitute("Splits complete (node=$0)", tnode_->node_id), total_splits);
  shared_state_.use_mt_scan_node_ = tnode_->hdfs_scan_node.use_mt_scan_node;
  shared_state_.scan_range_queue_.Reserve(total_splits);

  // Distribute the work evenly for issuing initial scan ranges.
  DCHECK(shared_state_.use_mt_scan_node_ || instance_ctx_pbs.size() == 1)
      << "Non MT scan node should only have a single instance.";
  if (tnode_->hdfs_scan_node.deterministic_scanrange_assignment) {
    // If using deterministic scan range assignment, there is no need to rebalance
    // the scan ranges. The scan ranges stay with their original fragment instance.
    for (auto& fd : file_descs) {
      const TUniqueId& instance_id = fd.second->fragment_instance_id;
      shared_state_.file_assignment_per_instance_[instance_id].push_back(fd.second);
    }
  } else {
    // When deterministic scan range assignment is not used, files are distributed
    // round robin across fragment instances for the purpose of issuing initial
    // scan ranges.
    int files_per_instance = file_descs.size() / instance_ctxs.size();
    int remainder = file_descs.size() % instance_ctxs.size();
    int num_lists = min(file_descs.size(), instance_ctxs.size());
    auto fd_it = file_descs.begin();
    for (int i = 0; i < num_lists; ++i) {
      vector<HdfsFileDesc*>* curr_file_list =
          &shared_state_
              .file_assignment_per_instance_[instance_ctxs[i]->fragment_instance_id];
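      // The first 'remainder' instances take one extra file each; (i < remainder)
      // evaluates to 1 for those instances and 0 otherwise.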
      for (int j = 0; j < files_per_instance + (i < remainder); ++j) {
        curr_file_list->push_back(fd_it->second);
        ++fd_it;
      }
    }
    DCHECK(fd_it == file_descs.end());
  }
  return Status::OK();
}