in be/src/runtime/tmp-file-mgr.cc [214:415]
// Initializes this TmpFileMgr from explicitly supplied settings. Must be called at most
// once (asserted through 'initialized_').
//
// 'tmp_dir_specifiers': scratch directory specifiers. Each one is trimmed, classified
//     as HDFS/Ozone, S3A, GCS or local by its path, and parsed. Specifiers that fail to
//     parse, GCS specifiers (not supported yet) and any remote specifier after the
//     first one are skipped with a warning.
// 'one_dir_per_device': stored in 'one_dir_per_device_'.
// 'compression_codec': spill compression codec specifier; an empty string skips codec
//     setup. A non-empty value requires 'punch_holes' to be true.
// 'punch_holes': stored in 'punch_holes_'.
// 'metrics': group in which the scratch metrics are registered. Must not be null.
//
// Returns a non-OK Status if the compression or remote-spill settings are invalid, if
// the remote scratch directory cannot be verified/created, if a remote directory is
// configured without a usable local buffer directory, or if
// CreateTmpFileBufferPoolThread() fails. Unusable local directories are not an error.
Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
    bool one_dir_per_device, const string& compression_codec, bool punch_holes,
    MetricGroup* metrics) {
  DCHECK(!initialized_);
  punch_holes_ = punch_holes;
  one_dir_per_device_ = one_dir_per_device;
  if (tmp_dir_specifiers.empty()) {
    LOG(WARNING) << "Running without spill to disk: no scratch directories provided.";
  }

  // Spill compression setup.
  if (!compression_codec.empty()) {
    if (!punch_holes) {
      return Status("--disk_spill_punch_holes must be true if disk spill compression "
                    "is enabled");
    }
    Status codec_parse_status = ParseUtil::ParseCompressionCodec(
        compression_codec, &compression_codec_, &compression_level_);
    if (!codec_parse_status.ok()) {
      return Status(
          Substitute("Could not parse --disk_spill_compression_codec value '$0': $1",
              compression_codec, codec_parse_status.GetDetail()));
    }
    if (compression_enabled()) {
      compressed_buffer_tracker_.reset(
          new MemTracker(FLAGS_disk_spill_compression_buffer_limit_bytes,
              "Spill-to-disk temporary compression buffers",
              ExecEnv::GetInstance()->process_mem_tracker()));
    }
  }

  // Remote spilling parameters. These are validated even if no remote directory ends
  // up being configured.
  bool is_percent;
  tmp_dirs_remote_ctrl_.remote_tmp_file_size_ =
      ParseUtil::ParseMemSpec(FLAGS_remote_tmp_file_size, &is_percent, 0);
  if (tmp_dirs_remote_ctrl_.remote_tmp_file_size_ <= 0) {
    return Status(Substitute(
        "Invalid value of remote_tmp_file_size '$0'", FLAGS_remote_tmp_file_size));
  }
  // Clamp the remote file size to the supported maximum instead of rejecting it.
  if (tmp_dirs_remote_ctrl_.remote_tmp_file_size_
      > MAX_REMOTE_TMPFILE_SIZE_THRESHOLD_MB * 1024 * 1024) {
    tmp_dirs_remote_ctrl_.remote_tmp_file_size_ =
        MAX_REMOTE_TMPFILE_SIZE_THRESHOLD_MB * 1024 * 1024;
  }
  // The (possibly clamped) file size is handed to the parser as its reference value.
  tmp_dirs_remote_ctrl_.remote_tmp_block_size_ =
      ParseUtil::ParseMemSpec(FLAGS_remote_tmp_file_block_size, &is_percent,
          tmp_dirs_remote_ctrl_.remote_tmp_file_size_);
  if (tmp_dirs_remote_ctrl_.remote_tmp_block_size_ <= 0) {
    return Status(Substitute(
        "Invalid value of remote_tmp_block_size '$0'", FLAGS_remote_tmp_file_block_size));
  }
  tmp_dirs_remote_ctrl_.wait_for_spill_buffer_timeout_us_ =
      FLAGS_wait_for_spill_buffer_timeout_s * MICROS_PER_SEC;
  if (tmp_dirs_remote_ctrl_.wait_for_spill_buffer_timeout_us_ <= 0) {
    return Status(Substitute("Invalid value of wait_for_spill_buffer_timeout_us '$0'",
        FLAGS_wait_for_spill_buffer_timeout_s));
  }
  tmp_dirs_remote_ctrl_.remote_batch_read_enabled_ = FLAGS_remote_batch_read;
  if (tmp_dirs_remote_ctrl_.remote_batch_read_enabled_) {
    // Bad read buffer parameters only disable batch reading; they do not fail init.
    Status setup_read_buffer_status = tmp_dirs_remote_ctrl_.SetUpReadBufferParams();
    if (!setup_read_buffer_status.ok()) {
      LOG(WARNING) << "Disabled the read buffer for the remote temporary files "
                      "due to errors in read buffer parameters: "
                   << setup_read_buffer_status.msg().msg();
      tmp_dirs_remote_ctrl_.remote_batch_read_enabled_ = false;
    }
  }

  // Below options are used for testing, by setting different modes to implement the
  // spilling to the remote.
  tmp_dirs_remote_ctrl_.remote_tmp_files_avail_pool_lifo_ =
      FLAGS_remote_tmp_files_avail_pool_lifo;

  vector<std::unique_ptr<TmpDir>> tmp_dirs;
  // need_local_buffer_dir indicates whether we currently need a directory in local
  // scratch space to act as the buffer of a remote directory.
  bool need_local_buffer_dir = false;

  // Parse the directory specifiers. Don't return an error on parse errors, just log a
  // warning - we don't want to abort process startup because of misconfigured scratch,
  // since queries will generally still be runnable.
  for (const string& tmp_dir_spec : tmp_dir_specifiers) {
    string tmp_dir_spec_trimmed(boost::algorithm::trim_copy(tmp_dir_spec));
    std::unique_ptr<TmpDir> tmp_dir;
    if (IsHdfsPath(tmp_dir_spec_trimmed.c_str(), false)
        || IsOzonePath(tmp_dir_spec_trimmed.c_str(), false)) {
      tmp_dir = std::make_unique<TmpDirHdfs>(tmp_dir_spec_trimmed);
    } else if (IsS3APath(tmp_dir_spec_trimmed.c_str(), false)) {
      // Initialize the S3 options for later getting S3 connection.
      s3a_options_ = {make_pair("fs.s3a.fast.upload", "true"),
          make_pair("fs.s3a.fast.upload.buffer", "disk")};
      tmp_dir = std::make_unique<TmpDirS3>(tmp_dir_spec_trimmed);
    } else if (IsGcsPath(tmp_dir_spec_trimmed.c_str(), false)) {
      // TODO(IMPALA-10561): Add support for spilling to GCS
      // No TmpDir exists for GCS, so 'tmp_dir' is still null here. Skip the specifier
      // with a warning instead of falling through to tmp_dir->Parse(), which would
      // dereference a null pointer in builds where DCHECK is compiled out.
      LOG(WARNING) << "Directory " << tmp_dir_spec.c_str() << " is not used because "
                   << "spilling to GCS is not supported";
      continue;
    } else {
      tmp_dir = std::make_unique<TmpDirLocal>(tmp_dir_spec_trimmed);
    }
    DCHECK(tmp_dir != nullptr);
    Status parse_status = tmp_dir->Parse();
    if (!parse_status.ok()) {
      LOG(WARNING) << "Directory " << tmp_dir_spec.c_str() << " is not used because "
                   << parse_status.msg().msg();
      continue;
    }
    if (!tmp_dir->is_local()) {
      // Set the flag to reserve a local dir for buffer.
      // If the flag has been set, meaning that there is already one remote dir
      // registered, since we only support one remote dir, this remote dir will be
      // abandoned.
      if (need_local_buffer_dir) {
        LOG(WARNING) << "Only one remote directory is supported. Extra remote directory "
                     << tmp_dir_spec.c_str() << " is not used.";
        continue;
      }
      need_local_buffer_dir = true;
    }
    tmp_dirs.emplace_back(move(tmp_dir));
  }

  // 'metrics' is passed to VerifyAndCreate() and dereferenced directly further down,
  // so check it before its first use rather than after.
  DCHECK(metrics != nullptr);

  vector<bool> is_tmp_dir_on_disk(DiskInfo::num_disks(), false);
  // For each tmp directory, find the disk it is on,
  // so additional tmp directories on the same disk can be skipped.
  for (std::unique_ptr<TmpDir>& candidate : tmp_dirs) {
    Status status = candidate->VerifyAndCreate(
        metrics, &is_tmp_dir_on_disk, need_local_buffer_dir, this);
    if (!status.ok()) {
      // If the remote directory fails to verify or create, return the error.
      if (!candidate->is_local()) return status;
      // If it is the local directory, continue to try next directory.
      continue;
    }
    if (!candidate->is_local()) {
      tmp_dirs_remote_ = move(candidate);
    } else if (need_local_buffer_dir) {
      // The first usable local directory, while a buffer is still needed, becomes the
      // buffer of the remote directory and is not added to 'tmp_dirs_'.
      local_buff_dir_ = move(candidate);
      need_local_buffer_dir = false;
    } else {
      tmp_dirs_.emplace_back(move(candidate));
    }
  }

  // Sort the tmp directories by priority.
  std::sort(tmp_dirs_.begin(), tmp_dirs_.end(),
      [](const std::unique_ptr<TmpDir>& a, const std::unique_ptr<TmpDir>& b) {
        return a->priority_ < b->priority_;
      });

  if (HasRemoteDir()) {
    if (local_buff_dir_ == nullptr) {
      // Should at least have one local dir for the buffer. Later we might allow to use
      // s3 fast upload directly without a buffer.
      return Status(
          Substitute("No local directory configured for remote scratch space: $0",
              tmp_dirs_remote_->path_));
    }
    LOG(INFO) << "Using scratch directory " << tmp_dirs_remote_->path_ << " limit: "
              << PrettyPrinter::PrintBytes(tmp_dirs_remote_->bytes_limit_);
    // The remote directory's gauge is keyed by the number of local directories.
    IntGauge* bytes_used_metric = metrics->AddGauge(
        SCRATCH_DIR_BYTES_USED_FORMAT, 0, Substitute("$0", tmp_dirs_.size()));
    tmp_dirs_remote_->bytes_used_metric_ = bytes_used_metric;
  }

  num_active_scratch_dirs_metric_ =
      metrics->AddGauge(TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS, 0);
  active_scratch_dirs_metric_ = SetMetric<string>::CreateAndRegister(
      metrics, TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>());
  // The remote directory, if any, counts as one additional active scratch directory.
  num_active_scratch_dirs_metric_->SetValue(
      tmp_dirs_.size() + (HasRemoteDir() ? 1 : 0));
  for (const std::unique_ptr<TmpDir>& local_dir : tmp_dirs_) {
    active_scratch_dirs_metric_->Add(local_dir->path_);
  }
  if (HasRemoteDir()) {
    active_scratch_dirs_metric_->Add(tmp_dirs_remote_->path_);
    RETURN_IF_ERROR(CreateTmpFileBufferPoolThread(metrics));
  }

  scratch_bytes_used_metric_ =
      metrics->AddHWMGauge(TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED_HIGH_WATER_MARK,
          TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED, 0);
  scratch_read_memory_buffer_used_metric_ =
      metrics->AddHWMGauge(TMP_FILE_MGR_SCRATCH_READ_MEMORY_BUFFER_USED_HIGH_WATER_MARK,
          TMP_FILE_MGR_SCRATCH_READ_MEMORY_BUFFER_USED, 0);

  initialized_ = true;

  // 'tmp_dirs' keeps its size after its elements were moved out above, so a non-empty
  // 'tmp_dirs' means at least one specifier parsed successfully.
  if ((tmp_dirs_.empty() && local_buff_dir_ == nullptr) && !tmp_dirs.empty()) {
    LOG(ERROR) << "Running without spill to disk: could not use any scratch "
               << "directories in list: " << join(tmp_dir_specifiers, ",")
               << ". See previous warnings for information on causes.";
  }
  return Status::OK();
}