Status TmpFileMgr::InitCustom()

in be/src/runtime/tmp-file-mgr.cc [214:415]


/// Initializes the scratch-space manager from an explicit list of directory specifiers.
///
/// Parses the disk-spill compression settings and the remote-spill tuning flags, then
/// parses and verifies each entry of 'tmp_dir_specifiers'. Successfully verified local
/// directories end up in 'tmp_dirs_', sorted by priority. At most one remote
/// (HDFS/Ozone/S3) directory is accepted; when one is configured, the first successfully
/// verified local directory is reserved as its local buffer ('local_buff_dir_') instead of
/// being used directly for spilling.
///
/// Misconfigured or unsupported directory specifiers are logged and skipped rather than
/// failing startup, since queries can generally still run without spilling.
///
/// @param tmp_dir_specifiers  Scratch directory specifiers, each parsed by TmpDir::Parse().
/// @param one_dir_per_device  Stored in 'one_dir_per_device_'.
/// @param compression_codec   Disk-spill compression codec; empty disables compression.
/// @param punch_holes         Must be true if 'compression_codec' is non-empty.
/// @param metrics             Metric group the scratch metrics are registered in; must be
///                            non-null.
/// @return An error if the compression codec or a remote-spill flag is invalid, if the
///         remote directory fails verification, if the remote directory has no local
///         buffer directory, or if creating the buffer pool thread fails; OK otherwise.
Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
    bool one_dir_per_device, const string& compression_codec, bool punch_holes,
    MetricGroup* metrics) {
  DCHECK(!initialized_);
  // 'metrics' is dereferenced while verifying directories below, so check it up front.
  DCHECK(metrics != nullptr);
  punch_holes_ = punch_holes;
  one_dir_per_device_ = one_dir_per_device;
  if (tmp_dir_specifiers.empty()) {
    LOG(WARNING) << "Running without spill to disk: no scratch directories provided.";
  }

  // Disk-spill compression relies on hole punching to reclaim space.
  if (!compression_codec.empty()) {
    if (!punch_holes) {
      return Status("--disk_spill_punch_holes must be true if disk spill compression "
                    "is enabled");
    }
    Status codec_parse_status = ParseUtil::ParseCompressionCodec(
        compression_codec, &compression_codec_, &compression_level_);
    if (!codec_parse_status.ok()) {
      return Status(
          Substitute("Could not parse --disk_spill_compression_codec value '$0': $1",
              compression_codec, codec_parse_status.GetDetail()));
    }
    if (compression_enabled()) {
      compressed_buffer_tracker_.reset(
          new MemTracker(FLAGS_disk_spill_compression_buffer_limit_bytes,
              "Spill-to-disk temporary compression buffers",
              ExecEnv::GetInstance()->process_mem_tracker()));
    }
  }

  // Remote-spill sizing. The file size is capped at the threshold; the block size may be
  // given as a percentage of the file size.
  bool is_percent;
  tmp_dirs_remote_ctrl_.remote_tmp_file_size_ =
      ParseUtil::ParseMemSpec(FLAGS_remote_tmp_file_size, &is_percent, 0);
  if (tmp_dirs_remote_ctrl_.remote_tmp_file_size_ <= 0) {
    return Status(Substitute(
        "Invalid value of remote_tmp_file_size '$0'", FLAGS_remote_tmp_file_size));
  }
  const int64_t max_remote_tmp_file_size =
      MAX_REMOTE_TMPFILE_SIZE_THRESHOLD_MB * 1024 * 1024;
  if (tmp_dirs_remote_ctrl_.remote_tmp_file_size_ > max_remote_tmp_file_size) {
    tmp_dirs_remote_ctrl_.remote_tmp_file_size_ = max_remote_tmp_file_size;
  }
  tmp_dirs_remote_ctrl_.remote_tmp_block_size_ =
      ParseUtil::ParseMemSpec(FLAGS_remote_tmp_file_block_size, &is_percent,
          tmp_dirs_remote_ctrl_.remote_tmp_file_size_);
  if (tmp_dirs_remote_ctrl_.remote_tmp_block_size_ <= 0) {
    // Report the actual flag name so operators can find the misconfigured setting.
    return Status(Substitute("Invalid value of remote_tmp_file_block_size '$0'",
        FLAGS_remote_tmp_file_block_size));
  }
  tmp_dirs_remote_ctrl_.wait_for_spill_buffer_timeout_us_ =
      FLAGS_wait_for_spill_buffer_timeout_s * MICROS_PER_SEC;
  if (tmp_dirs_remote_ctrl_.wait_for_spill_buffer_timeout_us_ <= 0) {
    // The value printed is the seconds flag, so name that flag in the message.
    return Status(Substitute("Invalid value of wait_for_spill_buffer_timeout_s '$0'",
        FLAGS_wait_for_spill_buffer_timeout_s));
  }

  // Batch reads are best-effort: bad read-buffer parameters disable them, not startup.
  tmp_dirs_remote_ctrl_.remote_batch_read_enabled_ = FLAGS_remote_batch_read;
  if (tmp_dirs_remote_ctrl_.remote_batch_read_enabled_) {
    Status setup_read_buffer_status = tmp_dirs_remote_ctrl_.SetUpReadBufferParams();
    if (!setup_read_buffer_status.ok()) {
      LOG(WARNING) << "Disabled the read buffer for the remote temporary files "
                      "due to errors in read buffer parameters: "
                   << setup_read_buffer_status.msg().msg();
      tmp_dirs_remote_ctrl_.remote_batch_read_enabled_ = false;
    }
  }

  // The option below is used in tests to select different modes of spilling to remote
  // storage.
  tmp_dirs_remote_ctrl_.remote_tmp_files_avail_pool_lifo_ =
      FLAGS_remote_tmp_files_avail_pool_lifo;

  vector<std::unique_ptr<TmpDir>> tmp_dirs;
  // True once a remote directory has been accepted; one local directory must then be
  // reserved as the remote directory's local buffer.
  bool need_local_buffer_dir = false;

  // Parse the directory specifiers. Don't return an error on parse errors, just log a
  // warning - we don't want to abort process startup because of misconfigured scratch,
  // since queries will generally still be runnable.
  for (const string& tmp_dir_spec : tmp_dir_specifiers) {
    string tmp_dir_spec_trimmed(boost::algorithm::trim_copy(tmp_dir_spec));
    std::unique_ptr<TmpDir> tmp_dir;

    if (IsHdfsPath(tmp_dir_spec_trimmed.c_str(), false)
        || IsOzonePath(tmp_dir_spec_trimmed.c_str(), false)) {
      tmp_dir = std::make_unique<TmpDirHdfs>(tmp_dir_spec_trimmed);
    } else if (IsS3APath(tmp_dir_spec_trimmed.c_str(), false)) {
      // Initialize the S3 options for later getting S3 connection.
      s3a_options_ = {make_pair("fs.s3a.fast.upload", "true"),
          make_pair("fs.s3a.fast.upload.buffer", "disk")};
      tmp_dir = std::make_unique<TmpDirS3>(tmp_dir_spec_trimmed);
    } else if (IsGcsPath(tmp_dir_spec_trimmed.c_str(), false)) {
      // TODO(IMPALA-10561): Add support for spilling to GCS.
      // Until then, skip the entry; falling through would dereference a null 'tmp_dir'.
      LOG(WARNING) << "Directory " << tmp_dir_spec << " is not used because spilling "
                   << "to GCS is not supported.";
      continue;
    } else {
      tmp_dir = std::make_unique<TmpDirLocal>(tmp_dir_spec_trimmed);
    }

    DCHECK(tmp_dir != nullptr);
    Status parse_status = tmp_dir->Parse();
    if (!parse_status.ok()) {
      LOG(WARNING) << "Directory " << tmp_dir_spec << " is not used because "
                   << parse_status.msg().msg();
      continue;
    }

    if (!tmp_dir->is_local()) {
      // Only one remote directory is supported; any later remote entry is abandoned.
      if (need_local_buffer_dir) {
        LOG(WARNING) << "Only one remote directory is supported. Extra remote directory "
                     << tmp_dir_spec << " is not used.";
        continue;
      }
      need_local_buffer_dir = true;
    }
    tmp_dirs.emplace_back(move(tmp_dir));
  }

  // For each tmp directory, find the disk it is on, so additional tmp directories on the
  // same disk can be skipped.
  vector<bool> is_tmp_dir_on_disk(DiskInfo::num_disks(), false);
  for (std::unique_ptr<TmpDir>& dir : tmp_dirs) {
    Status status =
        dir->VerifyAndCreate(metrics, &is_tmp_dir_on_disk, need_local_buffer_dir, this);
    if (!status.ok()) {
      // A remote directory that fails to verify/create is fatal; a bad local directory
      // is skipped so the next one can be tried.
      if (!dir->is_local()) return status;
      continue;
    }
    if (!dir->is_local()) {
      tmp_dirs_remote_ = move(dir);
    } else if (need_local_buffer_dir) {
      // The first usable local directory becomes the remote directory's buffer.
      local_buff_dir_ = move(dir);
      need_local_buffer_dir = false;
    } else {
      tmp_dirs_.emplace_back(move(dir));
    }
  }

  // Sort the tmp directories by priority.
  std::sort(tmp_dirs_.begin(), tmp_dirs_.end(),
      [](const std::unique_ptr<TmpDir>& a, const std::unique_ptr<TmpDir>& b) {
        return a->priority_ < b->priority_;
      });

  if (HasRemoteDir()) {
    if (local_buff_dir_ == nullptr) {
      // Should at least have one local dir for the buffer. Later we might allow to use
      // s3 fast upload directly without a buffer.
      return Status(
          Substitute("No local directory configured for remote scratch space:  $0",
              tmp_dirs_remote_->path_));
    }
    LOG(INFO) << "Using scratch directory " << tmp_dirs_remote_->path_ << " limit: "
              << PrettyPrinter::PrintBytes(tmp_dirs_remote_->bytes_limit_);
    // The remote directory's bytes-used metric is indexed after the local directories.
    IntGauge* bytes_used_metric = metrics->AddGauge(
        SCRATCH_DIR_BYTES_USED_FORMAT, 0, Substitute("$0", tmp_dirs_.size()));
    tmp_dirs_remote_->bytes_used_metric_ = bytes_used_metric;
  }

  num_active_scratch_dirs_metric_ =
      metrics->AddGauge(TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS, 0);
  active_scratch_dirs_metric_ = SetMetric<string>::CreateAndRegister(
      metrics, TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>());
  // The local buffer directory is not counted as an active scratch directory.
  num_active_scratch_dirs_metric_->SetValue(tmp_dirs_.size() + (HasRemoteDir() ? 1 : 0));
  for (const std::unique_ptr<TmpDir>& dir : tmp_dirs_) {
    active_scratch_dirs_metric_->Add(dir->path_);
  }
  if (HasRemoteDir()) {
    active_scratch_dirs_metric_->Add(tmp_dirs_remote_->path_);
    RETURN_IF_ERROR(CreateTmpFileBufferPoolThread(metrics));
  }

  scratch_bytes_used_metric_ =
      metrics->AddHWMGauge(TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED_HIGH_WATER_MARK,
          TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED, 0);

  scratch_read_memory_buffer_used_metric_ =
      metrics->AddHWMGauge(TMP_FILE_MGR_SCRATCH_READ_MEMORY_BUFFER_USED_HIGH_WATER_MARK,
          TMP_FILE_MGR_SCRATCH_READ_MEMORY_BUFFER_USED, 0);

  initialized_ = true;

  // Directories were requested but none is usable (including the case where every
  // specifier failed to parse). The empty-list case was already warned about above.
  if (tmp_dirs_.empty() && local_buff_dir_ == nullptr && !tmp_dir_specifiers.empty()) {
    LOG(ERROR) << "Running without spill to disk: could not use any scratch "
               << "directories in list: " << join(tmp_dir_specifiers, ",")
               << ". See previous warnings for information on causes.";
  }
  return Status::OK();
}