be/src/runtime/tmp-file-mgr.cc

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "runtime/tmp-file-mgr.h" #include <limits> #include <mutex> #include <linux/falloc.h> #include <boost/algorithm/string.hpp> #include <boost/filesystem.hpp> #include <boost/lexical_cast.hpp> #include <boost/uuid/random_generator.hpp> #include <boost/uuid/uuid_io.hpp> #include <gutil/strings/join.h> #include <gutil/strings/substitute.h> #include "kudu/util/env.h" #include "runtime/bufferpool/buffer-pool-counters.h" #include "runtime/exec-env.h" #include "runtime/hdfs-fs-cache.h" #include "runtime/io/disk-io-mgr.h" #include "runtime/io/error-converter.h" #include "runtime/io/local-file-writer.h" #include "runtime/io/request-context.h" #include "runtime/mem-tracker.h" #include "runtime/runtime-state.h" #include "runtime/tmp-file-mgr-internal.h" #include "util/bit-util.h" #include "util/codec.h" #include "util/collection-metrics.h" #include "util/debug-util.h" #include "util/disk-info.h" #include "util/filesystem-util.h" #include "util/hdfs-util.h" #include "util/histogram-metric.h" #include "util/kudu-status-util.h" #include "util/mem-info.h" #include "util/os-util.h" #include "util/parse-util.h" #include "util/pretty-printer.h" #include "util/runtime-profile-counters.h" #include "util/scope-exit-trigger.h" #include "util/string-parser.h" #include "common/names.h" DEFINE_bool(disk_spill_encryption, true, "Set this to encrypt and perform an integrity " "check on all data spilled to disk during a query"); DEFINE_string(disk_spill_compression_codec, "", "(Advanced) If set, data will be compressed using the specified compression codec " "before spilling to disk. This can substantially reduce scratch disk usage, at the " "cost of requiring more CPU and memory resources to compress the data. Uses the same " "syntax as the COMPRESSION_CODEC query option, e.g. 'lz4', 'zstd', 'zstd:6'. If " "this is set, then --disk_spill_punch_holes must be enabled."); DEFINE_int64(disk_spill_compression_buffer_limit_bytes, 512L * 1024L * 1024L, "(Advanced) Limit on the total bytes of compression buffers that will be used for " "spill-to-disk compression across all queries. If this limit is exceeded, some data " "may be spilled to disk in uncompressed form."); DEFINE_bool(disk_spill_punch_holes, false, "(Advanced) changes the free space management strategy for files created in " "--scratch_dirs to punch holes in the file when space is unused. This can reduce " "the amount of scratch space used by queries, particularly in conjunction with " "disk spill compression. This option requires the filesystems of the directories " "in --scratch_dirs to support hole punching."); DEFINE_string(scratch_dirs, "/tmp", "Writable scratch directories. " "This is a comma-separated list of directories. 
Each directory is " "specified as the directory path, an optional limit on the bytes that will " "be allocated in that directory, and an optional priority for the directory. " "If the optional limit is provided, the path and " "the limit are separated by a colon. E.g. '/dir1:10G,/dir2:5GB,/dir3' will allow " "allocating up to 10GB of scratch in /dir1, 5GB of scratch in /dir2 and an " "unlimited amount in /dir3. " "If the optional priority is provided, the path, the limit, and the priority are " "separated by colons. Priority based spilling will result in directories getting " "selected as a spill target based on their priority. The lower the numerical value, " "the higher the priority. E.g. '/dir1:10G:0,/dir2:5GB:1,/dir3::1' will cause " "spilling to first fill up '/dir1' followed by using '/dir2' and '/dir3' in a " "round robin manner."); DEFINE_bool(allow_multiple_scratch_dirs_per_device, true, "If false and --scratch_dirs contains multiple directories on the same device, " "then only the first writable directory is used"); DEFINE_string(remote_tmp_file_size, "16M", "Specify the size of a remote temporary file. Upper bound is 256MB. Lower bound " "is the block size. The size should be a power of 2 and an integer multiple of the " "block size."); DEFINE_string(remote_tmp_file_block_size, "1M", "Specify the size of the block for doing file uploading and fetching. The block " "size should be a power of 2 and less than the remote temporary file size."); DEFINE_string(remote_read_memory_buffer_size, "1G", "Specify the maximum size of read memory buffers for the remote temporary " "files. Only valid when --remote_batch_read is true."); DEFINE_bool(remote_tmp_files_avail_pool_lifo, false, "If true, local buffer files are evicted in LIFO order when spilling " "to remote storage. Otherwise, FIFO order is used."); DEFINE_int32(wait_for_spill_buffer_timeout_s, 60, "Specify the timeout (in seconds) for waiting for a write buffer. If a spilling " "operation fails to get a buffer from the pool within this duration, the operation " "fails."); DEFINE_bool(remote_batch_read, false, "Set if the system uses batch reading for the remote temporary files. Batch reading " "allows reading a block asynchronously when the buffer pool is trying to pin one " "page of that block."); DEFINE_bool(remote_scratch_cleanup_on_start_stop, true, "If enabled, the Impala daemon will clean up the host-level directory within the " "specified remote scratch directory during both startup and shutdown to remove " "potential leftover files. This assumes a single Impala daemon per host. " "For multiple daemons on a host, set this to false to prevent unintended cleanup."); using boost::algorithm::is_any_of; using boost::algorithm::join; using boost::algorithm::split; using boost::algorithm::token_compress_off; using boost::algorithm::token_compress_on; using boost::filesystem::absolute; using boost::filesystem::path; using boost::uuids::random_generator; using namespace impala::io; using kudu::Env; using kudu::RWFile; using kudu::RWFileOptions; using namespace strings; namespace impala { constexpr int64_t TmpFileMgr::HOLE_PUNCH_BLOCK_SIZE_BYTES; const string TMP_SUB_DIR_NAME = "impala-scratch"; const uint64_t AVAILABLE_SPACE_THRESHOLD_MB = 1024; const uint64_t MAX_REMOTE_TMPFILE_SIZE_THRESHOLD_MB = 512; // For spilling to remote fs, the max size of a read memory block. const uint64_t MAX_REMOTE_READ_MEM_BLOCK_THRESHOLD_BYTES = 16 * 1024 * 1024; // The memory limits for the memory buffer to read the spilled data in the remote fs.
// The maximum bytes of the read buffer should be limited by the // REMOTE_READ_BUFFER_MAX_MEM_PERCENT, which stands for the percentage of the total // memory and REMOTE_READ_BUFFER_MEM_HARD_LIMIT_PERCENT, which stands for the percentage // of the remaining memory which is not used by the process. // Also, if the remaining memory is less than REMOTE_READ_BUFFER_DISABLE_THRESHOLD_PERCENT // of the total memory, then the read buffer for remote spilled data should be disabled. const double REMOTE_READ_BUFFER_MAX_MEM_PERCENT = 0.1; const double REMOTE_READ_BUFFER_MEM_HARD_LIMIT_PERCENT = 0.5; const double REMOTE_READ_BUFFER_DISABLE_THRESHOLD_PERCENT = 0.05; // Metric keys const string TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS = "tmp-file-mgr.active-scratch-dirs"; const string TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST = "tmp-file-mgr.active-scratch-dirs.list"; const string TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED_HIGH_WATER_MARK = "tmp-file-mgr.scratch-space-bytes-used-high-water-mark"; const string TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED = "tmp-file-mgr.scratch-space-bytes-used"; const string TMP_FILE_MGR_SCRATCH_READ_MEMORY_BUFFER_USED_HIGH_WATER_MARK = "tmp-file-mgr.scratch-read-memory-buffer-used-high-water-mark"; const string TMP_FILE_MGR_SCRATCH_READ_MEMORY_BUFFER_USED = "tmp-file-mgr.scratch-read-memory-buffer-used"; const string SCRATCH_DIR_BYTES_USED_FORMAT = "tmp-file-mgr.scratch-space-bytes-used.dir-$0"; const string LOCAL_BUFF_BYTES_USED_FORMAT = "tmp-file-mgr.local-buff-bytes-used.dir-$0"; const string TMP_FILE_BUFF_POOL_DEQUEUE_DURATIONS = "tmp-file-mgr.tmp-file-buff-pool-dequeue-durations"; static const Status& TMP_FILE_MGR_NO_AVAILABLE_FILE_TO_EVICT = Status(ErrorMsg::Init( TErrorCode::GENERAL, "TmpFileMgr::ReserveLocalBufferSpace() failed to find available files to evict.")); static const Status& TMP_FILE_BUFFER_POOL_CONTEXT_CANCELLED = Status::CancelledInternal("TmpFileBufferPool"); using DeviceId = TmpFileMgr::DeviceId; using WriteDoneCallback = TmpFileMgr::WriteDoneCallback; TmpFileMgr::TmpFileMgr() {} TmpFileMgr::~TmpFileMgr() { if(tmp_dirs_remote_ctrl_.tmp_file_pool_ != nullptr) { tmp_dirs_remote_ctrl_.tmp_file_pool_->ShutDown(); tmp_dirs_remote_ctrl_.tmp_file_mgr_thread_group_.JoinAll(); } } Status TmpFileMgr::Init(MetricGroup* metrics) { return InitCustom(FLAGS_scratch_dirs, !FLAGS_allow_multiple_scratch_dirs_per_device, FLAGS_disk_spill_compression_codec, FLAGS_disk_spill_punch_holes, metrics); } Status TmpFileMgr::InitCustom(const string& tmp_dirs_spec, bool one_dir_per_device, const string& compression_codec, bool punch_holes, MetricGroup* metrics) { vector<string> all_tmp_dirs; // Empty string should be interpreted as no scratch if (!tmp_dirs_spec.empty()) { split(all_tmp_dirs, tmp_dirs_spec, is_any_of(","), token_compress_on); } return InitCustom( all_tmp_dirs, one_dir_per_device, compression_codec, punch_holes, metrics); } Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers, bool one_dir_per_device, const string& compression_codec, bool punch_holes, MetricGroup* metrics) { DCHECK(!initialized_); punch_holes_ = punch_holes; one_dir_per_device_ = one_dir_per_device; if (tmp_dir_specifiers.empty()) { LOG(WARNING) << "Running without spill to disk: no scratch directories provided."; } if (!compression_codec.empty()) { if (!punch_holes) { return Status("--disk_spill_punch_holes must be true if disk spill compression " "is enabled"); } Status codec_parse_status = ParseUtil::ParseCompressionCodec( compression_codec, &compression_codec_, &compression_level_); if 
(!codec_parse_status.ok()) { return Status( Substitute("Could not parse --disk_spill_compression_codec value '$0': $1", compression_codec, codec_parse_status.GetDetail())); } if (compression_enabled()) { compressed_buffer_tracker_.reset( new MemTracker(FLAGS_disk_spill_compression_buffer_limit_bytes, "Spill-to-disk temporary compression buffers", ExecEnv::GetInstance()->process_mem_tracker())); } } bool is_percent; tmp_dirs_remote_ctrl_.remote_tmp_file_size_ = ParseUtil::ParseMemSpec(FLAGS_remote_tmp_file_size, &is_percent, 0); if (tmp_dirs_remote_ctrl_.remote_tmp_file_size_ <= 0) { return Status(Substitute( "Invalid value of remote_tmp_file_size '$0'", FLAGS_remote_tmp_file_size)); } if (tmp_dirs_remote_ctrl_.remote_tmp_file_size_ > MAX_REMOTE_TMPFILE_SIZE_THRESHOLD_MB * 1024 * 1024) { tmp_dirs_remote_ctrl_.remote_tmp_file_size_ = MAX_REMOTE_TMPFILE_SIZE_THRESHOLD_MB * 1024 * 1024; } tmp_dirs_remote_ctrl_.remote_tmp_block_size_ = ParseUtil::ParseMemSpec(FLAGS_remote_tmp_file_block_size, &is_percent, tmp_dirs_remote_ctrl_.remote_tmp_file_size_); if (tmp_dirs_remote_ctrl_.remote_tmp_block_size_ <= 0) { return Status(Substitute( "Invalid value of remote_tmp_block_size '$0'", FLAGS_remote_tmp_file_block_size)); } tmp_dirs_remote_ctrl_.wait_for_spill_buffer_timeout_us_ = FLAGS_wait_for_spill_buffer_timeout_s * MICROS_PER_SEC; if (tmp_dirs_remote_ctrl_.wait_for_spill_buffer_timeout_us_ <= 0) { return Status(Substitute("Invalid value of wait_for_spill_buffer_timeout_us '$0'", FLAGS_wait_for_spill_buffer_timeout_s)); } tmp_dirs_remote_ctrl_.remote_batch_read_enabled_ = FLAGS_remote_batch_read; if (tmp_dirs_remote_ctrl_.remote_batch_read_enabled_) { Status setup_read_buffer_status = tmp_dirs_remote_ctrl_.SetUpReadBufferParams(); if (!setup_read_buffer_status.ok()) { LOG(WARNING) << "Disabled the read buffer for the remote temporary files " "due to errors in read buffer parameters: " << setup_read_buffer_status.msg().msg(); tmp_dirs_remote_ctrl_.remote_batch_read_enabled_ = false; } } // Below options are using for test by setting different modes to implement the // spilling to the remote. tmp_dirs_remote_ctrl_.remote_tmp_files_avail_pool_lifo_ = FLAGS_remote_tmp_files_avail_pool_lifo; vector<std::unique_ptr<TmpDir>> tmp_dirs; // need_local_buffer_dir indicates if currently we need to a directory in local scratch // space for being the buffer of a remote directory. bool need_local_buffer_dir = false; // Parse the directory specifiers. Don't return an error on parse errors, just log a // warning - we don't want to abort process startup because of misconfigured scratch, // since queries will generally still be runnable. for (const string& tmp_dir_spec : tmp_dir_specifiers) { string tmp_dir_spec_trimmed(boost::algorithm::trim_copy(tmp_dir_spec)); std::unique_ptr<TmpDir> tmp_dir; if (IsHdfsPath(tmp_dir_spec_trimmed.c_str(), false) || IsOzonePath(tmp_dir_spec_trimmed.c_str(), false)) { tmp_dir = std::make_unique<TmpDirHdfs>(tmp_dir_spec_trimmed); } else if (IsS3APath(tmp_dir_spec_trimmed.c_str(), false)) { // Initialize the S3 options for later getting S3 connection. 
s3a_options_ = {make_pair("fs.s3a.fast.upload", "true"), make_pair("fs.s3a.fast.upload.buffer", "disk")}; tmp_dir = std::make_unique<TmpDirS3>(tmp_dir_spec_trimmed); } else if (IsGcsPath(tmp_dir_spec_trimmed.c_str(), false)) { // TODO(IMPALA-10561): Add support for spilling to GCS } else { tmp_dir = std::make_unique<TmpDirLocal>(tmp_dir_spec_trimmed); } DCHECK(tmp_dir != nullptr); Status parse_status = tmp_dir->Parse(); if (!parse_status.ok()) { LOG(WARNING) << "Directory " << tmp_dir_spec.c_str() << " is not used because " << parse_status.msg().msg(); continue; } if (!tmp_dir->is_local()) { // Set the flag to reserve a local dir for buffer. // If the flag has been set, meaning that there is already one remote dir // registered, since we only support one remote dir, this remote dir will be // abandoned. if (need_local_buffer_dir) { LOG(WARNING) << "Only one remote directory is supported. Extra remote directory " << tmp_dir_spec.c_str() << " is not used."; continue; } else { need_local_buffer_dir = true; } } tmp_dirs.emplace_back(move(tmp_dir)); } vector<bool> is_tmp_dir_on_disk(DiskInfo::num_disks(), false); // For each tmp directory, find the disk it is on, // so additional tmp directories on the same disk can be skipped. for (int i = 0; i < tmp_dirs.size(); ++i) { Status status = tmp_dirs[i]->VerifyAndCreate( metrics, &is_tmp_dir_on_disk, need_local_buffer_dir, this); if (!status.ok()) { // If the remote directory fails to verify or create, return the error. if (!tmp_dirs[i]->is_local()) return status; // If it is the local directory, continue to try next directory. continue; } if (tmp_dirs[i]->is_local()) { if (need_local_buffer_dir) { local_buff_dir_ = move(tmp_dirs[i]); need_local_buffer_dir = false; } else { tmp_dirs_.emplace_back(move(tmp_dirs[i])); } } else { tmp_dirs_remote_ = move(tmp_dirs[i]); } } // Sort the tmp directories by priority. std::sort(tmp_dirs_.begin(), tmp_dirs_.end(), [](const std::unique_ptr<TmpDir>& a, const std::unique_ptr<TmpDir>& b) { return a->priority_ < b->priority_; }); if (HasRemoteDir()) { if (local_buff_dir_ == nullptr) { // Should at least have one local dir for the buffer. Later we might allow to use // s3 fast upload directly without a buffer. 
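// For illustration only (an assumed configuration, not taken from this file): a
// working remote-scratch setup would look like
//   --scratch_dirs=/local/scratch:64G,s3a://bucket/impala-scratch
// where the single local directory is claimed as the buffer (local_buff_dir_) for the
// remote directory instead of being used as ordinary local scratch.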
return Status( Substitute("No local directory configured for remote scratch space: $0", tmp_dirs_remote_->path_)); } else { LOG(INFO) << "Using scratch directory " << tmp_dirs_remote_->path_ << " limit: " << PrettyPrinter::PrintBytes(tmp_dirs_remote_->bytes_limit_); IntGauge* bytes_used_metric = metrics->AddGauge( SCRATCH_DIR_BYTES_USED_FORMAT, 0, Substitute("$0", tmp_dirs_.size())); tmp_dirs_remote_->bytes_used_metric_ = bytes_used_metric; } } DCHECK(metrics != nullptr); num_active_scratch_dirs_metric_ = metrics->AddGauge(TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS, 0); active_scratch_dirs_metric_ = SetMetric<string>::CreateAndRegister( metrics, TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>()); if (HasRemoteDir()) { num_active_scratch_dirs_metric_->SetValue(tmp_dirs_.size() + 1); } else { num_active_scratch_dirs_metric_->SetValue(tmp_dirs_.size()); } for (int i = 0; i < tmp_dirs_.size(); ++i) { active_scratch_dirs_metric_->Add(tmp_dirs_[i]->path_); } if (HasRemoteDir()) { active_scratch_dirs_metric_->Add(tmp_dirs_remote_->path_); RETURN_IF_ERROR(CreateTmpFileBufferPoolThread(metrics)); } scratch_bytes_used_metric_ = metrics->AddHWMGauge(TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED_HIGH_WATER_MARK, TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED, 0); scratch_read_memory_buffer_used_metric_ = metrics->AddHWMGauge(TMP_FILE_MGR_SCRATCH_READ_MEMORY_BUFFER_USED_HIGH_WATER_MARK, TMP_FILE_MGR_SCRATCH_READ_MEMORY_BUFFER_USED, 0); initialized_ = true; if ((tmp_dirs_.empty() && local_buff_dir_ == nullptr) && !tmp_dirs.empty()) { LOG(ERROR) << "Running without spill to disk: could not use any scratch " << "directories in list: " << join(tmp_dir_specifiers, ",") << ". See previous warnings for information on causes."; } return Status::OK(); } void TmpFileMgr::CleanupAtShutdown() { // Try to clear the host-level remote temporary directory. if (tmp_dirs_remote_ == nullptr) return; hdfsFS hdfs_conn; Status status = tmp_dirs_remote_->GetConnection(this, &hdfs_conn); if (!status.ok()) { LOG(WARNING) << "Unable to get a connection to " << tmp_dirs_remote_->path_ << " for clearing the directory on shutdown"; return; } RemoveRemoteDirForHost(tmp_dirs_remote_->path_, hdfs_conn); } Status TmpFileMgr::CreateTmpFileBufferPoolThread(MetricGroup* metrics) { DCHECK(metrics != nullptr); tmp_dirs_remote_ctrl_.tmp_file_pool_.reset(new TmpFileBufferPool(this)); std::unique_ptr<Thread> t; RETURN_IF_ERROR(Thread::Create("tmp-file-pool", "work-loop(TmpFileSpaceReserve Worker)", &TmpFileBufferPool::TmpFileSpaceReserveThreadLoop, tmp_dirs_remote_ctrl_.tmp_file_pool_.get(), &t)); tmp_dirs_remote_ctrl_.tmp_file_mgr_thread_group_.AddThread(move(t)); int64_t ONE_HOUR_IN_NS = 60L * 60L * NANOS_PER_SEC; tmp_dirs_remote_ctrl_.tmp_file_pool_->dequeue_timer_metric_ = metrics->RegisterMetric(new HistogramMetric( MetricDefs::Get(TMP_FILE_BUFF_POOL_DEQUEUE_DURATIONS), ONE_HOUR_IN_NS, 3)); return Status::OK(); } void TmpFileMgr::NewFile( TmpFileGroup* file_group, DeviceId device_id, unique_ptr<TmpFile>* new_file) { DCHECK(initialized_); DCHECK_GE(device_id, 0); DCHECK_LT(device_id, tmp_dirs_.size()); DCHECK(file_group != nullptr); // Generate the full file path. 
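// Illustrative result (assumed directory and ids): for scratch dir '/dir1', the new
// file path looks like
//   /dir1/impala-scratch/<query-id>_<random-uuid>
// i.e. the printed query id, an underscore, and a randomly generated unique name.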
string unique_name = lexical_cast<string>(random_generator()()); stringstream file_name; file_name << PrintId(file_group->unique_id()) << "_" << unique_name; path new_file_path(tmp_dirs_[device_id]->path_); new_file_path /= file_name.str(); new_file->reset(new TmpFileLocal(file_group, device_id, new_file_path.string())); } static string ConstructRemoteDirPath(const string& base_dir, const string& hostname, const string& backend_id = "", const string& query_id = "") { stringstream dir; dir << base_dir << "/" << hostname; if (!backend_id.empty()) { dir << "/" << backend_id; DCHECK(!query_id.empty()); dir << "_" << query_id; } return dir.str(); } void TmpFileMgr::RemoveRemoteDirForHost(const string& dir, hdfsFS hdfs_conn) { if (!FLAGS_remote_scratch_cleanup_on_start_stop) return; DCHECK(hdfs_conn != nullptr); const string hostlevel_dir = ConstructRemoteDirPath( dir, ExecEnv::GetInstance()->configured_backend_address().hostname); if (hdfsExists(hdfs_conn, hostlevel_dir.c_str()) == 0) { hdfsDelete(hdfs_conn, hostlevel_dir.c_str(), 1); LOG(INFO) << "Called to remove the host-level remote directory " << hostlevel_dir; } } void TmpFileMgr::RemoveRemoteDirForQuery(TmpFileGroup* file_group) { if (tmp_dirs_remote_ == nullptr) return; const string& dir = tmp_dirs_remote_->path_; const string& hostname = ExecEnv::GetInstance()->configured_backend_address().hostname; const string backend_id = PrintId(ExecEnv::GetInstance()->backend_id(), "_"); const string query_id = PrintId(file_group->unique_id(), "_"); const string querylevel_dir = ConstructRemoteDirPath(dir, hostname, backend_id, query_id); hdfsFS hdfs_conn; Status status = HdfsFsCache::instance()->GetConnection(querylevel_dir, &hdfs_conn, &hdfs_conns_); if (status.ok()) { DCHECK(hdfs_conn != nullptr); hdfsDelete(hdfs_conn, querylevel_dir.c_str(), 1); LOG(INFO) << "Called to remove the query-level remote directory " << querylevel_dir; } else { LOG(WARNING) << "Failed to remove the remote directory because unable to create a " "connection to " << querylevel_dir; } } Status TmpFileMgr::AsyncWriteRange(WriteRange* write_range, TmpFile* tmp_file) { if (write_range->disk_file()->disk_type() == io::DiskFileType::LOCAL) { DCHECK(write_range != nullptr); DCHECK(write_range->io_ctx() != nullptr); return write_range->io_ctx()->AddWriteRange(write_range); } // If spill to a remote directory, TmpFileBufferPool is helping to send the writes to // the DiskQueue because the local buffer for the remote file may be used up and it may // need to wait in the pool before the writes can be sent to the DiskQueue. 
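// A sketch of that path, summarizing the calls below rather than adding behavior: the
// write range is handed to TmpFileBufferPool::EnqueueWriteRange(), may wait there
// until local buffer space is reserved, and only then reaches the DiskQueue.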
DCHECK(tmp_dirs_remote_ctrl_.tmp_file_pool_ != nullptr); DCHECK(tmp_file != nullptr); return tmp_dirs_remote_ctrl_.tmp_file_pool_->EnqueueWriteRange(write_range, tmp_file); } void TmpFileMgr::EnqueueTmpFilesPoolDummyFile() { EnqueueTmpFilesPool(tmp_dirs_remote_ctrl_.tmp_file_pool_->tmp_file_dummy_, true); } void TmpFileMgr::EnqueueTmpFilesPool(shared_ptr<TmpFile>& tmp_file, bool front) { tmp_dirs_remote_ctrl_.tmp_file_pool_->EnqueueTmpFilesPool(tmp_file, front); } Status TmpFileMgr::DequeueTmpFilesPool(shared_ptr<TmpFile>* tmp_file, bool quick_return) { return tmp_dirs_remote_ctrl_.tmp_file_pool_->DequeueTmpFilesPool( tmp_file, quick_return); } void TmpFileMgr::ReleaseTmpFileReadBuffer( const unique_lock<shared_mutex>& file_lock, TmpFile* tmp_file) { DCHECK(tmp_file != nullptr); DCHECK(IsRemoteBatchReadingEnabled()); TmpFileRemote* tmp_file_remote = static_cast<TmpFileRemote*>(tmp_file); for (int i = 0; i < GetNumReadBuffersPerFile(); i++) { tmp_file_remote->TryDeleteReadBuffer(file_lock, i); } } Status TmpFileMgr::TryEvictFile(TmpFile* tmp_file) { DCHECK(tmp_file != nullptr); if (tmp_file->disk_type() == io::DiskFileType::DUMMY) return Status::OK(); TmpFileRemote* tmp_file_remote = static_cast<TmpFileRemote*>(tmp_file); DiskFile* buffer_file = tmp_file_remote->DiskBufferFile(); // Remove the buffer of the TmpFile. // After deletion of the buffer, if the TmpFile doesn't exist in the remote file system // either, the TmpFile shared pointer can be removed from the TmpFileMgr, because in // that case the physical file is considered no longer in the system. // Hold the unique locks of the files during the deletion. Status status = Status::OK(); { unique_lock<shared_mutex> buffer_lock(buffer_file->physical_file_lock_); if (buffer_file->GetFileStatus() == io::DiskFileStatus::PERSISTED) { status = buffer_file->Delete(buffer_lock); } } return status; } Status TmpFileMgr::ReserveLocalBufferSpace(bool quick_return) { int64_t file_size = GetRemoteTmpFileSize(); // The high water mark records the total bytes that have been assigned; we can assume // that all the assigned bytes will eventually be returned to the pool. // Before the high water mark reaches the bytes limit of the local buffer directory, // the caller can take space freely. But once the high water mark is over the bytes // limit, the caller needs to obtain space from the pool because all the available // space is in the pool now. TmpDir* dir = local_buff_dir_.get(); if (tmp_dirs_remote_ctrl_.local_buff_dir_bytes_high_water_mark_.Add(file_size) > dir->bytes_limit_) { tmp_dirs_remote_ctrl_.local_buff_dir_bytes_high_water_mark_.Add(-file_size); } else { GetLocalBufferDir()->bytes_used_metric_->Increment(file_size); return Status::OK(); } shared_ptr<TmpFile> tmp_file; // If all of the space of the buffer directory has been assigned, obtain a file that // is available to be evicted from the TmpFileBufferPool. This can be a long wait if // quick return is not set and there is no available file in the pool. Status status = DequeueTmpFilesPool(&tmp_file, quick_return); if (!status.ok()) { DCHECK(tmp_file == nullptr); return status; } // Evict the file to release the physical space. // If an error happens during eviction, we log a warning and return OK anyway to let // the caller keep writing, since the physical file has probably already been // deleted.
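// Illustrative accounting for this function (the 16MB file size is the flag default;
// the 1GB limit is assumed): with --remote_tmp_file_size=16M and a 1GB local buffer
// directory limit, the first 64 reservations only raise the high water mark; each
// later reservation must dequeue a buffered file from the pool and evict it below to
// free 16MB of local buffer space.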
status = TryEvictFile(tmp_file.get()); if (!status.ok()) { LOG(WARNING) << "File Eviction Failed: " << tmp_file->GetWriteFile()->path(); } return Status::OK(); } TmpDir* TmpFileMgr::GetLocalBufferDir() const { return local_buff_dir_.get(); } int TmpFileMgr::NumActiveTmpDevicesLocal() { DCHECK(initialized_); return tmp_dirs_.size(); } int TmpFileMgr::NumActiveTmpDevices() { DCHECK(initialized_); return tmp_dirs_.size() + ((tmp_dirs_remote_ == nullptr) ? 0 : 1); } vector<DeviceId> TmpFileMgr::ActiveTmpDevices() { vector<DeviceId> devices; DeviceId device_id = 0; for (; device_id < tmp_dirs_.size(); ++device_id) { devices.push_back(device_id); } if (tmp_dirs_remote_ != nullptr) { devices.push_back(device_id); } return devices; } string TmpFileMgr::GetTmpDirPath(DeviceId device_id) const { DCHECK(initialized_); DCHECK_GE(device_id, 0); DCHECK_LT(device_id, tmp_dirs_.size() + ((tmp_dirs_remote_ == nullptr) ? 0 : 1)); if (device_id < tmp_dirs_.size()) { return tmp_dirs_[device_id]->path_; } else { return tmp_dirs_remote_->path_; } } int64_t TmpFileMgr::TmpDirRemoteCtrl::CalcMaxReadBufferBytes() { int64_t max_allow_bytes = 0; int64_t process_bytes_limit; int64_t total_avail_mem; if (!ChooseProcessMemLimit(&process_bytes_limit, &total_avail_mem).ok()) { // Return 0 to disable read buffer if unable to get the process and system limit. return max_allow_bytes; } DCHECK_GE(total_avail_mem, process_bytes_limit); // Only allows the read buffer if the memory not being used is larger than // REMOTE_READ_BUFFER_DISABLE_THRESHOLD_PERCENT of the total memory. if ((total_avail_mem - process_bytes_limit) > total_avail_mem * REMOTE_READ_BUFFER_DISABLE_THRESHOLD_PERCENT) { // Max allowed bytes are the minimum of REMOTE_READ_BUFFER_MAX_MEM_PERCENT of the // total memory and REMOTE_READ_BUFFER_MEM_HARD_LIMIT_PERCENT of the unused memory. max_allow_bytes = min((total_avail_mem - process_bytes_limit) * REMOTE_READ_BUFFER_MEM_HARD_LIMIT_PERCENT, total_avail_mem * REMOTE_READ_BUFFER_MAX_MEM_PERCENT); } return max_allow_bytes; } Status TmpFileMgr::TmpDirRemoteCtrl::SetUpReadBufferParams() { bool is_percent; // If the temporary file size is smaller than the max block size, set the block size // as the file size read_buffer_block_size_ = remote_tmp_file_size_ < MAX_REMOTE_READ_MEM_BLOCK_THRESHOLD_BYTES ? remote_tmp_file_size_ : MAX_REMOTE_READ_MEM_BLOCK_THRESHOLD_BYTES; num_read_buffer_blocks_per_file_ = static_cast<int>(remote_tmp_file_size_ / read_buffer_block_size_); max_read_buffer_size_ = ParseUtil::ParseMemSpec(FLAGS_remote_read_memory_buffer_size, &is_percent, 0); if (max_read_buffer_size_ <= 0) { return Status(Substitute("Invalid value of remote_read_memory_buffer_size '$0'", FLAGS_remote_read_memory_buffer_size)); } // Calculate the max allowed bytes for the read buffer. 
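// A worked example of CalcMaxReadBufferBytes() with assumed values: for 128GB of
// total memory and a 96GB process limit, the unused 32GB exceeds the 5% disable
// threshold (6.4GB), so the cap is min(0.5 * 32GB, 0.1 * 128GB) = 12.8GB.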
int64_t max_allow_bytes = CalcMaxReadBufferBytes(); DCHECK_GE(max_allow_bytes, 0); if (max_read_buffer_size_ > max_allow_bytes) { max_read_buffer_size_ = max_allow_bytes; LOG(WARNING) << "The remote read memory buffer size exceeds the maximum " "allowed and is reduced to " << max_allow_bytes << " bytes."; } LOG(INFO) << "Using " << max_read_buffer_size_ << " bytes for the batch reading buffer of TmpFileMgr."; return Status::OK(); } Status TmpDir::ParseByteLimit(const string& byte_limit) { bool is_percent; bytes_limit_ = ParseUtil::ParseMemSpec(byte_limit, &is_percent, 0); if (bytes_limit_ < 0 || is_percent) { return Status(Substitute( "Malformed scratch directory capacity configuration '$0'", raw_path_)); } else if (bytes_limit_ == 0) { // Interpret -1, 0 or empty string as no limit. bytes_limit_ = numeric_limits<int64_t>::max(); } return Status::OK(); } Status TmpDir::ParsePriority(const string& priority) { if (!priority.empty()) { StringParser::ParseResult result; priority_ = StringParser::StringToInt<int>( priority.c_str(), priority.size(), &result); if (result != StringParser::PARSE_SUCCESS) { return Status(Substitute( "Malformed scratch directory priority configuration '$0'", raw_path_)); } } return Status::OK(); } Status TmpDir::Parse() { DCHECK(parsed_raw_path_.empty() && path_.empty()); vector<string> toks; RETURN_IF_ERROR(ParsePathTokens(toks)); constexpr int max_num_tokens = 3; if (toks.size() > max_num_tokens) { return Status(Substitute( "Could not parse temporary dir specifier, too many colons: '$0'", raw_path_)); } // Construct the complete scratch directory path. toks[0] = trim_right_copy_if(toks[0], is_any_of("/")); parsed_raw_path_ = toks[0]; path_ = (boost::filesystem::path(toks[0]) / TMP_SUB_DIR_NAME).string(); // The scratch path may have two options "bytes limit" and "priority". // The bytes limit should be the first option. if (toks.size() > 1) { RETURN_IF_ERROR(ParseByteLimit(toks[1])); } // The priority should be the second option. if (toks.size() > 2) { RETURN_IF_ERROR(ParsePriority(toks[2])); } return Status::OK(); } Status TmpDirLocal::ParsePathTokens(vector<string>& toks) { // The ordinary format of the directory input after split by colon is // {path, [bytes_limit, [priority]]}. split(toks, raw_path_, is_any_of(":"), token_compress_off); toks[0] = absolute(toks[0]).string(); return Status::OK(); } Status TmpDirLocal::VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) { DCHECK(!parsed_raw_path_.empty()); // The path must be a writable directory. Status status = FileSystemUtil::VerifyIsDirectory(parsed_raw_path_); if (!status.ok()) { LOG(WARNING) << "Cannot use directory " << parsed_raw_path_ << " for scratch: " << status.msg().msg(); return status; } // Find the disk id of path. Add the scratch directory if there isn't another directory // on the same disk (or if we don't know which disk it is on). 
int disk_id = DiskInfo::disk_id(parsed_raw_path_.c_str()); if (!tmp_mgr->one_dir_per_device_ || disk_id < 0 || !(*is_tmp_dir_on_disk)[disk_id]) { uint64_t available_space; RETURN_IF_ERROR( FileSystemUtil::GetSpaceAvailable(parsed_raw_path_, &available_space)); if (available_space < AVAILABLE_SPACE_THRESHOLD_MB * 1024 * 1024) { LOG(WARNING) << "Filesystem containing scratch directory " << parsed_raw_path_ << " has less than " << AVAILABLE_SPACE_THRESHOLD_MB << "MB available."; } RETURN_IF_ERROR(CreateLocalDirectory( metrics, is_tmp_dir_on_disk, need_local_buffer_dir, disk_id, tmp_mgr)); if (tmp_mgr->punch_holes_) { // Make sure hole punching is supported for the directory. // IMPALA-9798: this file should *not* be created inside the impala-scratch // subdirectory to avoid races with multiple impalads starting up. RETURN_IF_ERROR(FileSystemUtil::CheckHolePunch(parsed_raw_path_)); } } else { return Status(Substitute( "The scratch directory $0 is on the same disk as another directory or on " "an unknown disk.", parsed_raw_path_)); } return Status::OK(); } void TmpDirLocal::LogScratchLocalDirectoryInfo(bool is_local_buffer_dir, int disk_id) { LOG(INFO) << (is_local_buffer_dir ? "Using local buffer directory for scratch space " : "Using scratch directory ") << path_ << " on " << "disk " << disk_id << " limit: " << PrettyPrinter::PrintBytes(bytes_limit_) << ", priority: " << priority_; } Status TmpDirLocal::CreateLocalDirectory(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, int disk_id, TmpFileMgr* tmp_mgr) { DCHECK(!path_.empty()); // Create the directory, destroying it if already present. If this succeeds, we will // have an empty writable scratch directory. Status status = FileSystemUtil::RemoveAndCreateDirectory(path_); if (status.ok()) { if (need_local_buffer_dir) { // Use the first local dir as the local buffer; the dir serves only as the buffer // for spilling to the remote filesystem. The dir needs enough space for at least // two default-sized temporary files. if (bytes_limit_ < tmp_mgr->tmp_dirs_remote_ctrl_.remote_tmp_file_size_ * 2) { return Status(Substitute( "Local buffer directory $0 configured for remote scratch " "space has a size limit of $1 bytes, which should be at least twice the " "temporary file size " "$2 bytes", path_, bytes_limit_, tmp_mgr->tmp_dirs_remote_ctrl_.remote_tmp_file_size_)); } bytes_used_metric_ = metrics->AddGauge(LOCAL_BUFF_BYTES_USED_FORMAT, 0, Substitute("$0", 0)); LogScratchLocalDirectoryInfo(true /*is_local_buffer_dir*/, disk_id); return Status::OK(); } if (disk_id >= 0) (*is_tmp_dir_on_disk)[disk_id] = true; LogScratchLocalDirectoryInfo(false /*is_local_buffer_dir*/, disk_id); bytes_used_metric_ = metrics->AddGauge( SCRATCH_DIR_BYTES_USED_FORMAT, 0, Substitute("$0", tmp_mgr->tmp_dirs_.size())); } else { LOG(WARNING) << "Could not remove and recreate directory " << path_ << ": cannot use it for scratch. " << "Error was: " << status.msg().msg(); } return status; } Status TmpDirS3::ParsePathTokens(vector<string>& toks) { // The ordinary format of the directory input after split by colon is // {scheme, path, [bytes_limit, [priority]]}. Combine scheme and path. split(toks, raw_path_, is_any_of(":"), token_compress_off); // Only called on paths starting with `s3a://`, so there will always be at least 2 tokens.
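// For example (an illustrative spec): 's3a://bucket/scratch:100G:1' first splits into
// {"s3a", "//bucket/scratch", "100G", "1"} and is recombined below into
// {"s3a://bucket/scratch", "100G", "1"}.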
DCHECK(toks.size() >= 2); toks[0] = Substitute("$0:$1", toks[0], toks[1]); toks.erase(toks.begin()+1); return Status::OK(); } Status TmpDirS3::GetConnection(TmpFileMgr* tmp_mgr, hdfsFS* hdfs_conn) { DCHECK(tmp_mgr != nullptr); DCHECK(!path_.empty()); DCHECK(hdfs_conn != nullptr); return HdfsFsCache::instance()->GetConnection( path_, hdfs_conn, &(tmp_mgr->hdfs_conns_), tmp_mgr->s3a_options()); } Status TmpDirS3::VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) { // For the S3 path, it doesn't need to create the directory for the uploading // as long as the S3 address is correct. DCHECK(tmp_mgr != nullptr); DCHECK(!path_.empty()); hdfsFS hdfs_conn; RETURN_IF_ERROR(GetConnection(tmp_mgr, &hdfs_conn)); tmp_mgr->RemoveRemoteDirForHost(path_, hdfs_conn); return Status::OK(); } Status TmpDirHdfs::ParsePathTokens(vector<string>& toks) { // HDFS scratch path can include an optional port number; URI without path and port // number is ambiguous so in that case we error. Format after split by colon is // {scheme, path, port_num?, [bytes_limit, [priority]]}. Coalesce the URI from tokens. split(toks, raw_path_, is_any_of(":"), token_compress_off); // Only called on paths starting with `hdfs://` or `ofs://`. DCHECK(toks.size() >= 2); if (toks[1].rfind('/') > 1) { // Contains a slash after the scheme, so port number was omitted. toks[0] = Substitute("$0:$1", toks[0], toks[1]); toks.erase(toks.begin()+1); } else if (toks.size() < 3) { return Status( Substitute("The scratch URI must have a path or port number: '$0'", raw_path_)); } else { toks[0] = Substitute("$0:$1:$2", toks[0], toks[1], toks[2]); toks.erase(toks.begin()+1, toks.begin()+3); } return Status::OK(); } Status TmpDirHdfs::GetConnection(TmpFileMgr* tmp_mgr, hdfsFS* hdfs_conn) { DCHECK(tmp_mgr != nullptr); DCHECK(!path_.empty()); DCHECK(hdfs_conn != nullptr); return HdfsFsCache::instance()->GetConnection( path_, hdfs_conn, &(tmp_mgr->hdfs_conns_)); } Status TmpDirHdfs::VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_on_disk, bool need_local_buffer_dir, TmpFileMgr* tmp_mgr) { DCHECK(!path_.empty()); hdfsFS hdfs_conn; RETURN_IF_ERROR(GetConnection(tmp_mgr, &hdfs_conn)); if (hdfsExists(hdfs_conn, path_.c_str()) != 0) { // If the impala scratch path in hdfs doesn't exist, attempt to create the path to // verify it's valid and writable for scratch usage. // Failure may indicate a permission or configuration issue. if (hdfsCreateDirectory(hdfs_conn, path_.c_str()) != 0) { return Status(GetHdfsErrorMsg("HDFS create path failed: ", path_)); } } else { tmp_mgr->RemoveRemoteDirForHost(path_, hdfs_conn); } return Status::OK(); } TmpFile::TmpFile( TmpFileGroup* file_group, DeviceId device_id, const string& path, bool expected_local) : file_group_(file_group), path_(path), device_id_(device_id), disk_id_(DiskInfo::disk_id(path.c_str())), expected_local_(expected_local), blacklisted_(false) {} int TmpFile::AssignDiskQueue(bool is_local_buffer) const { // The file paths of TmpFiles are absolute paths, doesn't support default fs. if (is_local_buffer) { // Assign a disk queue for a local buffer, which is associated with a remote file. 
return file_group_->io_mgr_->AssignQueue(local_buffer_path_.c_str(), /* disk_id */ -1, /* expected_local */ true, /* check_default_fs */ false); } return file_group_->io_mgr_->AssignQueue( path_.c_str(), disk_id_, expected_local_, /* check_default_fs */ false); } bool TmpFile::Blacklist(const ErrorMsg& msg) { LOG(ERROR) << "Error for temporary file '" << path_ << "': " << msg.msg(); if (!blacklisted_) { blacklisted_ = true; return true; } else { return false; } } TmpDir* TmpFile::GetDir() { auto tmp_file_mgr = file_group_->tmp_file_mgr_; if (device_id_ >= tmp_file_mgr->tmp_dirs_.size()) { // Only one remote directory supported. DCHECK(device_id_ - tmp_file_mgr->tmp_dirs_.size() == 0); return tmp_file_mgr->tmp_dirs_remote_.get(); } return tmp_file_mgr->tmp_dirs_[device_id_].get(); } Status TmpFile::PunchHole(int64_t offset, int64_t len) { DCHECK(file_group_->tmp_file_mgr_->punch_holes()); // Because of RAII, the file is automatically closed when this function returns. RWFileOptions opts; opts.mode = Env::CREATE_OR_OPEN; unique_ptr<RWFile> file; KUDU_RETURN_IF_ERROR(Env::Default()->NewRWFile(opts, path_, &file), "Failed to open scratch file for hole punching"); KUDU_RETURN_IF_ERROR( file->PunchHole(offset, len), "Failed to punch hole in scratch file"); bytes_reclaimed_.Add(len); GetDir()->bytes_used_metric()->Increment(-len); VLOG(3) << "Punched hole in " << path_ << " " << offset << " " << len; return Status::OK(); } string TmpFile::DebugString() { return Substitute( "File $0 path '$1' device id $2 disk id $3 allocation offset $4 blacklisted $5", this, path_, device_id_, disk_id_, allocation_offset_, blacklisted_); } TmpFileLocal::TmpFileLocal(TmpFileGroup* file_group, TmpFileMgr::DeviceId device_id, const std::string& path, bool expected_local) : TmpFile(file_group, device_id, path, expected_local) { DCHECK(file_group != nullptr); disk_file_ = make_unique<io::DiskFile>(path_, file_group->io_mgr_); disk_type_ = io::DiskFileType::LOCAL; } bool TmpFileLocal::AllocateSpace(int64_t num_bytes, int64_t* offset) { DCHECK_GT(num_bytes, 0); TmpDir* dir = GetDir(); // Increment optimistically and roll back if the limit is exceeded. if (dir->bytes_used_metric()->Increment(num_bytes) > dir->bytes_limit()) { dir->bytes_used_metric()->Increment(-num_bytes); return false; } *offset = allocation_offset_; allocation_offset_ += num_bytes; return true; } io::DiskFile* TmpFileLocal::GetWriteFile() { return disk_file_.get(); } Status TmpFileLocal::Remove() { // Remove the file if present (it may not be present if no writes completed). Status status = FileSystemUtil::RemovePaths({path_}); int64_t bytes_in_use = file_group_->tmp_file_mgr_->punch_holes() ? 
allocation_offset_ - bytes_reclaimed_.Load() : allocation_offset_; GetDir()->bytes_used_metric()->Increment(-bytes_in_use); return status; } TmpFileRemote::TmpFileRemote(TmpFileGroup* file_group, TmpFileMgr::DeviceId device_id, const std::string& path, const std::string& local_buffer_path, bool expected_local, const char* hdfs_url) : TmpFile(file_group, device_id, path, expected_local) { DCHECK(hdfs_url != nullptr); hdfs_conn_ = nullptr; const HdfsFsCache::HdfsConnOptions* options = nullptr; if (IsHdfsPath(hdfs_url, false)) { disk_type_ = io::DiskFileType::DFS; disk_id_ = file_group->io_mgr_->RemoteDfsDiskId(); disk_id_file_op_ = file_group->io_mgr_->RemoteDfsDiskFileOperId(); } else if (IsOzonePath(hdfs_url, false)) { disk_type_ = io::DiskFileType::DFS; disk_id_ = file_group->io_mgr_->RemoteOzoneDiskId(); disk_id_file_op_ = file_group->io_mgr_->RemoteDfsDiskFileOperId(); } else if (IsS3APath(hdfs_url, false)) { disk_type_ = io::DiskFileType::S3; disk_id_ = file_group->io_mgr_->RemoteS3DiskId(); disk_id_file_op_ = file_group->io_mgr_->RemoteS3DiskFileOperId(); options = file_group_->tmp_file_mgr_->s3a_options(); } Status status = HdfsFsCache::instance()->GetConnection( hdfs_url, &hdfs_conn_, &file_group_->tmp_file_mgr_->hdfs_conns_, options); file_size_ = file_group_->tmp_file_mgr_->GetRemoteTmpFileSize(); local_buffer_path_ = local_buffer_path; disk_file_ = make_unique<io::DiskFile>(path_, file_group->io_mgr_, file_group_->tmp_file_mgr_->GetRemoteTmpFileSize(), disk_type_, &hdfs_conn_); if (file_group_->tmp_file_mgr_->IsRemoteBatchReadingEnabled()) { read_buffer_block_size_ = file_group_->tmp_file_mgr_->GetReadBufferBlockSize(); int num_of_read_buffers = file_group_->tmp_file_mgr_->GetNumReadBuffersPerFile(); disk_buffer_file_ = make_unique<io::DiskFile>(local_buffer_path_, file_group_->io_mgr_, file_group_->tmp_file_mgr_->GetRemoteTmpFileSize(), io::DiskFileType::LOCAL_BUFFER, read_buffer_block_size_, num_of_read_buffers); disk_read_page_cnts_ = std::make_unique<int64_t[]>(num_of_read_buffers); DCHECK(disk_read_page_cnts_.get() != nullptr); memset(disk_read_page_cnts_.get(), 0, num_of_read_buffers * sizeof(int64_t)); for (int i = 0; i < num_of_read_buffers; i++) { fetch_ranges_.emplace_back(nullptr); } } else { disk_buffer_file_ = make_unique<io::DiskFile>(local_buffer_path_, file_group_->io_mgr_, file_group_->tmp_file_mgr_->GetRemoteTmpFileSize(), io::DiskFileType::LOCAL_BUFFER); } } TmpFileRemote::~TmpFileRemote() { // Need to return the buffer before deconstruction if buffer space is reserved. if (DiskBufferFile()->IsSpaceReserved()) DCHECK(is_buffer_returned()); } bool TmpFileRemote::AllocateSpace(int64_t num_bytes, int64_t* offset) { DCHECK_GT(num_bytes, 0); if (at_capacity_) return false; *offset = allocation_offset_; allocation_offset_ += num_bytes; // The actual size could be a little over the file size. if (allocation_offset_ >= file_size_) { // Set the actual file size of the disk file for the use of writing. 
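// For example (illustrative sizes): with a 16MB file_size_, an 8KB allocation that
// starts just below the 16MB mark pushes allocation_offset_ slightly past 16MB; that
// final offset is recorded as the actual file size and the file is marked at capacity.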
GetWriteFile()->SetActualFileSize(allocation_offset_); at_capacity_ = true; } return true; } io::DiskFile* TmpFileRemote::GetWriteFile() { return disk_buffer_file_.get(); } int TmpFileRemote::GetReadBufferIndex(int64_t offset) { DCHECK(disk_buffer_file_ != nullptr); return disk_buffer_file_->GetReadBufferIndex(offset); } void TmpFileRemote::AsyncFetchReadBufferBlock(io::DiskFile* read_buffer_file, io::MemBlock* read_buffer_block, int read_buffer_idx, bool* fetched) { DCHECK(fetched != nullptr); *fetched = false; { shared_lock<shared_mutex> read_file_lock(*(read_buffer_file->GetFileLock())); unique_lock<SpinLock> mem_bloc_lock(*(read_buffer_block->GetLock())); // Check the block status. // If the block is disabled, the caller won't be able to use this buffer block. // If the block is written, the block is already fetched, set the fetched flag and // return immediately. // If the block is uninitialized, we will fetch the block immediately but without // waiting for the fetch, so that it won't block the current page reading. // If the block is in reserved or alloc status, means one other thread is handling // the block, here we don't wait because the blocking could be expensive. if (read_buffer_file->IsReadBufferBlockStatus(read_buffer_block, io::MemBlockStatus::DISABLED, read_file_lock, &mem_bloc_lock)) { return; } else if (read_buffer_file->IsReadBufferBlockStatus(read_buffer_block, io::MemBlockStatus::WRITTEN, read_file_lock, &mem_bloc_lock)) { *fetched = true; return; } else if (read_buffer_file->IsReadBufferBlockStatus(read_buffer_block, io::MemBlockStatus::UNINIT, read_file_lock, &mem_bloc_lock)) { bool dofetch = true; int64_t mem_size_limit = file_group_->tmp_file_mgr()->GetRemoteMaxTotalReadBufferSize(); auto read_mem_counter = file_group_->tmp_file_mgr()->scratch_read_memory_buffer_used_metric_; if (read_mem_counter->Increment(read_buffer_file->read_buffer_block_size()) > mem_size_limit) { read_mem_counter->Increment(-1 * read_buffer_file->read_buffer_block_size()); dofetch = false; } if (dofetch) { read_buffer_file->SetReadBufferBlockStatus(read_buffer_block, io::MemBlockStatus::RESERVED, read_file_lock, &mem_bloc_lock); RemoteOperRange::RemoteOperDoneCallback fetch_callback = [read_buffer_block, tmp_file = this](const Status& fetch_status) { if (!fetch_status.ok()) { // Disable the read buffer if fails to fetch. tmp_file->TryDeleteReadBufferExcl(read_buffer_block->block_id()); } }; fetch_ranges_[read_buffer_idx].reset(new RemoteOperRange(disk_file_.get(), read_buffer_file, file_group_->tmp_file_mgr()->GetRemoteTmpBlockSize(), disk_id(true), RequestType::FILE_FETCH, file_group_->io_mgr_, fetch_callback, GetReadBuffStartOffset(read_buffer_idx))); Status add_status = file_group_->io_ctx_->AddRemoteOperRange( fetch_ranges_[read_buffer_idx].get()); if (!add_status.ok()) { read_buffer_file->SetReadBufferBlockStatus(read_buffer_block, io::MemBlockStatus::DISABLED, read_file_lock, &mem_bloc_lock); } } else { read_buffer_file->SetReadBufferBlockStatus(read_buffer_block, io::MemBlockStatus::DISABLED, read_file_lock, &mem_bloc_lock); } } } *fetched = true; return; } io::DiskFile* TmpFileRemote::GetReadBufferFile(int64_t offset) { // If the local buffer file exists, return the file directly. // If it is deleted (probably due to eviction), and batch reading is enabled, would // try to fetch the current block asynchronously if it is not present in the memory // buffer. 
// If the local buffer file is deleted and the read memory buffer doesn't have the // block right now, then return a nullptr to indicate there is no buffer available. io::DiskFile* read_buffer_file = disk_buffer_file_.get(); if (disk_buffer_file_->GetFileStatus() != io::DiskFileStatus::DELETED) { return read_buffer_file; } if (!file_group_->tmp_file_mgr()->IsRemoteBatchReadingEnabled()) return nullptr; int read_buffer_idx = GetReadBufferIndex(offset); io::MemBlock* read_buffer_block = disk_buffer_file_->GetBufferBlock(read_buffer_idx); bool fetched = false; io::MemBlockStatus block_status = read_buffer_block->GetStatus(); if (block_status == io::MemBlockStatus::DISABLED) { // do nothing } else if (block_status == io::MemBlockStatus::WRITTEN) { fetched = true; } else { AsyncFetchReadBufferBlock( read_buffer_file, read_buffer_block, read_buffer_idx, &fetched); } return fetched ? read_buffer_file : nullptr; } bool TmpFileRemote::IncrementReadPageCount(int buffer_idx) { int64_t read_count = 0; int64_t total_num = 0; DCheckReadBufferIdx(buffer_idx); total_num = GetReadBuffPageCount(buffer_idx); { lock_guard<SpinLock> lock(lock_); read_count = ++disk_read_page_cnts_[buffer_idx]; } // Return true if all the pages have been read of the block. return read_count == total_num; } template <typename T> void TmpFileRemote::TryDeleteReadBuffer(const T& lock, int buffer_idx) { DCheckReadBufferIdx(buffer_idx); bool reserved = false; bool allocated = false; DCHECK(disk_buffer_file_->IsBatchReadEnabled()); DCHECK(lock.mutex() == disk_buffer_file_->GetFileLock() && lock.owns_lock()); disk_buffer_file_->DeleteReadBuffer( disk_buffer_file_->GetBufferBlock(buffer_idx), &reserved, &allocated, lock); if (reserved || allocated) { // Because the reservation will increase the current allocated read buffer usage // ahead of the real allocation, we need to decrease it if the block is reserved // or allocated. file_group_->tmp_file_mgr_->scratch_read_memory_buffer_used_metric_->Increment( -1 * read_buffer_block_size_); } } TmpDir* TmpFileRemote::GetLocalBufferDir() const { return file_group_->tmp_file_mgr_->GetLocalBufferDir(); } Status TmpFileRemote::Remove() { Status status = Status::OK(); // If True, we need to enqueue the file back to the pool after deletion. bool to_return_the_buffer = false; // Set a flag to notify other threads which are holding the file lock to release, // since the remove process needs a unique lock, it accelerates acquiring the mutex. SetToDeleteFlag(); { // The order of acquiring the lock must be from local to remote to avoid deadlocks. unique_lock<shared_mutex> buffer_file_lock(*(disk_buffer_file_->GetFileLock())); unique_lock<shared_mutex> file_lock(*(disk_file_->GetFileLock())); // Delete the local buffer file if exists. if (disk_buffer_file_->GetFileStatus() != io::DiskFileStatus::DELETED) { status = disk_buffer_file_->Delete(buffer_file_lock); if (!status.ok()) { // If the physical file is failed to delete, log a warning, and set a deleted flag // anyway. LOG(WARNING) << "Delete file: " << disk_buffer_file_->path() << " failed."; disk_buffer_file_->SetStatus(io::DiskFileStatus::DELETED); } else if (disk_file_->GetFileStatus() != io::DiskFileStatus::PERSISTED && disk_buffer_file_->IsSpaceReserved()) { // If the file is not uploaded and the buffer space is reserved, we need to return // the buffer to the pool after deletion of the TmpFile. The buffer of a uploaded // file should have been returned to the pool after upload operation completes. 
to_return_the_buffer = true; } else { // Do nothing. } } // Set the remote file status to deleted. The physical remote files would be deleted // during deconstruction of TmpFileGroup by deleting the entire remote // directory for efficiency consideration. disk_file_->SetStatus(io::DiskFileStatus::DELETED); // Try to delete all the read buffers. if (file_group_->tmp_file_mgr_->IsRemoteBatchReadingEnabled()) { file_group_->tmp_file_mgr_->ReleaseTmpFileReadBuffer(buffer_file_lock, this); } } // Update the metrics. GetDir()->bytes_used_metric()->Increment(-file_size_); // Return the file to the pool if it hasn't been enqueued. if (to_return_the_buffer) { file_group_->tmp_file_mgr()->EnqueueTmpFilesPool( file_group_->FindTmpFileSharedPtr(this), true); } return status; } TmpFileGroup::TmpFileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr, RuntimeProfile* profile, const TUniqueId& unique_id, int64_t bytes_limit) : tmp_file_mgr_(tmp_file_mgr), io_mgr_(io_mgr), io_ctx_(nullptr), unique_id_(unique_id), bytes_limit_(bytes_limit), write_counter_(ADD_COUNTER(profile, "ScratchWrites", TUnit::UNIT)), bytes_written_counter_(ADD_COUNTER(profile, "ScratchBytesWritten", TUnit::BYTES)), uncompressed_bytes_written_counter_( ADD_COUNTER(profile, "UncompressedScratchBytesWritten", TUnit::BYTES)), read_counter_(ADD_COUNTER(profile, "ScratchReads", TUnit::UNIT)), bytes_read_counter_(ADD_COUNTER(profile, "ScratchBytesRead", TUnit::BYTES)), read_use_mem_counter_(ADD_COUNTER(profile, "ScratchReadsUseMem", TUnit::UNIT)), bytes_read_use_mem_counter_( ADD_COUNTER(profile, "ScratchBytesReadUseMem", TUnit::BYTES)), read_use_local_disk_counter_( ADD_COUNTER(profile, "ScratchReadsUseLocalDisk", TUnit::UNIT)), bytes_read_use_local_disk_counter_( ADD_COUNTER(profile, "ScratchBytesReadUseLocalDisk", TUnit::BYTES)), scratch_space_bytes_used_counter_( ADD_COUNTER(profile, "ScratchFileUsedBytes", TUnit::BYTES)), disk_read_timer_(ADD_TIMER(profile, "TotalReadBlockTime")), encryption_timer_(ADD_TIMER(profile, "TotalEncryptionTime")), compression_timer_(tmp_file_mgr->compression_enabled() ? ADD_TIMER(profile, "TotalCompressionTime") : nullptr), num_blacklisted_files_(0), spilling_disk_faulty_(false), current_bytes_allocated_(0), current_bytes_allocated_remote_(0), next_allocation_index_(0), free_ranges_(64) { DCHECK(tmp_file_mgr != nullptr); io_ctx_ = io_mgr_->RegisterContext(); io_ctx_->set_read_use_mem_counter(read_use_mem_counter_); io_ctx_->set_bytes_read_use_mem_counter(bytes_read_use_mem_counter_); io_ctx_->set_read_use_local_disk_counter(read_use_local_disk_counter_); io_ctx_->set_bytes_read_use_local_disk_counter(bytes_read_use_local_disk_counter_); // Populate the priority based index ranges. 
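// For example (assumed priorities): directories already sorted by priority as
// [0, 0, 1, 2, 2, 2] produce the ranges {0: [0, 1], 1: [2, 2], 2: [3, 5]}, i.e. one
// contiguous index range per priority value.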
const std::vector<std::unique_ptr<TmpDir>>& tmp_dirs = tmp_file_mgr_->tmp_dirs_; if (tmp_dirs.size() > 0) { int start_index = 0; int priority = tmp_dirs[0]->priority(); for (int i = 0; i < tmp_dirs.size() - 1; ++i) { priority = tmp_dirs[i]->priority(); const int next_priority = tmp_dirs[i + 1]->priority(); if (next_priority != priority) { tmp_files_index_range_.emplace(priority, TmpFileIndexRange(start_index, i)); start_index = i + 1; priority = next_priority; } } tmp_files_index_range_.emplace(priority, TmpFileIndexRange(start_index, tmp_dirs.size() - 1)); } } TmpFileGroup::~TmpFileGroup() { DCHECK_EQ(tmp_files_.size(), 0); } Status TmpFileGroup::CreateFiles() { lock_.DCheckLocked(); DCHECK(tmp_files_.empty()); vector<DeviceId> tmp_devices = tmp_file_mgr_->ActiveTmpDevices(); DCHECK(tmp_file_mgr_->NumActiveTmpDevicesLocal() <= tmp_devices.size()); int files_allocated = 0; // Initialize the tmp files and the initial file to use. for (int i = 0; i < tmp_file_mgr_->NumActiveTmpDevicesLocal(); ++i) { DeviceId device_id = tmp_devices[i]; unique_ptr<TmpFile> tmp_file; tmp_file_mgr_->NewFile(this, device_id, &tmp_file); tmp_files_.emplace_back(std::move(tmp_file)); ++files_allocated; } DCHECK_EQ(tmp_file_mgr_->NumActiveTmpDevicesLocal(), files_allocated); DCHECK_EQ(tmp_file_mgr_->NumActiveTmpDevicesLocal(), tmp_files_.size()); if (tmp_files_.size() == 0) return ScratchAllocationFailedStatus({}); // Initialize the next allocation index for each priority. for (const auto& entry: tmp_files_index_range_) { const int priority = entry.first; const int start = entry.second.start; const int end = entry.second.end; // Start allocating on a random device to avoid overloading the first device. next_allocation_index_.emplace(priority, start + rand() % (end - start + 1)); } return Status::OK(); } template <typename T> void TmpFileGroup::CloseInternal(vector<T>& tmp_files) { for (auto& file : tmp_files) { Status status = file->Remove(); if (!status.ok()) { LOG(WARNING) << "Error removing scratch file '" << file->path() << "': " << status.msg().msg(); } } tmp_files.clear(); } void TmpFileGroup::Close() { // Cancel writes before deleting the files, since in-flight writes could re-create // deleted files. if (io_ctx_ != nullptr) { if (tmp_file_mgr_->HasRemoteDir()) { // Remove all the writes using the io_ctx and waiting for buffer reservation in // the pool. DCHECK(tmp_file_mgr_->tmp_dirs_remote_ctrl_.tmp_file_pool_ != nullptr); tmp_file_mgr_->tmp_dirs_remote_ctrl_.tmp_file_pool_->RemoveWriteRanges( io_ctx_.get()); } io_mgr_->UnregisterContext(io_ctx_.get()); } CloseInternal<std::unique_ptr<TmpFile>>(tmp_files_); CloseInternal<std::shared_ptr<TmpFile>>(tmp_files_remote_); tmp_file_mgr_->RemoveRemoteDirForQuery(this); tmp_file_mgr_->scratch_bytes_used_metric_->Increment( -1 * scratch_space_bytes_used_counter_->value()); } // Rounds up to the smallest unit of allocation in a scratch file // that will fit 'bytes'. static int64_t RoundUpToScratchRangeSize(bool punch_holes, int64_t bytes) { if (punch_holes) { // Round up to a typical disk block size - 4KB that that hole punching can always // free the backing storage for the entire range. return BitUtil::RoundUpToPowerOf2(bytes, TmpFileMgr::HOLE_PUNCH_BLOCK_SIZE_BYTES); } else { // We recycle scratch ranges, which must be positive power-of-two sizes. 
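// For example (an illustrative request size): 10000 bytes round up to 12288 (the next
// multiple of the 4KB hole punch block) when punching holes, but to 16384 (the next
// power of two) when ranges are recycled.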
return max<int64_t>(1L, BitUtil::RoundUpToPowerOfTwo(bytes)); } } void TmpFileGroup::UpdateScratchSpaceMetrics(int64_t num_bytes, bool is_remote) { scratch_space_bytes_used_counter_->Add(num_bytes); tmp_file_mgr_->scratch_bytes_used_metric_->Increment(num_bytes); current_bytes_allocated_.Add(num_bytes); if (is_remote) current_bytes_allocated_remote_.Add(num_bytes); } string TmpFileGroup::GenerateNewPath(const string& dir, const string& unique_name) { stringstream file_name; file_name << TMP_SUB_DIR_NAME << "-" << unique_name; path new_file_path(dir); new_file_path /= file_name.str(); return new_file_path.string(); } std::shared_ptr<TmpFile>& TmpFileGroup::FindTmpFileSharedPtr(TmpFile* tmp_file) { DCHECK(tmp_file != nullptr); DCHECK(tmp_file->DiskFile()->disk_type() != io::DiskFileType::LOCAL); lock_guard<SpinLock> lock(tmp_files_remote_ptrs_lock_); auto shared_file_it = tmp_files_remote_ptrs_.find(tmp_file); DCHECK(shared_file_it != tmp_files_remote_ptrs_.end()); return shared_file_it->second; } Status TmpFileGroup::AllocateRemoteSpace(int64_t num_bytes, TmpFile** tmp_file, int64_t* file_offset, vector<int>* at_capacity_dirs) { // Only one remote dir supported currently. string dir = tmp_file_mgr_->tmp_dirs_remote_->path(); // It is not supposed to have a remote directory other than HDFS, Ozone, or S3. DCHECK(IsHdfsPath(dir.c_str(), false) || IsOzonePath(dir.c_str(), false) || IsS3APath(dir.c_str(), false)); // Look for the space from a previous created file. if (!tmp_files_remote_.empty()) { TmpFile* tmp_file_cur = tmp_files_remote_.back().get(); // If the file is blocklisted or is at capacity, we will create a new file instead. if (!tmp_file_cur->is_blacklisted()) { if (tmp_file_cur->AllocateSpace(num_bytes, file_offset)) { *tmp_file = tmp_file_cur; return Status::OK(); } } } // Return an error if the new bytes is over the bytes limit of the query or the remote // directory. int64_t new_bytes = current_bytes_allocated_.Load() + tmp_file_mgr_->GetRemoteTmpFileSize(); if (bytes_limit_ != -1 && new_bytes > bytes_limit_) { return Status(TErrorCode::SCRATCH_LIMIT_EXCEEDED, bytes_limit_, GetBackendString()); } int64_t remote_dir_bytes_limit = tmp_file_mgr_->tmp_dirs_remote_->bytes_limit(); if (remote_dir_bytes_limit != -1 && new_bytes > remote_dir_bytes_limit) { return Status( TErrorCode::SCRATCH_LIMIT_EXCEEDED, remote_dir_bytes_limit, GetBackendString()); } // The device id of remote directory is defined as the max local device id // plus the index of the remote dir. Since we only support one remote dir now, // the id is the max local device id plus one. 
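// For example (an assumed setup): with three local scratch directories holding device
// ids 0-2, the single remote directory gets device id 3.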
DeviceId dev_id = tmp_file_mgr_->tmp_dirs_.size(); string unique_name = lexical_cast<string>(random_generator()()); stringstream file_name; dir = ConstructRemoteDirPath(dir, ExecEnv::GetInstance()->configured_backend_address().hostname, PrintId(ExecEnv::GetInstance()->backend_id(), "_"), PrintId(unique_id(), "_")); string new_file_path = GenerateNewPath(dir, unique_name); const string& local_buffer_dir = tmp_file_mgr_->local_buff_dir_->path(); string new_file_path_local = GenerateNewPath(local_buffer_dir, unique_name); TmpFileRemote* tmp_file_r = new TmpFileRemote( this, dev_id, new_file_path, new_file_path_local, false, dir.c_str()); if (tmp_file_r == nullptr) { return Status("Failed to allocate temporary file object."); } if (tmp_file_r->hdfs_conn_ == nullptr) { return Status(Substitute("Failed to connect to FS: $0.", dir)); } shared_ptr<TmpFile> tmp_file_remote(move(tmp_file_r)); int64_t file_size = tmp_file_mgr_->GetRemoteTmpFileSize(); TmpDir* tmp_dir_remote = tmp_file_remote->GetDir(); if (tmp_dir_remote->bytes_limit() != -1 && tmp_dir_remote->bytes_used_metric()->Increment(file_size) > tmp_dir_remote->bytes_limit()) { tmp_dir_remote->bytes_used_metric()->Increment(-file_size); at_capacity_dirs->push_back(dev_id); return Status(Substitute("Reach the size limit $0 of dir: $1", tmp_dir_remote->bytes_limit(), tmp_dir_remote->path())); } UpdateScratchSpaceMetrics(file_size, true); tmp_files_remote_.emplace_back(move(tmp_file_remote)); *tmp_file = tmp_files_remote_.back().get(); // It should be a successful return to allocate the first range from the new file. if (!(*tmp_file)->AllocateSpace(num_bytes, file_offset)) { DCHECK(false) << "Should be a successful allocation for the first write range."; } DCHECK_EQ(*file_offset, 0); { lock_guard<SpinLock> lock(tmp_files_remote_ptrs_lock_); tmp_files_remote_ptrs_.emplace(*tmp_file, tmp_files_remote_.back()); } // Try to reserve the space for local buffer with a quick return to avoid // a long wait, if failed, caller should do the reservation for the buffer. Status reserve_status = tmp_file_mgr_->ReserveLocalBufferSpace(true); if (reserve_status.ok()) (*tmp_file)->GetWriteFile()->SetSpaceReserved(); return Status::OK(); } Status TmpFileGroup::AllocateLocalSpace(int64_t num_bytes, TmpFile** tmp_file, int64_t* file_offset, vector<int>* at_capacity_dirs, bool* alloc_full) { int64_t scratch_range_bytes = RoundUpToScratchRangeSize(tmp_file_mgr_->punch_holes(), num_bytes); int free_ranges_idx = BitUtil::Log2Ceiling64(scratch_range_bytes); if (!free_ranges_[free_ranges_idx].empty()) { DCHECK(!tmp_file_mgr_->punch_holes()) << "Ranges not recycled when punching holes"; *tmp_file = free_ranges_[free_ranges_idx].back().first; *file_offset = free_ranges_[free_ranges_idx].back().second; free_ranges_[free_ranges_idx].pop_back(); return Status::OK(); } if (bytes_limit_ != -1 && current_bytes_allocated_.Load() + scratch_range_bytes > bytes_limit_) { return Status(TErrorCode::SCRATCH_LIMIT_EXCEEDED, bytes_limit_, GetBackendString()); } // Lazily create the files on the first write. if (tmp_files_.empty()) RETURN_IF_ERROR(CreateFiles()); // Find the next physical file in priority based round-robin order and allocate a range // from it. 
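  // Illustrative example (assumed layout, not from the source): with
  // tmp_files_index_range_ = {0 -> [0, 2], 1 -> [3, 4]}, allocation cycles through
  // files 0..2 starting at next_allocation_index_[0]; only when all of them are
  // blacklisted or at capacity does it fall through to priority 1 and try files 3..4.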
  for (const auto& entry: tmp_files_index_range_) {
    const int priority = entry.first;
    const int start = entry.second.start;
    const int end = entry.second.end;
    DCHECK(0 <= start && start <= end && end < tmp_files_.size())
        << "Invalid index range: [" << start << ", " << end << "] "
        << "tmp_files_.size(): " << tmp_files_.size();
    for (int index = start; index <= end; ++index) {
      const int idx = next_allocation_index_[priority];
      next_allocation_index_[priority] = start + (idx - start + 1) % (end - start + 1);
      *tmp_file = tmp_files_[idx].get();
      if ((*tmp_file)->is_blacklisted()) continue;

      // Check the per-directory limit.
      if (!(*tmp_file)->AllocateSpace(scratch_range_bytes, file_offset)) {
        at_capacity_dirs->push_back(idx);
        continue;
      }
      UpdateScratchSpaceMetrics(scratch_range_bytes);
      return Status::OK();
    }
  }
  // Use a bool rather than a Status to signal that no space is left: it has less
  // overhead, and we want the failure reported as quickly as possible when local and
  // remote scratch space are mixed, so the caller can immediately fall back to
  // allocating from the remote directory.
  *alloc_full = true;
  return Status::OK();
}

Status TmpFileGroup::AllocateSpace(
    int64_t num_bytes, TmpFile** tmp_file, int64_t* file_offset) {
  // This can be slow: during eviction we may have to wait for an async upload task if
  // the bytes limit has been reached.
  lock_guard<SpinLock> lock(lock_);
  // Track the indices of any directories where we failed due to capacity. This is
  // required for error reporting if we are totally out of capacity so that it's clear
  // that some disks were at capacity.
  vector<int> at_capacity_dirs;

  if (!tmp_file_mgr_->tmp_dirs_.empty()) {
    // 'alloc_full' is set to true if all of the local directories are at capacity.
    bool alloc_full = false;
    Status status = AllocateLocalSpace(
        num_bytes, tmp_file, file_offset, &at_capacity_dirs, &alloc_full);
    // If all of the local dirs are at capacity, try remote scratch space.
    // Otherwise, return the status (either OK or an error).
    if (!status.ok() || !alloc_full) return status;
  }

  // If we can't find any space locally, allocate from remote scratch space.
  if (tmp_file_mgr_->tmp_dirs_remote_ != nullptr) {
    Status remote_status =
        AllocateRemoteSpace(num_bytes, tmp_file, file_offset, &at_capacity_dirs);
    if (remote_status.ok() || at_capacity_dirs.empty()) return remote_status;
  }
  return ScratchAllocationFailedStatus(at_capacity_dirs);
}

void TmpFileGroup::RecycleFileRange(unique_ptr<TmpWriteHandle> handle) {
  TmpFile* file = handle->file_;
  int64_t space_used_bytes =
      RoundUpToScratchRangeSize(tmp_file_mgr_->punch_holes(), handle->on_disk_len());
  if (tmp_file_mgr_->punch_holes()) {
    Status status = file->PunchHole(handle->write_range_->offset(), space_used_bytes);
    if (!status.ok()) {
      // Proceed even if the hole punching fails - we will use extra disk space but
      // functionally we can continue to spill.
      LOG_EVERY_N(WARNING, 100) << "Failed to punch hole in scratch file, couldn't "
                                << "reclaim space: " << status.GetDetail();
      return;
    }
    scratch_space_bytes_used_counter_->Add(-space_used_bytes);
    tmp_file_mgr_->scratch_bytes_used_metric_->Increment(-space_used_bytes);
    current_bytes_allocated_.Add(-space_used_bytes);
  } else {
    // For remote files, we don't recycle the file or the range because a remote file
    // does not allow in-place modification.
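    // Illustrative example: a recycled 8192-byte local range is pushed onto
    // free_ranges_[13] (Log2Ceiling64(8192) == 13) and can satisfy any later write
    // that rounds up to 8192 bytes.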
if (file->DiskFile()->disk_type() == io::DiskFileType::LOCAL) { int free_ranges_idx = BitUtil::Log2Ceiling64(space_used_bytes); lock_guard<SpinLock> lock(lock_); free_ranges_[free_ranges_idx].emplace_back(file, handle->write_range_->offset()); } } } Status TmpFileGroup::Write(MemRange buffer, WriteDoneCallback cb, unique_ptr<TmpWriteHandle>* handle, const BufferPoolClientCounters* counters) { DCHECK_GE(buffer.len(), 0); unique_ptr<TmpWriteHandle> tmp_handle(new TmpWriteHandle(this, move(cb))); TmpWriteHandle* tmp_handle_ptr = tmp_handle.get(); // Pass ptr by value into lambda. WriteRange::WriteDoneCallback callback = [this, tmp_handle_ptr]( const Status& write_status) { WriteComplete(tmp_handle_ptr, write_status); }; RETURN_IF_ERROR(tmp_handle->Write(io_ctx_.get(), buffer, callback, counters)); *handle = move(tmp_handle); return Status::OK(); } Status TmpFileGroup::Read(TmpWriteHandle* handle, MemRange buffer) { RETURN_IF_ERROR(ReadAsync(handle, buffer)); return WaitForAsyncRead(handle, buffer); } Status TmpFileGroup::ReadAsync(TmpWriteHandle* handle, MemRange buffer) { DCHECK(handle->write_range_ != nullptr); DCHECK(!handle->is_cancelled_); DCHECK_EQ(buffer.len(), handle->data_len()); Status status; VLOG(3) << "ReadAsync " << handle->TmpFilePath() << " " << handle->write_range_->offset() << " " << handle->on_disk_len(); // Don't grab 'write_state_lock_' in this method - it is not necessary because we // don't touch any members that it protects and could block other threads for the // duration of the synchronous read. DCHECK(!handle->write_in_flight_); DCHECK(handle->read_range_ == nullptr); DCHECK(handle->write_range_ != nullptr); MemRange read_buffer = buffer; if (handle->is_compressed()) { int64_t compressed_len = handle->compressed_len_; if (!handle->compressed_.TryAllocate(compressed_len)) { return tmp_file_mgr_->compressed_buffer_tracker()->MemLimitExceeded( nullptr, "Failed to decompress spilled data", compressed_len); } DCHECK_EQ(compressed_len, handle->write_range_->len()); read_buffer = MemRange(handle->compressed_.buffer(), compressed_len); } // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state // since the write is not in flight. handle->read_range_ = scan_range_pool_.Add(new ScanRange); int64_t offset = handle->write_range_->offset(); if (handle->file_ != nullptr && !handle->file_->is_local()) { TmpFileRemote* tmp_file = static_cast<TmpFileRemote*>(handle->file_); DiskFile* local_read_buffer_file = tmp_file->GetReadBufferFile(offset); DiskFile* remote_file = tmp_file->DiskFile(); // Reset the read_range, use the remote filesystem's disk id. handle->read_range_->Reset( ScanRange::FileInfo{ remote_file->path().c_str(), tmp_file->hdfs_conn_, tmp_file->mtime_}, handle->write_range_->len(), offset, tmp_file->disk_id(), false, BufferOpts::ReadInto( read_buffer.data(), read_buffer.len(), BufferOpts::NO_CACHING), nullptr, remote_file, local_read_buffer_file); } else { // Read from local. 
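    // The local path resets the scan range against the original scratch file and its
    // disk id, reading directly into the caller's buffer (or into 'compressed_' when
    // the data was spilled in compressed form).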
handle->read_range_->Reset( ScanRange::FileInfo{handle->write_range_->file()}, handle->write_range_->len(), offset, handle->write_range_->disk_id(), false, BufferOpts::ReadInto( read_buffer.data(), read_buffer.len(), BufferOpts::NO_CACHING)); } read_counter_->Add(1); bytes_read_counter_->Add(read_buffer.len()); bool needs_buffers; RETURN_IF_ERROR(io_ctx_->StartScanRange(handle->read_range_, &needs_buffers)); DCHECK(!needs_buffers) << "Already provided a buffer"; return Status::OK(); } Status TmpFileGroup::WaitForAsyncRead( TmpWriteHandle* handle, MemRange buffer, const BufferPoolClientCounters* counters) { DCHECK(handle->read_range_ != nullptr); // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state // since the write is not in flight. SCOPED_TIMER(disk_read_timer_); MemRange read_buffer = handle->is_compressed() ? MemRange{handle->compressed_.buffer(), handle->compressed_.Size()} : buffer; DCHECK(read_buffer.data() != nullptr); unique_ptr<BufferDescriptor> io_mgr_buffer; Status status = handle->read_range_->GetNext(&io_mgr_buffer); if (!status.ok()) goto exit; DCHECK(io_mgr_buffer != NULL); DCHECK(io_mgr_buffer->eosr()); DCHECK_LE(io_mgr_buffer->len(), read_buffer.len()); if (io_mgr_buffer->len() < read_buffer.len()) { // The read was truncated - this is an error. status = Status(TErrorCode::SCRATCH_READ_TRUNCATED, read_buffer.len(), handle->write_range_->file(), GetBackendString(), handle->write_range_->offset(), io_mgr_buffer->len()); goto exit; } DCHECK_EQ(io_mgr_buffer->buffer(), handle->is_compressed() ? handle->compressed_.buffer() : buffer.data()); // Decrypt and decompress in the reverse order that we compressed then encrypted the // data originally. if (FLAGS_disk_spill_encryption) { status = handle->CheckHashAndDecrypt(read_buffer, counters); if (!status.ok()) goto exit; } if (handle->is_compressed()) { SCOPED_TIMER2( compression_timer_, counters == nullptr ? nullptr : counters->compression_time); scoped_ptr<Codec> decompressor; status = Codec::CreateDecompressor( nullptr, false, tmp_file_mgr_->compression_codec(), &decompressor); if (status.ok()) { int64_t decompressed_len = buffer.len(); uint8_t* decompressed_buffer = buffer.data(); status = decompressor->ProcessBlock(true, read_buffer.len(), read_buffer.data(), &decompressed_len, &decompressed_buffer); } // Free the compressed data regardless of whether the read was successful. handle->FreeCompressedBuffer(); if (!status.ok()) goto exit; } exit: if (handle->file_ != nullptr && !handle->file_->is_local()) { auto tmp_file = static_cast<TmpFileRemote*>(handle->file_); // If all the pages of specific read buffer have been read, try delete the read // buffer. if (tmp_file_mgr()->IsRemoteBatchReadingEnabled()) { int buffer_idx = tmp_file->GetReadBufferIndex(handle->write_range_->offset()); bool all_read = tmp_file->IncrementReadPageCount(buffer_idx); if (all_read) tmp_file->TryDeleteMemReadBufferShared(buffer_idx); } } // Always return the buffer before exiting to avoid leaking it. 
if (io_mgr_buffer != nullptr) handle->read_range_->ReturnBuffer(move(io_mgr_buffer)); handle->read_range_ = nullptr; return status; } Status TmpFileGroup::RestoreData(unique_ptr<TmpWriteHandle> handle, MemRange buffer, const BufferPoolClientCounters* counters) { DCHECK_EQ(handle->data_len(), buffer.len()); if (!handle->is_compressed()) DCHECK_EQ(handle->write_range_->data(), buffer.data()); DCHECK(!handle->write_in_flight_); DCHECK(handle->read_range_ == nullptr); VLOG(3) << "Restore " << handle->TmpFilePath() << " " << handle->write_range_->offset() << " " << handle->data_len(); Status status; if (handle->is_compressed()) { // 'buffer' already contains the data needed, because the compressed data was written // to 'compressed_' and (optionally) encrypted over there. } else if (FLAGS_disk_spill_encryption) { // Decrypt after the write is finished, so that we don't accidentally write decrypted // data to disk. status = handle->CheckHashAndDecrypt(buffer, counters); } RecycleFileRange(move(handle)); return status; } void TmpFileGroup::DestroyWriteHandle(unique_ptr<TmpWriteHandle> handle) { handle->Cancel(); handle->WaitForWrite(); RecycleFileRange(move(handle)); } void TmpFileGroup::WriteComplete( TmpWriteHandle* handle, const Status& write_status) { Status status; // Debug action for simulating disk write error. To use, specify in query options as: // 'debug_action': 'IMPALA_TMP_FILE_WRITE:<hostname>:<port>:<action>' // where <hostname> and <port> represent the impalad which execute the fragment // instances, <port> is the BE krpc port (default 27000). const Status* p_write_status = &write_status; Status debug_status = DebugAction(debug_action_, "IMPALA_TMP_FILE_WRITE", {ExecEnv::GetInstance()->krpc_address().hostname(), SimpleItoa(ExecEnv::GetInstance()->krpc_address().port())}); if (UNLIKELY(!debug_status.ok())) p_write_status = &debug_status; if (!p_write_status->ok()) { status = RecoverWriteError(handle, *p_write_status); if (status.ok()) return; } else { status = *p_write_status; } handle->WriteComplete(status); } Status TmpFileGroup::RecoverWriteError( TmpWriteHandle* handle, const Status& write_status) { DCHECK(!write_status.ok()); DCHECK(handle->file_ != nullptr); // We can't recover from cancellation or memory limit exceeded. if (write_status.IsCancelled() || write_status.IsMemLimitExceeded()) { return write_status; } // We don't recover the errors generated during spilling to a remote file. if (handle->file_->disk_type() != io::DiskFileType::LOCAL) { return write_status; } // Save and report the error before retrying so that the failure isn't silent. { lock_guard<SpinLock> lock(lock_); scratch_errors_.push_back(write_status); if (handle->file_->Blacklist(write_status.msg())) { DCHECK_LT(num_blacklisted_files_, tmp_files_.size()); ++num_blacklisted_files_; if (num_blacklisted_files_ == tmp_files_.size()) { // Check if all errors are 'blacklistable'. bool are_all_blacklistable_errors = true; for (Status& err : scratch_errors_) { if (!ErrorConverter::IsBlacklistableError(err)) { are_all_blacklistable_errors = false; break; } } if (are_all_blacklistable_errors) spilling_disk_faulty_ = true; } } } // Do not retry cancelled writes or propagate the error, simply return CANCELLED. if (handle->is_cancelled_) return Status::CancelledInternal("TmpFileMgr write"); TmpFile* tmp_file; int64_t file_offset; // Discard the scratch file range - we will not reuse ranges from a bad file. // Choose another file to try. Blacklisting ensures we don't retry the same file. 
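  // Illustrative sequence (assumption, not from the source): a write to scratch file
  // A fails with EIO, so A is blacklisted and the error is recorded in
  // 'scratch_errors_'; AllocateSpace() below then picks a range in a different file
  // and RetryWrite() re-issues the same data there.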
// If this fails, the status will include all the errors in 'scratch_errors_'. RETURN_IF_ERROR(AllocateSpace(handle->on_disk_len(), &tmp_file, &file_offset)); return handle->RetryWrite(io_ctx_.get(), tmp_file, file_offset); } Status TmpFileGroup::ScratchAllocationFailedStatus( const vector<int>& at_capacity_dirs) { vector<string> tmp_dir_paths; for (std::unique_ptr<TmpDir>& tmp_dir : tmp_file_mgr_->tmp_dirs_) { tmp_dir_paths.push_back(tmp_dir->path()); } vector<string> at_capacity_dir_paths; for (int dir_idx : at_capacity_dirs) { if (dir_idx >= tmp_file_mgr_->tmp_dirs_.size()) { at_capacity_dir_paths.push_back(tmp_file_mgr_->tmp_dirs_remote_->path()); } else { at_capacity_dir_paths.push_back(tmp_file_mgr_->tmp_dirs_[dir_idx]->path()); } } Status status(TErrorCode::SCRATCH_ALLOCATION_FAILED, join(tmp_dir_paths, ","), GetBackendString(), PrettyPrinter::PrintBytes( tmp_file_mgr_->scratch_bytes_used_metric_->current_value()->GetValue()), PrettyPrinter::PrintBytes(current_bytes_allocated_.Load()), join(at_capacity_dir_paths, ",")); // Include all previous errors that may have caused the failure. for (Status& err : scratch_errors_) status.MergeStatus(err); return status; } bool TmpFileGroup::IsSpillingDiskFaulty() { lock_guard<SpinLock> lock(lock_); return spilling_disk_faulty_; } string TmpFileGroup::DebugString() { lock_guard<SpinLock> lock(lock_); stringstream ss; ss << "TmpFileGroup " << this << " bytes limit " << bytes_limit_ << " current bytes allocated " << current_bytes_allocated_.Load() << " next allocation index [ "; // Get priority based allocation index. for (const auto& entry: next_allocation_index_) { ss << " (priority: " << entry.first << ", index: " << entry.second << "), "; } ss << "] writes " << write_counter_->value() << " bytes written " << bytes_written_counter_->value() << " uncompressed bytes written " << uncompressed_bytes_written_counter_->value() << " reads " << read_counter_->value() << " bytes read " << bytes_read_counter_->value() << " scratch bytes used " << scratch_space_bytes_used_counter_ << " dist read timer " << disk_read_timer_->value() << " encryption timer " << encryption_timer_->value() << endl << " " << tmp_files_.size() << " files:" << endl; for (unique_ptr<TmpFile>& file : tmp_files_) { ss << " " << file->DebugString() << endl; } return ss.str(); } TmpWriteHandle::TmpWriteHandle( TmpFileGroup* const parent, WriteRange::WriteDoneCallback cb) : parent_(parent), cb_(move(cb)), compressed_(parent_->tmp_file_mgr_->compressed_buffer_tracker()) {} TmpWriteHandle::~TmpWriteHandle() { DCHECK(!write_in_flight_); DCHECK(read_range_ == nullptr); DCHECK(compressed_.buffer() == nullptr); } string TmpWriteHandle::TmpFilePath() const { if (file_ == nullptr) return ""; return file_->path(); } string TmpWriteHandle::TmpFileBufferPath() const { if (file_ == nullptr) return ""; return file_->LocalBuffPath(); } int64_t TmpWriteHandle::on_disk_len() const { return write_range_->len(); } Status TmpWriteHandle::Write(RequestContext* io_ctx, MemRange buffer, WriteRange::WriteDoneCallback callback, const BufferPoolClientCounters* counters) { DCHECK(!write_in_flight_); MemRange buffer_to_write = buffer; if (parent_->tmp_file_mgr_->compression_enabled() && TryCompress(buffer, counters)) { buffer_to_write = MemRange(compressed_.buffer(), compressed_len_); } // Ensure that the compressed buffer is freed on all the code paths where we did not // start the write successfully. 
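  // The scope-exit trigger below runs its lambda when 'free_compressed' goes out of
  // scope, so every early error return in this function releases 'compressed_'
  // automatically; only a successful AsyncWriteRange() sets 'write_started' and keeps
  // the buffer alive until WriteComplete() frees it.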
bool write_started = false; const auto free_compressed = MakeScopeExitTrigger([this, &write_started]() { if (!write_started) FreeCompressedBuffer(); }); // Allocate space after doing compression, to avoid overallocating space. TmpFile* tmp_file; int64_t file_offset; Status status = Status::OK(); // For the second unpin of a page, it will be written to a new file since the // content should be changed RETURN_IF_ERROR(parent_->AllocateSpace(buffer_to_write.len(), &tmp_file, &file_offset)); if (FLAGS_disk_spill_encryption) { RETURN_IF_ERROR(EncryptAndHash(buffer_to_write, counters)); } // Set all member variables before calling AddWriteRange(): after it succeeds, // WriteComplete() may be called concurrently with the remainder of this function. // If the TmpFile is not local, the disk queue assigned should be for the // buffer. data_len_ = buffer.len(); file_ = tmp_file; write_range_.reset(new WriteRange(tmp_file->path(), file_offset, tmp_file->AssignDiskQueue(!tmp_file->is_local()), move(callback))); write_range_->SetData(buffer_to_write.data(), buffer_to_write.len()); // For remote files, we write the range to the local buffer. write_range_->SetDiskFile(tmp_file->GetWriteFile()); VLOG(3) << "Write " << tmp_file->path() << " " << file_offset << " " << buffer_to_write.len(); write_in_flight_ = true; write_range_->SetRequestContext(io_ctx); // Add the write range asyncly to the DiskQueue for writing. status = parent_->tmp_file_mgr()->AsyncWriteRange(write_range_.get(), tmp_file); if (!status.ok()) { // The write will not be in flight if we returned with an error. write_in_flight_ = false; // We won't return this TmpWriteHandle to the client of TmpFileGroup, so it won't be // cancelled in the normal way. Mark the handle as cancelled so it can be // cleanly destroyed. is_cancelled_ = true; return status; } write_started = true; parent_->write_counter_->Add(1); parent_->uncompressed_bytes_written_counter_->Add(buffer.len()); parent_->bytes_written_counter_->Add(buffer_to_write.len()); return Status::OK(); } bool TmpWriteHandle::TryCompress( MemRange buffer, const BufferPoolClientCounters* counters) { DCHECK(parent_->tmp_file_mgr_->compression_enabled()); SCOPED_TIMER2(parent_->compression_timer_, counters == nullptr ? 
nullptr : counters->compression_time); DCHECK_LT(compressed_len_, 0); DCHECK(compressed_.buffer() == nullptr); scoped_ptr<Codec> compressor; Status status = Codec::CreateCompressor(nullptr, false, Codec::CodecInfo(parent_->tmp_file_mgr_->compression_codec(), parent_->tmp_file_mgr_->compression_level()), &compressor); if (!status.ok()) { LOG(WARNING) << "Failed to compress, couldn't create compressor: " << status.GetDetail(); return false; } int64_t compressed_buffer_len = compressor->MaxOutputLen(buffer.len()); if (!compressed_.TryAllocate(compressed_buffer_len)) { LOG_EVERY_N(INFO, 100) << "Failed to compress: couldn't allocate " << PrettyPrinter::PrintBytes(compressed_buffer_len); return false; } uint8_t* compressed_buffer = compressed_.buffer(); int64_t compressed_len = compressed_buffer_len; status = compressor->ProcessBlock( true, buffer.len(), buffer.data(), &compressed_len, &compressed_buffer); if (!status.ok()) { compressed_.Release(); return false; } compressed_len_ = compressed_len; VLOG(3) << "Buffer size: " << buffer.len() << " compressed size: " << compressed_len; return true; } Status TmpWriteHandle::RetryWrite(RequestContext* io_ctx, TmpFile* file, int64_t offset) { DCHECK(write_in_flight_); file_ = file; write_range_->SetRange(file->path(), offset, file->AssignDiskQueue()); write_range_->SetDiskFile(file->GetWriteFile()); Status status = io_ctx->AddWriteRange(write_range_.get()); if (!status.ok()) { // The write will not be in flight if we returned with an error. write_in_flight_ = false; return status; } return Status::OK(); } void TmpWriteHandle::UploadComplete(TmpFile* tmp_file, const Status& upload_status) { if (upload_status.ok()) { // If uploaded, the local buffer is available to be evicted, so enqueue it to the // pool. DCHECK(tmp_file != nullptr); TmpFileGroup* file_group = tmp_file->file_group_; file_group->tmp_file_mgr_->EnqueueTmpFilesPool( file_group->FindTmpFileSharedPtr(tmp_file), file_group->tmp_file_mgr_->GetRemoteTmpFileBufferPoolLifo()); } else { LOG(WARNING) << "Upload temporary file: '" << tmp_file->path() << " failed"; } } void TmpWriteHandle::WriteComplete(const Status& write_status) { WriteDoneCallback cb; Status status = write_status; { lock_guard<mutex> lock(write_state_lock_); DCHECK(write_in_flight_); write_in_flight_ = false; // Need to extract 'cb_' because once 'write_in_flight_' is false and we release // 'write_state_lock_', 'this' may be destroyed. cb = move(cb_); if (is_compressed()) { DCHECK(compressed_.buffer() != nullptr); FreeCompressedBuffer(); } if (status.ok() && !file_->expected_local_) { // Do file upload if the local buffer file is finished. if (write_range_->is_full()) { TmpFileRemote* tmp_file = static_cast<TmpFileRemote*>(file_); RemoteOperRange::RemoteOperDoneCallback u_callback = [this, tmp_file]( const Status& upload_status) { UploadComplete(tmp_file, upload_status); }; tmp_file->upload_range_.reset( new RemoteOperRange(tmp_file->DiskBufferFile(), tmp_file->DiskFile(), parent_->tmp_file_mgr()->GetRemoteTmpBlockSize(), tmp_file->disk_id(true), RequestType::FILE_UPLOAD, parent_->io_mgr_, u_callback)); status = parent_->io_ctx_->AddRemoteOperRange(tmp_file->upload_range_.get()); } } // Notify before releasing the lock - after the lock is released 'this' may be // destroyed. write_complete_cv_.NotifyAll(); } // Call 'cb' last - once 'cb' is called client code may call Read() or destroy this // handle. 
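  // For example, the callback may unpin or destroy the client's page and, with it,
  // this TmpWriteHandle, so no member of 'this' is touched after the call below.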
cb(status); } void TmpWriteHandle::Cancel() { CancelRead(); { unique_lock<mutex> lock(write_state_lock_); is_cancelled_ = true; // TODO: in future, if DiskIoMgr supported write cancellation, we could cancel it // here. } } void TmpWriteHandle::CancelRead() { if (read_range_ != nullptr) { read_range_->Cancel(Status::CancelledInternal("TmpFileMgr read")); read_range_ = nullptr; FreeCompressedBuffer(); } } void TmpWriteHandle::WaitForWrite() { unique_lock<mutex> lock(write_state_lock_); while (write_in_flight_) write_complete_cv_.Wait(lock); } Status TmpWriteHandle::EncryptAndHash( MemRange buffer, const BufferPoolClientCounters* counters) { DCHECK(FLAGS_disk_spill_encryption); SCOPED_TIMER2(parent_->encryption_timer_, counters == nullptr ? nullptr : counters->encryption_time); // Since we're using GCM/CTR/CFB mode, we must take care not to reuse a // key/IV pair. Regenerate a new key and IV for every data buffer we write. RETURN_IF_ERROR(key_.InitializeRandom(AES_BLOCK_SIZE, key_.GetSupportedDefaultMode())); RETURN_IF_ERROR(key_.Encrypt(buffer.data(), buffer.len(), buffer.data())); if (!key_.IsGcmMode()) { hash_.Compute(buffer.data(), buffer.len()); } return Status::OK(); } Status TmpWriteHandle::CheckHashAndDecrypt( MemRange buffer, const BufferPoolClientCounters* counters) { DCHECK(FLAGS_disk_spill_encryption); DCHECK(write_range_ != nullptr); SCOPED_TIMER2(parent_->encryption_timer_, counters == nullptr ? nullptr : counters->encryption_time); // GCM mode will verify the integrity by itself if (!key_.IsGcmMode()) { if (!hash_.Verify(buffer.data(), buffer.len())) { return Status(TErrorCode::SCRATCH_READ_VERIFY_FAILED, buffer.len(), write_range_->file(), GetBackendString(), write_range_->offset()); } } Status decrypt_status = key_.Decrypt(buffer.data(), buffer.len(), buffer.data()); if (!decrypt_status.ok()) { // Treat decryption failing as a verification failure, but include extra info from // the decryption status. Status result_status(TErrorCode::SCRATCH_READ_VERIFY_FAILED, buffer.len(), write_range_->file(), GetBackendString(), write_range_->offset()); result_status.MergeStatus(decrypt_status); return result_status; } return Status::OK(); } void TmpWriteHandle::FreeCompressedBuffer() { if (compressed_.buffer() == nullptr) return; DCHECK(is_compressed()); compressed_.Release(); } string TmpWriteHandle::DebugString() { unique_lock<mutex> lock(write_state_lock_); stringstream ss; ss << "Write handle " << this << " file '" << file_->path() << "'" << " is cancelled " << is_cancelled_ << " write in flight " << write_in_flight_; if (write_range_ != NULL) { ss << " data " << write_range_->data() << " disk range len " << write_range_->len() << " file offset " << write_range_->offset() << " disk id " << write_range_->disk_id(); } return ss.str(); } TmpFileBufferPool::TmpFileBufferPool(TmpFileMgr* tmp_file_mgr) : tmp_file_mgr_(tmp_file_mgr) { tmp_file_dummy_.reset(new TmpFileDummy()); } TmpFileBufferPool::~TmpFileBufferPool() { DCHECK(shut_down_); } void TmpFileBufferPool::ShutDown() { { unique_lock<mutex> l(lock_); shut_down_ = true; } // Wake up the waiting thread. work_available_.NotifyAll(); } void TmpFileBufferPool::TmpFileSpaceReserveThreadLoop() { while (true) { { unique_lock<mutex> l(lock_); while (!shut_down_ && write_ranges_.empty()) { // Wait if there are no ranges in the queue. 
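        // Re-check the predicate after every wakeup: the wait can wake spuriously, and
        // ShutDown() also signals 'work_available_' solely to unblock this loop.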
work_available_.Wait(l); } if (shut_down_) return; DCHECK(!write_ranges_.empty()); cur_write_range_ = write_ranges_.front(); write_ranges_.pop_front(); DCHECK(cur_write_range_ != nullptr); // Find out the TmpFile which the current range is associated with, and store the // shared_ptr of the file to cur_tmp_file_ in case it is deconstructed while waiting // for reservation. auto it = write_ranges_iterator_.find(cur_write_range_); DCHECK(it != write_ranges_iterator_.end()); TmpFile* tmp_file = it->second.second; cur_tmp_file_ = tmp_file->FileGroup()->FindTmpFileSharedPtr(tmp_file); DCHECK(cur_tmp_file_ != nullptr); DCHECK_EQ(cur_write_range_->disk_file(), cur_tmp_file_->GetWriteFile()); write_ranges_iterator_.erase(it); } // Reserve space from the tmp_files_avail_pool_. The process could need a long wait. Status status = tmp_file_mgr_->ReserveLocalBufferSpace(false); vector<TmpFileMgr::WriteDoneCallback> write_callbacks; { unique_lock<mutex> lock(lock_); if (status.ok()) { DCHECK(cur_tmp_file_ != nullptr); cur_tmp_file_->GetWriteFile()->SetSpaceReserved(); if (cur_write_range_ != nullptr) { // Send all of the writes of the same disk file to the disk queue. status = MoveWriteRangesHelper( cur_write_range_->disk_file(), &write_callbacks, false); } else { // If the current range becomes a nullptr, it must be set by // RemoveWriteRanges(). In this case, the io_ctx which the range belongs to is // cancelled, and all the writes using that io_ctx are already cancelled. So, we // are safe to return the TmpFile to the pool to recycle the buffer space. EnqueueTmpFilesPool(cur_tmp_file_, true); } } else if (!status.ok() && cur_write_range_ != nullptr) { // Cancel the spilling if fails to reserve the buffer. RemoveWriteRangesInternal(cur_write_range_->io_ctx(), &write_callbacks); status = Status::CancelledInternal( Substitute("TmpFileBufferPool because: $0", status.GetDetail()).c_str()); } cur_write_range_ = nullptr; cur_tmp_file_.reset(); } for (const TmpFileMgr::WriteDoneCallback& write_callback : write_callbacks) { write_callback(status); } } } Status TmpFileBufferPool::MoveWriteRangesHelper(DiskFile* disk_file, vector<TmpFileMgr::WriteDoneCallback>* write_callbacks, bool is_cancelled) { Status status = Status::OK(); auto write_ranges_it = write_ranges_to_add_.find(disk_file); if (write_ranges_it != write_ranges_to_add_.end()) { auto write_range_it = write_ranges_it->second.begin(); while (write_range_it != write_ranges_it->second.end()) { auto range = *write_range_it; DCHECK(range != nullptr); if (status.ok() && !is_cancelled) { status = range->io_ctx()->AddWriteRange(range); } else { write_callbacks->push_back(range->callback()); if (is_cancelled && range->offset() == 0) { // If is_cancelled is set, try to remove the range from the write_ranges list. // If the range hasn't been popped, it must still be in the write_ranges list. 
          if (cur_write_range_ != range) {
            auto key_range_it = write_ranges_iterator_.find(range);
            DCHECK(key_range_it != write_ranges_iterator_.end());
            DCHECK_EQ(*(key_range_it->second.first), range);
            write_ranges_.erase(key_range_it->second.first);
            write_ranges_iterator_.erase(key_range_it);
          }
        }
      }
      write_range_it = write_ranges_it->second.erase(write_range_it);
    }
    write_ranges_to_add_.erase(write_ranges_it);
  }
  return status;
}

Status TmpFileBufferPool::EnqueueWriteRange(io::WriteRange* range, TmpFile* tmp_file) {
  Status status = Status::OK();
  {
    unique_lock<mutex> write_range_list_lock(lock_);
    DCHECK(range != nullptr);
    DCHECK(range->disk_file() != nullptr);
    DCHECK(range->io_ctx() != nullptr);
    if (range->disk_file()->IsSpaceReserved()) {
      // If the space is reserved, send the range to the DiskQueue.
      return range->io_ctx()->AddWriteRange(range);
    } else if (range->io_ctx()->IsCancelled()) {
      // If the io_ctx is cancelled, notify the caller to cancel the query.
      return TMP_FILE_BUFFER_POOL_CONTEXT_CANCELLED;
    } else {
      io_ctx_to_file_set_map_[range->io_ctx()].insert(range->disk_file());
      write_ranges_to_add_[range->disk_file()].emplace_back(range);
    }
    // Put the first range of a file on the queue to wait for available space. The
    // ranges in the queue are popped one by one; once space is reserved for a file,
    // all of that file's ranges are added to the DiskQueue via the io_ctx.
    if (range->offset() == 0) {
      write_ranges_.emplace_back(range);
      DCHECK(tmp_file != nullptr);
      write_ranges_iterator_[range] =
          std::make_pair(prev(write_ranges_.cend()), tmp_file);
    }
  }
  work_available_.NotifyAll();
  return status;
}

void TmpFileBufferPool::RemoveWriteRangesInternal(
    RequestContext* io_ctx, vector<TmpFileMgr::WriteDoneCallback>* write_callbacks) {
  auto file_set_it = io_ctx_to_file_set_map_.find(io_ctx);
  if (file_set_it != io_ctx_to_file_set_map_.end()) {
    auto file_it = file_set_it->second.begin();
    while (file_it != file_set_it->second.end()) {
      DCHECK(*file_it != nullptr);
      // Remove all the ranges belonging to the file, and fetch the callback
      // functions of the ranges.
      Status status = MoveWriteRangesHelper(*file_it, write_callbacks, true);
      DCHECK_OK(status);
      if (cur_write_range_ != nullptr && *file_it == cur_write_range_->disk_file()) {
        // Set the current write range to nullptr if the TmpFileGroup is closing, to
        // notify the reservation thread (which is waiting for the reservation) that
        // the space is no longer needed for the write range.
cur_write_range_ = nullptr; } file_it = file_set_it->second.erase(file_it); } io_ctx_to_file_set_map_.erase(file_set_it); } } void TmpFileBufferPool::RemoveWriteRanges(RequestContext* io_ctx) { DCHECK(io_ctx != nullptr); vector<TmpFileMgr::WriteDoneCallback> write_callbacks; { unique_lock<mutex> lock(lock_); RemoveWriteRangesInternal(io_ctx, &write_callbacks); } for (const TmpFileMgr::WriteDoneCallback& write_callback : write_callbacks) { write_callback(TMP_FILE_BUFFER_POOL_CONTEXT_CANCELLED); } } void TmpFileBufferPool::EnqueueTmpFilesPool(shared_ptr<TmpFile>& tmp_file, bool front) { DCHECK(tmp_file != nullptr); { unique_lock<mutex> buffer_lock(tmp_files_avail_pool_lock_); if (tmp_file->disk_type() != io::DiskFileType::DUMMY) { TmpFileRemote* tmp_file_remote = static_cast<TmpFileRemote*>(tmp_file.get()); if (tmp_file_remote->is_enqueued()) return; tmp_file_remote->SetEnqueued(true); tmp_file_remote->SetBufferReturned(); } if (front) { tmp_files_avail_pool_.push_front(tmp_file); } else { tmp_files_avail_pool_.push_back(tmp_file); } tmp_file_mgr_->GetLocalBufferDir()->bytes_used_metric()->Increment( -1 * tmp_file_mgr_->GetRemoteTmpFileSize()); } tmp_files_available_cv_.NotifyOne(); } Status TmpFileBufferPool::DequeueTmpFilesPool( shared_ptr<TmpFile>* tmp_file, bool quick_return) { DCHECK(tmp_file != nullptr); DCHECK(dequeue_timer_metric_ != nullptr); ScopedHistogramTimer wait_timer(dequeue_timer_metric_); unique_lock<mutex> buffer_lock(tmp_files_avail_pool_lock_); // If quick return is set and no buffer is available, return immediately. if (quick_return && tmp_files_avail_pool_.empty()) { return TMP_FILE_MGR_NO_AVAILABLE_FILE_TO_EVICT; } while (tmp_files_avail_pool_.empty()) { // Wait if there is no temporary file on the queue. // If timeout, return immediately. if (!tmp_files_available_cv_.WaitFor( buffer_lock, tmp_file_mgr_->GetSpillBufferWaitTimeout())) { return Status(Substitute("Timeout waiting for a local buffer in $0 seconds", tmp_file_mgr_->GetSpillBufferWaitTimeout() / MICROS_PER_SEC)); }; } DCHECK(!tmp_files_avail_pool_.empty()); *tmp_file = tmp_files_avail_pool_.front(); tmp_files_avail_pool_.pop_front(); DCHECK(*tmp_file != nullptr); if ((*tmp_file)->disk_type() != io::DiskFileType::DUMMY) { TmpFileRemote* tmp_file_remote = static_cast<TmpFileRemote*>(tmp_file->get()); // Assert the default size remains the same in case the object is corrupted. DCHECK_EQ(tmp_file_remote->file_size_, tmp_file_mgr_->GetRemoteTmpFileSize()); DCHECK(tmp_file_remote->is_enqueued()); tmp_file_remote->SetEnqueued(false); } tmp_file_mgr_->GetLocalBufferDir()->bytes_used_metric()->Increment( tmp_file_mgr_->GetRemoteTmpFileSize()); return Status::OK(); } } // namespace impala
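// A minimal usage sketch of the spilling API above (illustrative only, not part of
// this file; assumes an initialized TmpFileMgr and DiskIoMgr, a valid RuntimeProfile
// and query id, and elides error handling and buffer pool integration):
//
//   TmpFileGroup group(exec_env->tmp_file_mgr(), exec_env->disk_io_mgr(), profile,
//       query_id, /*bytes_limit=*/-1);
//   unique_ptr<TmpWriteHandle> handle;
//   RETURN_IF_ERROR(group.Write(MemRange(data, len),
//       [](const Status& s) { /* write-complete callback */ }, &handle));
//   handle->WaitForWrite();
//   RETURN_IF_ERROR(group.Read(handle.get(), MemRange(data, len)));
//   group.DestroyWriteHandle(move(handle));
//   group.Close();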