be/src/runtime/bufferpool/buffer-pool-test.cc (1,840 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include <cstdlib> #include <limits> #include <string> #include <vector> #include <boost/algorithm/string/join.hpp> #include <boost/bind.hpp> #include <boost/filesystem.hpp> #include <boost/scoped_ptr.hpp> #include <boost/thread/thread.hpp> #include <boost/unordered_map.hpp> #include "codegen/llvm-codegen.h" #include "common/atomic.h" #include "common/init.h" #include "common/object-pool.h" #include "runtime/bufferpool/buffer-allocator.h" #include "runtime/bufferpool/buffer-pool-internal.h" #include "runtime/bufferpool/buffer-pool.h" #include "runtime/bufferpool/reservation-tracker.h" #include "runtime/mem-tracker.h" #include "runtime/query-state.h" #include "runtime/test-env.h" #include "runtime/tmp-file-mgr.h" #include "service/fe-support.h" #include "testutil/cpu-util.h" #include "testutil/death-test-util.h" #include "testutil/gtest-util.h" #include "testutil/rand-util.h" #include "util/blocking-queue.h" #include "util/filesystem-util.h" #include "util/metrics.h" #include "util/spinlock.h" #include "common/names.h" using boost::algorithm::join; using boost::filesystem::directory_iterator; using std::mt19937; using std::uniform_int_distribution; using std::uniform_real_distribution; DECLARE_bool(disk_spill_encryption); DECLARE_string(disk_spill_compression_codec); DECLARE_bool(disk_spill_punch_holes); DECLARE_string(remote_tmp_file_block_size); DECLARE_string(remote_tmp_file_size); // This suffix is appended to a tmp dir const string SCRATCH_SUFFIX = "/impala-scratch"; /// For testing spill to remote. static const string LOCAL_BUFFER_PATH = "/tmp/buffer-pool-test-buffer"; namespace impala { using BufferHandle = BufferPool::BufferHandle; using ClientHandle = BufferPool::ClientHandle; using PageHandle = BufferPool::PageHandle; class BufferPoolTest : public ::testing::Test { public: virtual void SetUp() { test_env_.reset(new TestEnv); FLAGS_remote_tmp_file_size = "512KB"; // Don't create global buffer pool in 'test_env_' - we'll create a buffer pool in // each test function. test_env_->DisableBufferPool(); ASSERT_OK(test_env_->Init()); RandTestUtil::SeedRng("BUFFER_POOL_TEST_SEED", &rng_); remote_url_ = test_env_->GetDefaultFsPath("/tmp"); } virtual void TearDown() { for (auto entry : query_reservations_) { ReservationTracker* tracker = entry.second; tracker->Close(); } for (TmpFileGroup* file_group : file_groups_) { file_group->Close(); } global_reservations_.Close(); obj_pool_.Clear(); // Tests modify permissions, so make sure we can delete if they didn't clean up. for (const string& created_tmp_dir : created_tmp_dirs_) { chmod((created_tmp_dir + SCRATCH_SUFFIX).c_str(), S_IRWXU); } ASSERT_OK(FileSystemUtil::RemovePaths(created_tmp_dirs_)); created_tmp_dirs_.clear(); CpuTestUtil::ResetAffinity(); // Some tests modify affinity. } /// The minimum buffer size used in most tests. const static int64_t TEST_BUFFER_LEN = 1024; /// Test helper to simulate registering then deregistering a number of queries with /// the given initial reservation and reservation limit. 'rng' is used to generate /// any random numbers needed. void RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, int num_queries, int64_t initial_query_reservation, int64_t query_reservation_limit, mt19937* rng); /// Create and destroy a page multiple times. void CreatePageLoop(BufferPool* pool, TmpFileGroup* file_group, ReservationTracker* parent_tracker, int num_ops); protected: /// Reinitialize test_env_ to have multiple temporary directories. vector<string> InitMultipleTmpDirs(int num_dirs) { return InitTmpFileMgr( num_dirs, FLAGS_disk_spill_compression_codec, FLAGS_disk_spill_punch_holes); } /// Init a new tmp file manager with additional options. vector<string> InitTmpFileMgr( int num_dirs, const string& compression, bool punch_holes) { vector<string> tmp_dirs; for (int i = 0; i < num_dirs; ++i) { const string& dir = Substitute("/tmp/buffer-pool-test.$0", i); // Fix permissions in case old directories were left from previous runs of test. chmod((dir + SCRATCH_SUFFIX).c_str(), S_IRWXU); EXPECT_OK(FileSystemUtil::RemoveAndCreateDirectory(dir)); tmp_dirs.push_back(dir); created_tmp_dirs_.push_back(dir); } test_env_.reset(new TestEnv); test_env_->DisableBufferPool(); test_env_->SetTmpFileMgrArgs(tmp_dirs, false, compression, punch_holes); EXPECT_OK(test_env_->Init()); EXPECT_EQ(num_dirs, test_env_->tmp_file_mgr()->NumActiveTmpDevices()); return tmp_dirs; } /// Init a new tmp file manager with spill to remote options. vector<string> InitTmpFileMgrSpillToRemote( int num_local_dirs, int64_t local_spill_limit, int64_t local_buffer_limit) { vector<string> tmp_dirs; int expected_tmp_dirs_num = num_local_dirs + 2; int expected_active_dev = num_local_dirs + 1; if (local_buffer_limit == -1) { tmp_dirs.push_back(LOCAL_BUFFER_PATH); } else { tmp_dirs.push_back(Substitute(LOCAL_BUFFER_PATH + ":$0", local_buffer_limit)); } tmp_dirs.push_back(remote_url_); EXPECT_OK(FileSystemUtil::RemoveAndCreateDirectory(LOCAL_BUFFER_PATH)); for (int i = 0; i < num_local_dirs; ++i) { const string& create_dir = Substitute("/tmp/buffer-pool-test.$0", i); // Fix permissions in case old directories were left from previous runs of test. chmod((create_dir + SCRATCH_SUFFIX).c_str(), S_IRWXU); EXPECT_OK(FileSystemUtil::RemoveAndCreateDirectory(create_dir)); created_tmp_dirs_.push_back(create_dir); const string& dir = Substitute("/tmp/buffer-pool-test.$0:$1", i, local_spill_limit); tmp_dirs.push_back(dir); } EXPECT_EQ(expected_tmp_dirs_num, tmp_dirs.size()); test_env_.reset(new TestEnv); test_env_->DisableBufferPool(); test_env_->SetTmpFileMgrArgs(tmp_dirs, false, "", false); EXPECT_OK(test_env_->Init()); EXPECT_EQ(expected_active_dev, test_env_->tmp_file_mgr()->NumActiveTmpDevices()); return tmp_dirs; } static int64_t QueryId(int hi, int lo) { return static_cast<int64_t>(hi) << 32 | lo; } /// Helper function to create one reservation tracker per query. ReservationTracker* GetQueryReservationTracker(int64_t query_id) { lock_guard<SpinLock> l(query_reservations_lock_); ReservationTracker* tracker = query_reservations_[query_id]; if (tracker != NULL) return tracker; tracker = obj_pool_.Add(new ReservationTracker()); query_reservations_[query_id] = tracker; return tracker; } RuntimeProfile* NewProfile() { return RuntimeProfile::Create(&obj_pool_, "test profile"); } /// Create a new file group with the default configs. TmpFileGroup* NewFileGroup() { TmpFileGroup* file_group = obj_pool_.Add(new TmpFileGroup(test_env_->tmp_file_mgr(), test_env_->exec_env()->disk_io_mgr(), NewProfile(), TUniqueId())); file_groups_.push_back(file_group); return file_group; } // Helper to check if the page is evicted. bool IsEvicted(BufferPool::PageHandle* page) { lock_guard<SpinLock> pl(page->page_->buffer_lock); return !page->page_->buffer.is_open(); } int NumEvicted(vector<BufferPool::PageHandle>& pages) { int num_evicted = 0; for (PageHandle& page : pages) { if (IsEvicted(&page)) ++num_evicted; } return num_evicted; } /// Allocate buffers of varying sizes at most 'max_buffer_size' that add up to /// 'total_bytes'. Both numbers must be a multiple of the minimum buffer size. /// If 'randomize_core' is true, will switch thread between cores randomly before /// each allocation. void AllocateBuffers(BufferPool* pool, BufferPool::ClientHandle* client, int64_t max_buffer_size, int64_t total_bytes, vector<BufferPool::BufferHandle>* buffers, bool randomize_core = false) { int64_t curr_buffer_size = max_buffer_size; int64_t bytes_remaining = total_bytes; while (bytes_remaining > 0) { while (curr_buffer_size > client->GetUnusedReservation()) curr_buffer_size /= 2; if (randomize_core) CpuTestUtil::PinToRandomCore(&rng_); buffers->emplace_back(); ASSERT_OK(pool->AllocateBuffer(client, curr_buffer_size, &buffers->back())); bytes_remaining -= curr_buffer_size; } } /// Do a temporary test allocation. Return the status of AllocateBuffer(). Status AllocateAndFree(BufferPool* pool, ClientHandle* client, int64_t len) { BufferHandle tmp; RETURN_IF_ERROR(pool->AllocateBuffer(client, len, &tmp)); pool->FreeBuffer(client, &tmp); return Status::OK(); } /// Create pages of varying sizes at most 'max_page_size' that add up to /// 'total_bytes'. Both numbers must be a multiple of the minimum buffer size. /// If 'randomize_core' is true, will switch thread between cores randomly before /// each allocation. void CreatePages(BufferPool* pool, BufferPool::ClientHandle* client, int64_t max_page_size, int64_t total_bytes, vector<BufferPool::PageHandle>* pages, bool randomize_core = false) { ASSERT_GE(client->GetUnusedReservation(), total_bytes); int64_t curr_page_size = max_page_size; int64_t bytes_remaining = total_bytes; while (bytes_remaining > 0) { while (curr_page_size > client->GetUnusedReservation()) curr_page_size /= 2; pages->emplace_back(); if (randomize_core) CpuTestUtil::PinToRandomCore(&rng_); ASSERT_OK(pool->CreatePage(client, curr_page_size, &pages->back())); bytes_remaining -= curr_page_size; } } /// Free all the 'buffers' and clear the vector. /// If 'randomize_core' is true, will switch thread between cores randomly before /// each free. void FreeBuffers(BufferPool* pool, BufferPool::ClientHandle* client, vector<BufferPool::BufferHandle>* buffers, bool randomize_core = false) { for (auto& buffer : *buffers) { if (randomize_core) CpuTestUtil::PinToRandomCore(&rng_); pool->FreeBuffer(client, &buffer); } buffers->clear(); } Status PinAll(BufferPool* pool, ClientHandle* client, vector<PageHandle>* pages) { for (auto& page : *pages) RETURN_IF_ERROR(pool->Pin(client, &page)); return Status::OK(); } /// Unpin all of 'pages'. If 'delay_between_unpins_ms' > 0, sleep between unpins. void UnpinAll(BufferPool* pool, ClientHandle* client, vector<PageHandle>* pages, int delay_between_unpins_ms = 0) { for (auto& page : *pages) { pool->Unpin(client, &page); if (delay_between_unpins_ms > 0) SleepForMs(delay_between_unpins_ms); } } void DestroyAll(BufferPool* pool, ClientHandle* client, vector<PageHandle>* pages) { for (auto& page : *pages) pool->DestroyPage(client, &page); } /// Write some deterministically-generated sentinel values to pages or buffers. The same /// data is written each time for objects[i], based on start_num + i. template <typename T> void WriteData(const vector<T>& objects, int start_num) { WriteOrVerifyData(objects, start_num, true); } template <typename T> void WriteData(const T& object, int val) { return WriteOrVerifyData(object, val, true); } /// Verify data written by WriteData(). template <typename T> void VerifyData(const vector<T>& objects, int start_num) { WriteOrVerifyData(objects, start_num, false); } template <typename T> void VerifyData(const T& object, int val) { return WriteOrVerifyData(object, val, false); } /// Implemention of WriteData() and VerifyData(). template <typename T> void WriteOrVerifyData(const vector<T>& objects, int start_num, bool write) { for (int i = 0; i < objects.size(); ++i) { WriteOrVerifyData(objects[i], i + start_num, write); } } template <typename T> void WriteOrVerifyData(const T& object, int val, bool write) { // Only write sentinel values to start and end of buffer to make writing and // verification cheap. MemRange mem = GetMemRange(object); uint64_t* start_word = reinterpret_cast<uint64_t*>(mem.data()); uint64_t* end_word = reinterpret_cast<uint64_t*>(&mem.data()[mem.len() - sizeof(uint64_t)]); if (write) { *start_word = val; *end_word = ~val; } else { EXPECT_EQ(*start_word, val); EXPECT_EQ(*end_word, ~val); } } MemRange GetMemRange(const BufferHandle& buffer) { return buffer.mem_range(); } MemRange GetMemRange(const PageHandle& page) { const BufferHandle* buffer; EXPECT_OK(page.GetBuffer(&buffer)); return buffer->mem_range(); } /// Set the maximum number of scavenge attempts that the pool's allocator wil do. void SetMaxScavengeAttempts(BufferPool* pool, int max_attempts) { pool->allocator()->set_max_scavenge_attempts(max_attempts); } void WaitForAllWrites(ClientHandle* client) { client->impl_->WaitForAllWrites(); } // Remove write permissions on scratch files. Return # of scratch files. static int RemoveScratchPerms(const string& scratch_dir) { int num_files = 0; directory_iterator dir_it(scratch_dir); for (; dir_it != directory_iterator(); ++dir_it) { ++num_files; EXPECT_EQ(0, chmod(dir_it->path().c_str(), 0)); } return num_files; } // Remove permissions for the temporary file at 'path' - all subsequent writes // to the file should fail. Expects backing file has already been allocated. static void DisableBackingFile(const string& path) { EXPECT_GT(path.size(), 0); EXPECT_EQ(0, chmod(path.c_str(), 0)); LOG(INFO) << "Injected fault by removing file permissions " << path; } /// Write out a bunch of nonsense to replace the file's current data. static void CorruptBackingFile(const string& path) { EXPECT_GT(path.size(), 0); FILE* file = fopen(path.c_str(), "rb+"); EXPECT_EQ(0, fseek(file, 0, SEEK_END)); int64_t size = ftell(file); EXPECT_EQ(0, fseek(file, 0, SEEK_SET)); for (int64_t i = 0; i < size; ++i) fputc(123, file); fclose(file); LOG(INFO) << "Injected fault by corrupting file " << path; } /// Truncate the file to 0 bytes. static void TruncateBackingFile(const string& path) { EXPECT_GT(path.size(), 0); EXPECT_EQ(0, truncate(path.c_str(), 0)); LOG(INFO) << "Injected fault by truncating file " << path; } // Return whether a pin is in flight for the page. static bool PinInFlight(PageHandle* page) { return page->page_->pin_in_flight.Load(); } // Return the path of the temporary file backing the page. static string TmpFilePath(PageHandle* page) { return page->page_->write_handle->TmpFilePath(); } // Return a comma-separated string with the paths of the temporary file backing the // pages. static string TmpFilePaths(vector<PageHandle>& pages) { vector<string> paths; paths.reserve(pages.size()); for (PageHandle& page : pages) { paths.push_back(TmpFilePath(&page)); } return join(paths, ","); } // Return a string with the name of the directory and a comma-separated list of scratch // files under the specified scratch directory. string DumpScratchDir(const string& tmp_dir_path) { string scratch_dir_path = tmp_dir_path + SCRATCH_SUFFIX; vector<string> entries; EXPECT_OK( FileSystemUtil::Directory::GetEntryNames(scratch_dir_path, &entries)); return "Directory " + scratch_dir_path + ": " + join(entries, ","); } // Check that the file backing the page has dir as a prefix of its path. static bool PageInDir(PageHandle* page, const string& dir) { return TmpFilePath(page).find(dir) == 0; } // Find a page in the list that is backed by a file with the given directory as prefix // of its path. static PageHandle* FindPageInDir(vector<PageHandle>& pages, const string& dir) { for (PageHandle& page : pages) { if (PageInDir(&page, dir)) return &page; } return NULL; } /// Parameterised test implementations. void TestBufferAllocation(bool reserved); void TestMemoryReclamation(BufferPool* pool, int src_core, int dst_core); void TestEvictionPolicy(int64_t page_size); void TestCleanPageLimit(int max_clean_pages, bool randomize_core); void TestQueryTeardown(bool write_error); void TestWriteError(int write_delay_ms, const string& compression, bool punch_holes); void TestTmpFileAllocateError(const string& compression, bool punch_holes); void TestWriteErrorBlacklist(const string& compression, bool punch_holes); void TestRandomInternalSingle(int64_t buffer_len, bool multiple_pins); void TestRandomInternalMulti(int num_threads, int64_t buffer_len, bool multiple_pins); static const int SINGLE_THREADED_TID = -1; void TestRandomInternalImpl(BufferPool* pool, TmpFileGroup* file_group, MemTracker* parent_mem_tracker, mt19937* rng, int tid, bool multiple_pins); ObjectPool obj_pool_; ReservationTracker global_reservations_; boost::scoped_ptr<TestEnv> test_env_; /// Per-test random number generator. Seeded before every test. mt19937 rng_; /// The file groups created - closed at end of each test. vector<TmpFileGroup*> file_groups_; /// Paths of temporary directories created during tests - deleted at end of test. vector<string> created_tmp_dirs_; /// Map from query_id to the reservation tracker for that query. Reads and modifications /// of the map are protected by query_reservations_lock_. unordered_map<int64_t, ReservationTracker*> query_reservations_; SpinLock query_reservations_lock_; /// URL for remote spilling. string remote_url_; }; const int64_t BufferPoolTest::TEST_BUFFER_LEN; void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, int num_queries, int64_t initial_query_reservation, int64_t query_reservation_limit, mt19937* rng) { Status status; int clients_per_query = 32; BufferPool::ClientHandle* clients[num_queries]; for (int i = 0; i < num_queries; ++i) { int64_t query_id = QueryId(query_id_hi, i); // Initialize a tracker for a new query. ReservationTracker* query_reservation = GetQueryReservationTracker(query_id); query_reservation->InitChildTracker( NULL, &global_reservations_, NULL, query_reservation_limit); // Test that closing then reopening child tracker works. query_reservation->Close(); query_reservation->InitChildTracker( NULL, &global_reservations_, NULL, query_reservation_limit); EXPECT_TRUE(query_reservation->IncreaseReservationToFit(initial_query_reservation)); clients[i] = new BufferPool::ClientHandle[clients_per_query]; for (int j = 0; j < clients_per_query; ++j) { int64_t initial_client_reservation = initial_query_reservation / clients_per_query + j < initial_query_reservation % clients_per_query; // Reservation limit can be anything greater or equal to the initial reservation. int64_t client_reservation_limit = initial_client_reservation + (*rng)() % 100000; string name = Substitute("Client $0 for query $1", j, query_id); EXPECT_OK(pool->RegisterClient(name, NULL, query_reservation, NULL, client_reservation_limit, NewProfile(), &clients[i][j])); EXPECT_TRUE(clients[i][j].IncreaseReservationToFit(initial_client_reservation)); } for (int j = 0; j < clients_per_query; ++j) { ASSERT_TRUE(clients[i][j].is_registered()); } } // Deregister clients then query. for (int i = 0; i < num_queries; ++i) { for (int j = 0; j < clients_per_query; ++j) { pool->DeregisterClient(&clients[i][j]); ASSERT_FALSE(clients[i][j].is_registered()); } delete[] clients[i]; GetQueryReservationTracker(QueryId(query_id_hi, i))->Close(); } } /// Test that queries and clients can be registered and deregistered with the reservation /// trackers and the buffer pool. TEST_F(BufferPoolTest, BasicRegistration) { int num_concurrent_queries = 1024; int64_t sum_initial_reservations = 4; int64_t reservation_limit = 1024; // Need enough buffers for all initial reservations. int64_t total_mem = sum_initial_reservations * num_concurrent_queries; global_reservations_.InitRootTracker(NewProfile(), total_mem); BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, total_mem, total_mem); RegisterQueriesAndClients(&pool, 0, num_concurrent_queries, sum_initial_reservations, reservation_limit, &rng_); ASSERT_EQ(global_reservations_.GetUsedReservation(), 0); ASSERT_EQ(global_reservations_.GetChildReservations(), 0); ASSERT_EQ(global_reservations_.GetReservation(), 0); global_reservations_.Close(); } /// Test that queries and clients can be registered and deregistered by concurrent /// threads. TEST_F(BufferPoolTest, ConcurrentRegistration) { int queries_per_thread = 64; int num_threads = 64; int num_concurrent_queries = queries_per_thread * num_threads; int64_t sum_initial_reservations = 4; int64_t reservation_limit = 1024; // Need enough buffers for all initial reservations. int64_t total_mem = num_concurrent_queries * sum_initial_reservations; global_reservations_.InitRootTracker(NewProfile(), total_mem); BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, total_mem, total_mem); vector<mt19937> thread_rngs = RandTestUtil::CreateThreadLocalRngs(num_threads, &rng_); // Launch threads, each with a different set of query IDs. thread_group workers; for (int i = 0; i < num_threads; ++i) { workers.add_thread(new thread(boost::bind(&BufferPoolTest::RegisterQueriesAndClients, this, &pool, i, queries_per_thread, sum_initial_reservations, reservation_limit, &thread_rngs[i]))); } workers.join_all(); // All the reservations should be released at this point. ASSERT_EQ(global_reservations_.GetUsedReservation(), 0); ASSERT_EQ(global_reservations_.GetReservation(), 0); global_reservations_.Close(); } /// Test basic page handle creation. TEST_F(BufferPoolTest, PageCreation) { // Allocate many pages, each a power-of-two multiple of the minimum page length. int num_pages = 16; int64_t max_page_len = TEST_BUFFER_LEN << (num_pages - 1); int64_t total_mem = 2 * 2 * max_page_len; global_reservations_.InitRootTracker(NULL, total_mem); BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, total_mem, total_mem); BufferPool::ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL, total_mem, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservation(total_mem)); vector<BufferPool::PageHandle> handles(num_pages); // Create pages of various valid sizes. for (int i = 0; i < num_pages; ++i) { int size_multiple = 1 << i; int64_t page_len = TEST_BUFFER_LEN * size_multiple; int64_t used_before = client.GetUsedReservation(); ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i])); ASSERT_TRUE(handles[i].is_open()); ASSERT_TRUE(handles[i].is_pinned()); const BufferHandle* buffer; ASSERT_OK(handles[i].GetBuffer(&buffer)); ASSERT_TRUE(buffer->data() != NULL); ASSERT_EQ(handles[i].len(), page_len); ASSERT_EQ(buffer->len(), page_len); ASSERT_EQ(client.GetUsedReservation(), used_before + page_len); } // Close the handles and check memory consumption. for (int i = 0; i < num_pages; ++i) { int64_t used_before = client.GetUsedReservation(); int page_len = handles[i].len(); pool.DestroyPage(&client, &handles[i]); ASSERT_EQ(client.GetUsedReservation(), used_before - page_len); } pool.DeregisterClient(&client); // All the reservations should be released at this point. ASSERT_EQ(global_reservations_.GetReservation(), 0); global_reservations_.Close(); } TEST_F(BufferPoolTest, ReservedBufferAllocation) { TestBufferAllocation(true); } TEST_F(BufferPoolTest, UnreservedBufferAllocation) { TestBufferAllocation(false); } void BufferPoolTest::TestBufferAllocation(bool reserved) { // Allocate many buffers, each a power-of-two multiple of the minimum buffer length. const int NUM_BUFFERS = 16; const int64_t MAX_BUFFER_LEN = TEST_BUFFER_LEN << (NUM_BUFFERS - 1); // Total memory required to allocate TEST_BUFFER_LEN, 2*TEST_BUFFER_LEN, ..., // MAX_BUFFER_LEN. const int64_t TOTAL_MEM = 2 * MAX_BUFFER_LEN - TEST_BUFFER_LEN; global_reservations_.InitRootTracker(NULL, TOTAL_MEM); BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM); BufferPool::ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL, TOTAL_MEM, NewProfile(), &client)); if (reserved) { ASSERT_TRUE(client.IncreaseReservationToFit(TOTAL_MEM)); } vector<BufferPool::BufferHandle> handles(NUM_BUFFERS); // Create buffers of various valid sizes. int64_t total_allocated = 0; for (int i = 0; i < NUM_BUFFERS; ++i) { int size_multiple = 1 << i; int64_t buffer_len = TEST_BUFFER_LEN * size_multiple; int64_t used_before = client.GetUsedReservation(); if (reserved) { ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i])); } else { // Reservation should be automatically increased. ASSERT_OK(pool.AllocateUnreservedBuffer(&client, buffer_len, &handles[i])); } total_allocated += buffer_len; ASSERT_TRUE(handles[i].is_open()); ASSERT_TRUE(handles[i].data() != NULL); ASSERT_EQ(handles[i].len(), buffer_len); ASSERT_EQ(client.GetUsedReservation(), used_before + buffer_len); // Check that pool-wide values are updated correctly. EXPECT_EQ(total_allocated, pool.GetSystemBytesAllocated()); EXPECT_EQ(0, pool.GetNumFreeBuffers()); EXPECT_EQ(0, pool.GetFreeBufferBytes()); } if (!reserved) { // Allocate all of the memory and test the failure path for unreserved allocations. BufferPool::BufferHandle tmp_handle; ASSERT_OK(pool.AllocateUnreservedBuffer(&client, TEST_BUFFER_LEN, &tmp_handle)); ASSERT_FALSE(tmp_handle.is_open()) << "No reservation for buffer"; } // Close the handles and check memory consumption. for (int i = 0; i < NUM_BUFFERS; ++i) { int64_t used_before = client.GetUsedReservation(); int buffer_len = handles[i].len(); pool.FreeBuffer(&client, &handles[i]); ASSERT_EQ(client.GetUsedReservation(), used_before - buffer_len); } pool.DeregisterClient(&client); // All the reservations should be released at this point. ASSERT_EQ(global_reservations_.GetReservation(), 0); // But freed memory is not released to the system immediately. EXPECT_EQ(total_allocated, pool.GetSystemBytesAllocated()); EXPECT_EQ(NUM_BUFFERS, pool.GetNumFreeBuffers()); EXPECT_EQ(total_allocated, pool.GetFreeBufferBytes()); global_reservations_.Close(); } // Test that the buffer pool correctly reports the number of clean pages. TEST_F(BufferPoolTest, CleanPageStats) { const int MAX_NUM_BUFFERS = 4; const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN; global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM); BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM); ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, nullptr, TOTAL_MEM, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM)); vector<PageHandle> pages; CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages); WriteData(pages, 0); EXPECT_FALSE(client.has_unpinned_pages()); // Pages don't start off clean. EXPECT_EQ(0, pool.GetNumCleanPages()); EXPECT_EQ(0, pool.GetCleanPageBytes()); // Unpin pages and wait until they're written out and therefore clean. UnpinAll(&pool, &client, &pages); EXPECT_TRUE(client.has_unpinned_pages()); WaitForAllWrites(&client); EXPECT_EQ(MAX_NUM_BUFFERS, pool.GetNumCleanPages()); EXPECT_EQ(TOTAL_MEM, pool.GetCleanPageBytes()); EXPECT_TRUE(client.has_unpinned_pages()); // Do an allocation to force eviction of one page. ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN)); EXPECT_EQ(MAX_NUM_BUFFERS - 1, pool.GetNumCleanPages()); EXPECT_EQ(TOTAL_MEM - TEST_BUFFER_LEN, pool.GetCleanPageBytes()); EXPECT_TRUE(client.has_unpinned_pages()); // Re-pin all the pages - none will be clean afterwards. ASSERT_OK(PinAll(&pool, &client, &pages)); VerifyData(pages, 0); EXPECT_EQ(0, pool.GetNumCleanPages()); EXPECT_EQ(0, pool.GetCleanPageBytes()); EXPECT_FALSE(client.has_unpinned_pages()); DestroyAll(&pool, &client, &pages); EXPECT_FALSE(client.has_unpinned_pages()); pool.DeregisterClient(&client); global_reservations_.Close(); } /// Test that the buffer pool respects the clean page limit with all pages in /// the same arena. TEST_F(BufferPoolTest, CleanPageLimitOneArena) { TestCleanPageLimit(0, false); TestCleanPageLimit(2, false); TestCleanPageLimit(4, false); } /// Test that the buffer pool respects the clean page limit with pages spread across /// different arenas. TEST_F(BufferPoolTest, CleanPageLimitRandomArenas) { TestCleanPageLimit(0, true); TestCleanPageLimit(2, true); TestCleanPageLimit(4, true); } void BufferPoolTest::TestCleanPageLimit(int max_clean_pages, bool randomize_core) { const int MAX_NUM_BUFFERS = 4; const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN; const int max_clean_page_bytes = max_clean_pages * TEST_BUFFER_LEN; global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM); MetricGroup tmp_metrics("test-metrics"); BufferPool pool(&tmp_metrics, TEST_BUFFER_LEN, TOTAL_MEM, max_clean_page_bytes); ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, nullptr, TOTAL_MEM, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM)); if (!randomize_core) CpuTestUtil::PinToCore(0); vector<PageHandle> pages; CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages, randomize_core); WriteData(pages, 0); // Unpin pages and wait until they're written out and therefore clean. UnpinAll(&pool, &client, &pages); WaitForAllWrites(&client); EXPECT_EQ(max_clean_pages, pool.GetNumCleanPages()); EXPECT_EQ(max_clean_page_bytes, pool.GetCleanPageBytes()); // Do an allocation to force a buffer to be reclaimed from somewhere. ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN)); if (randomize_core) { // We will either evict a clean page or reclaim a free buffer, depending on the // arena that we pick. EXPECT_LE(pool.GetNumCleanPages(), max_clean_pages); EXPECT_LE(pool.GetCleanPageBytes(), max_clean_page_bytes); } else { // We will reclaim one of the free buffers in arena 0. EXPECT_EQ(min(MAX_NUM_BUFFERS - 1, max_clean_pages), pool.GetNumCleanPages()); const int64_t expected_clean_page_bytes = min<int64_t>((MAX_NUM_BUFFERS - 1) * TEST_BUFFER_LEN, max_clean_page_bytes); EXPECT_EQ(expected_clean_page_bytes, pool.GetCleanPageBytes()); } // Re-pin all the pages - none will be clean afterwards. ASSERT_OK(PinAll(&pool, &client, &pages)); VerifyData(pages, 0); EXPECT_EQ(0, pool.GetNumCleanPages()); EXPECT_EQ(0, pool.GetCleanPageBytes()); DestroyAll(&pool, &client, &pages); pool.DeregisterClient(&client); global_reservations_.Close(); } /// Test transfer of buffer handles between clients. TEST_F(BufferPoolTest, BufferTransfer) { // Each client needs to have enough reservation for a buffer. const int num_clients = 5; int64_t total_mem = num_clients * TEST_BUFFER_LEN; global_reservations_.InitRootTracker(NULL, total_mem); BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, total_mem, total_mem); BufferPool::ClientHandle clients[num_clients]; BufferPool::BufferHandle handles[num_clients]; for (int i = 0; i < num_clients; ++i) { ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL, TEST_BUFFER_LEN, NewProfile(), &clients[i])); ASSERT_TRUE(clients[i].IncreaseReservationToFit(TEST_BUFFER_LEN)); } // Transfer the page around between the clients repeatedly in a circle. ASSERT_OK(pool.AllocateBuffer(&clients[0], TEST_BUFFER_LEN, &handles[0])); uint8_t* data = handles[0].data(); for (int iter = 0; iter < 10; ++iter) { for (int client = 0; client < num_clients; ++client) { int next_client = (client + 1) % num_clients; ASSERT_OK(pool.TransferBuffer(&clients[client], &handles[client], &clients[next_client], &handles[next_client])); // Check that the transfer left things in a consistent state. ASSERT_FALSE(handles[client].is_open()); ASSERT_EQ(0, clients[client].GetUsedReservation()); ASSERT_TRUE(handles[next_client].is_open()); ASSERT_EQ(TEST_BUFFER_LEN, clients[next_client].GetUsedReservation()); // The same underlying buffer should be used. ASSERT_EQ(data, handles[next_client].data()); } } pool.FreeBuffer(&clients[0], &handles[0]); for (BufferPool::ClientHandle& client : clients) pool.DeregisterClient(&client); ASSERT_EQ(global_reservations_.GetReservation(), 0); global_reservations_.Close(); } TEST_F(BufferPoolTest, BufferTransferConcurrent) { // Transfer buffers between threads in a circular fashion. Each client needs to have // enough reservation for two buffers, since it may receive a buffer before handing // off the next one. const int NUM_CLIENTS = 5; const int64_t TOTAL_MEM = NUM_CLIENTS * TEST_BUFFER_LEN * 2; global_reservations_.InitRootTracker(NULL, TOTAL_MEM); BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM); BufferPool::ClientHandle clients[NUM_CLIENTS]; BufferPool::BufferHandle handles[NUM_CLIENTS]; SpinLock locks[NUM_CLIENTS]; // Each lock protects the corresponding BufferHandle. for (int i = 0; i < NUM_CLIENTS; ++i) { ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL, TOTAL_MEM, NewProfile(), &clients[i])); ASSERT_TRUE(clients[i].IncreaseReservationToFit(2 * TEST_BUFFER_LEN)); } thread_group workers; for (int thread_idx = 0; thread_idx < NUM_CLIENTS; ++thread_idx) { workers.add_thread(new thread([&pool, &clients, &handles, &locks, thread_idx] { // Transfer buffers around between the clients repeatedly in a circle. BufferHandle handle; { lock_guard<SpinLock> l(locks[thread_idx]); LOG(INFO) << "Allocate from " << (void*)&clients[thread_idx]; ASSERT_OK(pool.AllocateBuffer( &clients[thread_idx], TEST_BUFFER_LEN, &handle)); } for (int iter = 0; iter < 100; ++iter) { int next_thread_idx = (thread_idx + 1) % NUM_CLIENTS; // Transfer our buffer to the next thread. { unique_lock<SpinLock> l(locks[next_thread_idx]); // Spin until we can add the handle. while (true) { if (!handles[next_thread_idx].is_open()) break; l.unlock(); sched_yield(); l.lock(); } ASSERT_TRUE(handle.is_open()); ASSERT_OK(pool.TransferBuffer(&clients[thread_idx], &handle, &clients[next_thread_idx], &handles[next_thread_idx])); // Check that the transfer left things in a consistent state. ASSERT_TRUE(handles[next_thread_idx].is_open()); ASSERT_FALSE(handle.is_open()); ASSERT_GE(clients[next_thread_idx].GetUsedReservation(), TEST_BUFFER_LEN); } // Get a new buffer from the previous thread. { unique_lock<SpinLock> l(locks[thread_idx]); // Spin until we receive a handle from the previous thread. while (true) { if (handles[thread_idx].is_open()) break; l.unlock(); sched_yield(); l.lock(); } handle = move(handles[thread_idx]); } } pool.FreeBuffer(&clients[thread_idx], &handle); })); } workers.join_all(); for (BufferPool::ClientHandle& client : clients) pool.DeregisterClient(&client); ASSERT_EQ(global_reservations_.GetReservation(), 0); global_reservations_.Close(); } /// Test basic pinning and unpinning. TEST_F(BufferPoolTest, Pin) { int64_t total_mem = TEST_BUFFER_LEN * 1024; // Set up client with enough reservation to pin twice. int64_t child_reservation = TEST_BUFFER_LEN * 2; BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, total_mem, total_mem); global_reservations_.InitRootTracker(NULL, total_mem); BufferPool::ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, NULL, child_reservation, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservationToFit(child_reservation)); BufferPool::PageHandle handle1, handle2; // Can pin two minimum sized pages. const BufferHandle* page_buffer; ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1, &page_buffer)); ASSERT_TRUE(handle1.is_open()); ASSERT_TRUE(handle1.is_pinned()); ASSERT_TRUE(page_buffer->data() != NULL); ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2, &page_buffer)); ASSERT_TRUE(handle2.is_open()); ASSERT_TRUE(handle2.is_pinned()); ASSERT_TRUE(page_buffer->data() != NULL); pool.Unpin(&client, &handle2); ASSERT_FALSE(handle2.is_pinned()); // Can pin minimum-sized page twice. ASSERT_OK(pool.Pin(&client, &handle1)); ASSERT_TRUE(handle1.is_pinned()); // Have to unpin twice. pool.Unpin(&client, &handle1); ASSERT_TRUE(handle1.is_pinned()); pool.Unpin(&client, &handle1); ASSERT_FALSE(handle1.is_pinned()); // Can pin double-sized page only once. BufferPool::PageHandle double_handle; ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN * 2, &double_handle, &page_buffer)); ASSERT_TRUE(double_handle.is_open()); ASSERT_TRUE(double_handle.is_pinned()); ASSERT_TRUE(page_buffer->data() != NULL); // Destroy the pages - test destroying both pinned and unpinned. pool.DestroyPage(&client, &handle1); pool.DestroyPage(&client, &handle2); pool.DestroyPage(&client, &double_handle); pool.DeregisterClient(&client); } // Test the various state transitions possible with async Pin() calls. TEST_F(BufferPoolTest, AsyncPin) { const int DATA_SEED = 1234; // Set up pool with enough reservation to keep two buffers in memory. const int64_t TOTAL_MEM = 2 * TEST_BUFFER_LEN; BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM); global_reservations_.InitRootTracker(NULL, TOTAL_MEM); BufferPool::ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, NULL, TOTAL_MEM, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservationToFit(TOTAL_MEM)); PageHandle handle; const BufferHandle* buffer; ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle, &buffer)); WriteData(*buffer, DATA_SEED); // Pin() on a pinned page just increments the pin count. ASSERT_OK(pool.Pin(&client, &handle)); EXPECT_EQ(2, handle.pin_count()); EXPECT_FALSE(PinInFlight(&handle)); pool.Unpin(&client, &handle); pool.Unpin(&client, &handle); ASSERT_FALSE(handle.is_pinned()); // Calling Pin() then Pin() results in double-pinning. ASSERT_OK(pool.Pin(&client, &handle)); ASSERT_OK(pool.Pin(&client, &handle)); EXPECT_EQ(2, handle.pin_count()); EXPECT_FALSE(PinInFlight(&handle)); pool.Unpin(&client, &handle); pool.Unpin(&client, &handle); ASSERT_FALSE(handle.is_pinned()); // Pin() on a page that isn't evicted pins it immediately. ASSERT_OK(pool.Pin(&client, &handle)); EXPECT_EQ(1, handle.pin_count()); EXPECT_FALSE(PinInFlight(&handle)); VerifyData(handle, 1234); pool.Unpin(&client, &handle); ASSERT_FALSE(handle.is_pinned()); // Force eviction. Pin() on an evicted page starts the write asynchronously. ASSERT_OK(AllocateAndFree(&pool, &client, TOTAL_MEM)); ASSERT_OK(pool.Pin(&client, &handle)); EXPECT_EQ(1, handle.pin_count()); EXPECT_TRUE(PinInFlight(&handle)); // Block on the pin and verify the buffer. ASSERT_OK(handle.GetBuffer(&buffer)); EXPECT_FALSE(PinInFlight(&handle)); VerifyData(*buffer, 1234); // Test that we can unpin while in flight and the data remains valid. pool.Unpin(&client, &handle); ASSERT_OK(AllocateAndFree(&pool, &client, TOTAL_MEM)); ASSERT_OK(pool.Pin(&client, &handle)); EXPECT_TRUE(PinInFlight(&handle)); pool.Unpin(&client, &handle); ASSERT_OK(pool.Pin(&client, &handle)); ASSERT_OK(handle.GetBuffer(&buffer)); VerifyData(*buffer, 1234); // Evict the page, then destroy while we're pinning it asynchronously. pool.Unpin(&client, &handle); ASSERT_OK(AllocateAndFree(&pool, &client, TOTAL_MEM)); ASSERT_OK(pool.Pin(&client, &handle)); pool.DestroyPage(&client, &handle); pool.DeregisterClient(&client); } /// Creating a page or pinning without sufficient reservation should DCHECK. TEST_F(BufferPoolTest, PinWithoutReservation) { int64_t total_mem = TEST_BUFFER_LEN * 1024; BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, total_mem, total_mem); global_reservations_.InitRootTracker(NULL, total_mem); BufferPool::ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL, TEST_BUFFER_LEN, NewProfile(), &client)); BufferPool::PageHandle handle; IMPALA_ASSERT_DEBUG_DEATH( discard_result(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle)), ""); // Should succeed after increasing reservation. ASSERT_TRUE(client.IncreaseReservationToFit(TEST_BUFFER_LEN)); ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle)); // But we can't pin again. IMPALA_ASSERT_DEBUG_DEATH(discard_result(pool.Pin(&client, &handle)), ""); pool.DestroyPage(&client, &handle); pool.DeregisterClient(&client); } TEST_F(BufferPoolTest, ExtractBuffer) { int64_t total_mem = TEST_BUFFER_LEN * 1024; // Set up client with enough reservation for two buffers/pins. int64_t child_reservation = TEST_BUFFER_LEN * 2; BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, total_mem, total_mem); global_reservations_.InitRootTracker(NULL, total_mem); BufferPool::ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, NULL, child_reservation, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservationToFit(child_reservation)); BufferPool::PageHandle page; BufferPool::BufferHandle buffer; // Test basic buffer extraction. for (int len = TEST_BUFFER_LEN; len <= 2 * TEST_BUFFER_LEN; len *= 2) { const BufferHandle* page_buffer; ASSERT_OK(pool.CreatePage(&client, len, &page, &page_buffer)); uint8_t* page_data = page_buffer->data(); ASSERT_OK(pool.ExtractBuffer(&client, &page, &buffer)); ASSERT_FALSE(page.is_open()); ASSERT_TRUE(buffer.is_open()); ASSERT_EQ(len, buffer.len()); ASSERT_EQ(page_data, buffer.data()); ASSERT_EQ(len, client.GetUsedReservation()); pool.FreeBuffer(&client, &buffer); ASSERT_EQ(0, client.GetUsedReservation()); } // Test that ExtractBuffer() accounts correctly for pin count > 1. const BufferHandle* page_buffer; ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page, &page_buffer)); uint8_t* page_data = page_buffer->data(); ASSERT_OK(pool.Pin(&client, &page)); ASSERT_EQ(TEST_BUFFER_LEN * 2, client.GetUsedReservation()); ASSERT_OK(pool.ExtractBuffer(&client, &page, &buffer)); ASSERT_EQ(TEST_BUFFER_LEN, client.GetUsedReservation()); ASSERT_FALSE(page.is_open()); ASSERT_TRUE(buffer.is_open()); ASSERT_EQ(TEST_BUFFER_LEN, buffer.len()); ASSERT_EQ(page_data, buffer.data()); pool.FreeBuffer(&client, &buffer); ASSERT_EQ(0, client.GetUsedReservation()); // Test that ExtractBuffer() DCHECKs for unpinned pages. ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page)); pool.Unpin(&client, &page); IMPALA_ASSERT_DEBUG_DEATH( discard_result(pool.ExtractBuffer(&client, &page, &buffer)), ""); pool.DestroyPage(&client, &page); pool.DeregisterClient(&client); } // Test concurrent creation and destruction of pages. TEST_F(BufferPoolTest, ConcurrentPageCreation) { int ops_per_thread = 1024; // int num_threads = 64; int num_threads = 1; // Need enough buffers for all initial reservations. int total_mem = num_threads * TEST_BUFFER_LEN; global_reservations_.InitRootTracker(NULL, total_mem); BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, total_mem, total_mem); // Share a file group between the threads. TmpFileGroup* file_group = NewFileGroup(); // Launch threads, each with a different set of query IDs. thread_group workers; for (int i = 0; i < num_threads; ++i) { workers.add_thread(new thread(boost::bind(&BufferPoolTest::CreatePageLoop, this, &pool, file_group, &global_reservations_, ops_per_thread))); } // Build debug string to test concurrent iteration over pages_ list. for (int i = 0; i < 64; ++i) { LOG(INFO) << pool.DebugString(); } workers.join_all(); // All the reservations should be released at this point. ASSERT_EQ(global_reservations_.GetChildReservations(), 0); global_reservations_.Close(); } void BufferPoolTest::CreatePageLoop(BufferPool* pool, TmpFileGroup* file_group, ReservationTracker* parent_tracker, int num_ops) { BufferPool::ClientHandle client; ASSERT_OK(pool->RegisterClient("test client", file_group, parent_tracker, NULL, TEST_BUFFER_LEN, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservation(TEST_BUFFER_LEN)); for (int i = 0; i < num_ops; ++i) { BufferPool::PageHandle handle; ASSERT_OK(pool->CreatePage(&client, TEST_BUFFER_LEN, &handle)); pool->Unpin(&client, &handle); ASSERT_OK(pool->Pin(&client, &handle)); pool->DestroyPage(&client, &handle); } pool->DeregisterClient(&client); } /// Test that DCHECK fires when trying to unpin a page with spilling disabled. TEST_F(BufferPoolTest, SpillingDisabledDcheck) { global_reservations_.InitRootTracker(NULL, 2 * TEST_BUFFER_LEN); BufferPool pool( test_env_->metrics(), TEST_BUFFER_LEN, 2 * TEST_BUFFER_LEN, 2 * TEST_BUFFER_LEN); BufferPool::PageHandle handle; BufferPool::ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL, numeric_limits<int64_t>::max(), NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservation(2 * TEST_BUFFER_LEN)); ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle)); ASSERT_OK(pool.Pin(&client, &handle)); // It's ok to Unpin() if the pin count remains positive. pool.Unpin(&client, &handle); // We didn't pass in a TmpFileGroup, so spilling is disabled and we can't bring the // pin count to 0. IMPALA_ASSERT_DEBUG_DEATH(pool.Unpin(&client, &handle), ""); pool.DestroyPage(&client, &handle); pool.DeregisterClient(&client); } /// Test simple case where pool must evict a page from the same client to fit another. TEST_F(BufferPoolTest, EvictPageSameClient) { global_reservations_.InitRootTracker(NULL, TEST_BUFFER_LEN); BufferPool pool( test_env_->metrics(), TEST_BUFFER_LEN, TEST_BUFFER_LEN, TEST_BUFFER_LEN); BufferPool::PageHandle handle1, handle2; BufferPool::ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, NULL, TEST_BUFFER_LEN, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservation(TEST_BUFFER_LEN)); ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1)); // Do not have enough reservations because we pinned the page. IMPALA_ASSERT_DEBUG_DEATH( discard_result(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2)), ""); // We should be able to create a new page after unpinned and evicting the first one. pool.Unpin(&client, &handle1); ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2)); pool.DestroyPage(&client, &handle1); pool.DestroyPage(&client, &handle2); pool.DeregisterClient(&client); } /// Test simple case where pool must evict pages of different sizes. TEST_F(BufferPoolTest, EvictPageDifferentSizes) { const int64_t TOTAL_BYTES = 2 * TEST_BUFFER_LEN; global_reservations_.InitRootTracker(NULL, TOTAL_BYTES); BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_BYTES, TOTAL_BYTES); BufferPool::PageHandle handle1, handle2; BufferPool::ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, NULL, TOTAL_BYTES, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservation(2 * TEST_BUFFER_LEN)); ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1)); pool.Unpin(&client, &handle1); // We must evict the small page to fit the large one. ASSERT_OK(pool.CreatePage(&client, 2 * TEST_BUFFER_LEN, &handle2)); ASSERT_TRUE(IsEvicted(&handle1)); // We must evict the large page to fit the small one. pool.Unpin(&client, &handle2); ASSERT_OK(pool.Pin(&client, &handle1)); ASSERT_TRUE(IsEvicted(&handle2)); pool.DestroyPage(&client, &handle1); pool.DestroyPage(&client, &handle2); pool.DeregisterClient(&client); } /// Test simple case where pool must evict a page from a one client to fit another one in /// memory. TEST_F(BufferPoolTest, EvictPageDifferentClient) { const int NUM_CLIENTS = 2; const int64_t TOTAL_BYTES = NUM_CLIENTS * TEST_BUFFER_LEN; global_reservations_.InitRootTracker(NULL, TOTAL_BYTES); BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_BYTES, TOTAL_BYTES); BufferPool::ClientHandle clients[NUM_CLIENTS]; for (int i = 0; i < NUM_CLIENTS; ++i) { ASSERT_OK(pool.RegisterClient(Substitute("test client $0", i), NewFileGroup(), &global_reservations_, NULL, TEST_BUFFER_LEN, NewProfile(), &clients[i])); ASSERT_TRUE(clients[i].IncreaseReservation(TEST_BUFFER_LEN)); } // Create a pinned and unpinned page for the first client. PageHandle handle1, handle2; const BufferHandle* page_buffer; ASSERT_OK(pool.CreatePage(&clients[0], TEST_BUFFER_LEN, &handle1, &page_buffer)); const uint8_t TEST_VAL = 123; memset( page_buffer->data(), TEST_VAL, handle1.len()); // Fill page with an arbitrary value. pool.Unpin(&clients[0], &handle1); ASSERT_OK(pool.CreatePage(&clients[0], TEST_BUFFER_LEN, &handle2)); // Allocating a buffer for the second client requires evicting the unpinned page. BufferHandle buffer; ASSERT_OK(pool.AllocateBuffer(&clients[1], TEST_BUFFER_LEN, &buffer)); ASSERT_TRUE(IsEvicted(&handle1)); // Test reading back the first page, which requires swapping buffers again. pool.Unpin(&clients[0], &handle2); ASSERT_OK(pool.Pin(&clients[0], &handle1)); ASSERT_TRUE(IsEvicted(&handle2)); ASSERT_OK(handle1.GetBuffer(&page_buffer)); for (int i = 0; i < handle1.len(); ++i) { EXPECT_EQ(TEST_VAL, page_buffer->data()[i]) << i; } // Clean up everything. pool.DestroyPage(&clients[0], &handle1); pool.DestroyPage(&clients[0], &handle2); pool.FreeBuffer(&clients[1], &buffer); for (BufferPool::ClientHandle& client : clients) pool.DeregisterClient(&client); } /// Regression test for IMPALA-5113 where the page flushing invariant didn't correctly /// take multiply pinned pages into account. TEST_F(BufferPoolTest, MultiplyPinnedPageAccounting) { const int NUM_BUFFERS = 3; const int64_t TOTAL_BYTES = NUM_BUFFERS * TEST_BUFFER_LEN; global_reservations_.InitRootTracker(NULL, TOTAL_BYTES); BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_BYTES, TOTAL_BYTES); BufferPool::ClientHandle client; RuntimeProfile* profile = NewProfile(); ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, NULL, TOTAL_BYTES, profile, &client)); ASSERT_TRUE(client.IncreaseReservation(TOTAL_BYTES)); BufferPool::PageHandle handle1, handle2; BufferPool::BufferHandle buffer; ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1)); ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2)); pool.Unpin(&client, &handle1); ASSERT_OK(pool.Pin(&client, &handle2)); ASSERT_OK(pool.AllocateBuffer(&client, TEST_BUFFER_LEN, &buffer)); // We shouldn't need to flush anything to disk since we have only three pages/buffers in // memory. Rely on DCHECKs to check invariants and check we didn't evict the page. EXPECT_FALSE(IsEvicted(&handle1)) << handle1.DebugString(); pool.DestroyPage(&client, &handle1); pool.DestroyPage(&client, &handle2); pool.FreeBuffer(&client, &buffer); pool.DeregisterClient(&client); } // Constants for TestMemoryReclamation(). const int MEM_RECLAMATION_NUM_CLIENTS = 2; // Choose a non-power-of two so that AllocateBuffers() will allocate a mix of sizes: // 32 + 32 + 32 + 8 + 4 + 2 + 1 const int64_t MEM_RECLAMATION_BUFFERS_PER_CLIENT = 127; const int64_t MEM_RECLAMATION_CLIENT_RESERVATION = BufferPoolTest::TEST_BUFFER_LEN * MEM_RECLAMATION_BUFFERS_PER_CLIENT; const int64_t MEM_RECLAMATION_TOTAL_BYTES = MEM_RECLAMATION_NUM_CLIENTS * MEM_RECLAMATION_CLIENT_RESERVATION; // Test that we can reclaim buffers and pages from the same arena and from other arenas. TEST_F(BufferPoolTest, MemoryReclamation) { global_reservations_.InitRootTracker(NULL, MEM_RECLAMATION_TOTAL_BYTES); BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, MEM_RECLAMATION_TOTAL_BYTES, MEM_RECLAMATION_TOTAL_BYTES); // Assume that all cores are online. Test various combinations of cores to validate // that it can reclaim from any other other core. for (int src = 0; src < CpuInfo::num_cores(); ++src) { // Limit the max scavenge attempts to force use of the "locked" scavenging sometimes, // which would otherwise only be triggered by racing threads. SetMaxScavengeAttempts(&pool, 1 + src % 3); for (int j = 0; j < 4; ++j) { int dst = (src + j) % CpuInfo::num_cores(); TestMemoryReclamation(&pool, src, dst); } // Test with one fixed and the other randomly changing TestMemoryReclamation(&pool, src, -1); TestMemoryReclamation(&pool, -1, src); } // Test with both src and dst randomly changing. TestMemoryReclamation(&pool, -1, -1); global_reservations_.Close(); } // Test that we can reclaim buffers and pages from the same arena or a different arena. // Allocates then frees memory on 'src_core' then allocates on 'dst_core' to force // reclamation of memory from src_core's free buffer lists and clean page lists. // If 'src_core' or 'dst_core' is -1, randomly switch between cores instead of sticking // to a fixed core. void BufferPoolTest::TestMemoryReclamation(BufferPool* pool, int src_core, int dst_core) { LOG(INFO) << "TestMemoryReclamation " << src_core << " -> " << dst_core; const bool rand_src_core = src_core == -1; const bool rand_dst_core = dst_core == -1; BufferPool::ClientHandle clients[MEM_RECLAMATION_NUM_CLIENTS]; for (int i = 0; i < MEM_RECLAMATION_NUM_CLIENTS; ++i) { ASSERT_OK(pool->RegisterClient(Substitute("test client $0", i), NewFileGroup(), &global_reservations_, NULL, MEM_RECLAMATION_CLIENT_RESERVATION, NewProfile(), &clients[i])); ASSERT_TRUE(clients[i].IncreaseReservation(MEM_RECLAMATION_CLIENT_RESERVATION)); } // Allocate and free the whole pool's buffers on src_core to populate its free lists. if (!rand_src_core) CpuTestUtil::PinToCore(src_core); vector<BufferPool::BufferHandle> client_buffers[MEM_RECLAMATION_NUM_CLIENTS]; AllocateBuffers(pool, &clients[0], 32 * TEST_BUFFER_LEN, MEM_RECLAMATION_CLIENT_RESERVATION, &client_buffers[0], rand_src_core); AllocateBuffers(pool, &clients[1], 32 * TEST_BUFFER_LEN, MEM_RECLAMATION_CLIENT_RESERVATION, &client_buffers[1], rand_src_core); FreeBuffers(pool, &clients[0], &client_buffers[0], rand_src_core); FreeBuffers(pool, &clients[1], &client_buffers[1], rand_src_core); // Allocate buffers again on dst_core. Make sure the size is bigger, smaller, and the // same size as buffers we allocated earlier to we exercise different code paths. if (!rand_dst_core) CpuTestUtil::PinToCore(dst_core); AllocateBuffers(pool, &clients[0], 4 * TEST_BUFFER_LEN, MEM_RECLAMATION_CLIENT_RESERVATION, &client_buffers[0], rand_dst_core); FreeBuffers(pool, &clients[0], &client_buffers[0], rand_dst_core); // Allocate and unpin the whole pool's buffers as clean pages on src_core to populate // its clean page lists. if (!rand_src_core) CpuTestUtil::PinToCore(src_core); vector<BufferPool::PageHandle> client_pages[MEM_RECLAMATION_NUM_CLIENTS]; CreatePages(pool, &clients[0], 32 * TEST_BUFFER_LEN, MEM_RECLAMATION_CLIENT_RESERVATION, &client_pages[0], rand_src_core); CreatePages(pool, &clients[1], 32 * TEST_BUFFER_LEN, MEM_RECLAMATION_CLIENT_RESERVATION, &client_pages[1], rand_src_core); for (auto& page : client_pages[0]) pool->Unpin(&clients[0], &page); for (auto& page : client_pages[1]) pool->Unpin(&clients[1], &page); // Allocate the buffers again to force reclamation of the buffers from the clean pages. if (!rand_dst_core) CpuTestUtil::PinToCore(dst_core); AllocateBuffers(pool, &clients[0], 4 * TEST_BUFFER_LEN, MEM_RECLAMATION_CLIENT_RESERVATION, &client_buffers[0], rand_dst_core); FreeBuffers(pool, &clients[0], &client_buffers[0]); // Just for good measure, pin the pages again then destroy them. for (auto& page : client_pages[0]) { ASSERT_OK(pool->Pin(&clients[0], &page)); pool->DestroyPage(&clients[0], &page); } for (auto& page : client_pages[1]) { ASSERT_OK(pool->Pin(&clients[1], &page)); pool->DestroyPage(&clients[1], &page); } for (BufferPool::ClientHandle& client : clients) pool->DeregisterClient(&client); } // Test the eviction policy of the buffer pool. Writes are issued eagerly as pages // are unpinned, but pages are only evicted from memory when another buffer is // allocated. TEST_F(BufferPoolTest, EvictionPolicy) { TestEvictionPolicy(TEST_BUFFER_LEN); TestEvictionPolicy(2 * 1024 * 1024); } void BufferPoolTest::TestEvictionPolicy(int64_t page_size) { // The eviction policy changes if there are multiple NUMA nodes, because buffers from // clean pages on the local node are claimed in preference to free buffers on the // non-local node. The rest of the test assumes that it executes on a single NUMA node. if (CpuInfo::GetMaxNumNumaNodes() > 1) CpuTestUtil::PinToCore(0); const int MAX_NUM_BUFFERS = 5; int64_t total_mem = MAX_NUM_BUFFERS * page_size; global_reservations_.InitRootTracker(NewProfile(), total_mem); MetricGroup tmp_metrics("test-metrics"); BufferPool pool(&tmp_metrics, TEST_BUFFER_LEN, total_mem, total_mem); ClientHandle client; RuntimeProfile* profile = NewProfile(); ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, nullptr, total_mem, profile, &client)); ASSERT_TRUE(client.IncreaseReservation(total_mem)); RuntimeProfileBase* buffer_pool_profile = nullptr; vector<RuntimeProfileBase*> profile_children; profile->GetChildren(&profile_children); for (RuntimeProfileBase* child : profile_children) { if (child->name() == "Buffer pool") { buffer_pool_profile = child; break; } } ASSERT_TRUE(buffer_pool_profile != nullptr); RuntimeProfile::Counter* cumulative_bytes_alloced = buffer_pool_profile->GetCounter("CumulativeAllocationBytes"); RuntimeProfile::Counter* write_ios = buffer_pool_profile->GetCounter("WriteIoOps"); RuntimeProfile::Counter* read_ios = buffer_pool_profile->GetCounter("ReadIoOps"); vector<PageHandle> pages; CreatePages(&pool, &client, page_size, total_mem, &pages); WriteData(pages, 0); // Unpin pages. Writes should be started and memory should not be deallocated. EXPECT_EQ(total_mem, cumulative_bytes_alloced->value()); EXPECT_EQ(total_mem, pool.GetSystemBytesAllocated()); UnpinAll(&pool, &client, &pages); ASSERT_GT(write_ios->value(), 0); // Re-pin all the pages and validate their data. This should not require reading the // pages back from disk. ASSERT_OK(PinAll(&pool, &client, &pages)); ASSERT_EQ(0, read_ios->value()); VerifyData(pages, 0); // Unpin all pages. Writes should be started again. int64_t prev_write_ios = write_ios->value(); UnpinAll(&pool, &client, &pages); ASSERT_GT(write_ios->value(), prev_write_ios); // Allocate two more buffers. Two unpinned pages must be evicted to make room. const int NUM_EXTRA_BUFFERS = 2; vector<BufferHandle> extra_buffers; AllocateBuffers( &pool, &client, page_size, page_size * NUM_EXTRA_BUFFERS, &extra_buffers); // At least two unpinned pages should have been written out. ASSERT_GE(write_ios->value(), prev_write_ios + NUM_EXTRA_BUFFERS); // No additional memory should have been allocated - it should have been recycled. EXPECT_EQ(total_mem, pool.GetSystemBytesAllocated()); // Check that two pages were evicted. EXPECT_EQ(NUM_EXTRA_BUFFERS, NumEvicted(pages)); // Free up memory required to pin the original pages again. FreeBuffers(&pool, &client, &extra_buffers); ASSERT_OK(PinAll(&pool, &client, &pages)); // We only needed read to back the two evicted pages. Make sure we didn't do extra I/O. ASSERT_EQ(NUM_EXTRA_BUFFERS, read_ios->value()); VerifyData(pages, 0); DestroyAll(&pool, &client, &pages); pool.DeregisterClient(&client); global_reservations_.Close(); } /// Test that we can destroy pages while a disk write is in flight for those pages. TEST_F(BufferPoolTest, DestroyDuringWrite) { const int TRIALS = 20; const int MAX_NUM_BUFFERS = 5; const int64_t TOTAL_MEM = TEST_BUFFER_LEN * MAX_NUM_BUFFERS; BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM); global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM); ClientHandle client; for (int trial = 0; trial < TRIALS; ++trial) { ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, nullptr, TOTAL_MEM, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM)); vector<PageHandle> pages; CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages); // Unpin will initiate writes. UnpinAll(&pool, &client, &pages); // Writes should still be in flight when pages are deleted. DestroyAll(&pool, &client, &pages); pool.DeregisterClient(&client); } } /// Test teardown of a query while writes are in flight. This was based on a /// BufferedBlockMgr regression test for IMPALA-2252 where tear-down of the /// query's RuntimeStates raced with scratch writes. If write_error is true, /// force writes to hit errors. void BufferPoolTest::TestQueryTeardown(bool write_error) { const int64_t TOTAL_BUFFERS = 20; const int CLIENT_BUFFERS = 10; const int64_t TOTAL_MEM = TOTAL_BUFFERS * TEST_BUFFER_LEN; const int64_t CLIENT_MEM = CLIENT_BUFFERS * TEST_BUFFER_LEN; // Set up a BufferPool in the TestEnv. test_env_.reset(new TestEnv()); test_env_->SetBufferPoolArgs(TEST_BUFFER_LEN, TOTAL_MEM); ASSERT_OK(test_env_->Init()); BufferPool* pool = test_env_->exec_env()->buffer_pool(); RuntimeState* state; ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &state)); ClientHandle client; ASSERT_OK(pool->RegisterClient("test client", state->query_state()->file_group(), state->instance_buffer_reservation(), obj_pool_.Add(new MemTracker(-1, "", state->instance_mem_tracker())), CLIENT_MEM, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservation(CLIENT_MEM)); vector<PageHandle> pages; CreatePages(pool, &client, TEST_BUFFER_LEN, CLIENT_BUFFERS, &pages); if (write_error) { UnpinAll(pool, &client, &pages); // Allocate more buffers to create memory pressure and force eviction of all the // unpinned pages. vector<BufferHandle> tmp_buffers; AllocateBuffers(pool, &client, TEST_BUFFER_LEN, CLIENT_BUFFERS, &tmp_buffers); string tmp_file_path = TmpFilePath(pages.data()); FreeBuffers(pool, &client, &tmp_buffers); ASSERT_OK(PinAll(pool, &client, &pages)); // Remove temporary file to force future writes to that file to fail. DisableBackingFile(tmp_file_path); } // Unpin will initiate writes. If we triggered a write error earlier, some writes may // go down the error path. UnpinAll(pool, &client, &pages); // Tear down the pages, client, and query in the correct order while writes are in // flight. DestroyAll(pool, &client, &pages); pool->DeregisterClient(&client); test_env_->TearDownQueries(); // All memory should be released from the query. EXPECT_EQ(0, test_env_->TotalQueryMemoryConsumption()); EXPECT_EQ(0, test_env_->exec_env()->buffer_reservation()->GetChildReservations()); } TEST_F(BufferPoolTest, QueryTeardown) { TestQueryTeardown(false); } TEST_F(BufferPoolTest, QueryTeardownWriteError) { TestQueryTeardown(true); } // Test that the buffer pool handles a write error correctly. Delete the scratch // directory before an operation that would cause a write and test that subsequent API // calls return errors as expected. void BufferPoolTest::TestWriteError( int write_delay_ms, const string& compression, bool punch_holes) { InitTmpFileMgr(1, compression, punch_holes); int MAX_NUM_BUFFERS = 2; int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN; BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM); global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM); ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, nullptr, TOTAL_MEM, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM)); client.impl_->set_debug_write_delay_ms(write_delay_ms); vector<PageHandle> pages; CreatePages(&pool, &client, TEST_BUFFER_LEN, MAX_NUM_BUFFERS, &pages); // Unpin two pages here, to ensure that backing storage is allocated in tmp file. UnpinAll(&pool, &client, &pages); WaitForAllWrites(&client); // Repin the pages ASSERT_OK(PinAll(&pool, &client, &pages)); // Remove permissions to the backing storage so that future writes will fail ASSERT_GT(RemoveScratchPerms(test_env_->tmp_file_mgr()->GetTmpDirPath(0)), 0); // Give the first write a chance to fail before the second write starts. const int INTERVAL_MS = 10; UnpinAll(&pool, &client, &pages, INTERVAL_MS); WaitForAllWrites(&client); // Subsequent calls to APIs that require allocating memory should fail: the write error // is picked up asynchronously. BufferHandle tmp_buffer; PageHandle tmp_page; Status error = pool.AllocateBuffer(&client, TEST_BUFFER_LEN, &tmp_buffer); EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, error.code()); ASSERT_NE(string::npos, error.msg().msg().find(GetBackendString())); EXPECT_FALSE(tmp_buffer.is_open()); error = pool.CreatePage(&client, TEST_BUFFER_LEN, &tmp_page); EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, error.code()); EXPECT_FALSE(tmp_page.is_open()); error = pool.Pin(&client, pages.data()); EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, error.code()); EXPECT_FALSE(pages[0].is_pinned()); // Transferring reservation does not result in an error. bool transferred; EXPECT_OK( client.TransferReservationTo(&global_reservations_, TEST_BUFFER_LEN, &transferred)); EXPECT_TRUE(transferred); DestroyAll(&pool, &client, &pages); pool.DeregisterClient(&client); global_reservations_.Close(); } TEST_F(BufferPoolTest, WriteError) { TestWriteError(0, "", false); } TEST_F(BufferPoolTest, WriteErrorCompression) { TestWriteError(0, "snappy", true); } // Regression test for IMPALA-4842 - inject a delay in the write to // reproduce the issue. TEST_F(BufferPoolTest, WriteErrorWriteDelay) { TestWriteError(100, "", false); } TEST_F(BufferPoolTest, WriteErrorDelayCompression) { TestWriteError(100, "gzip", true); } // Test error handling when temporary file space cannot be allocated to back an unpinned // page. TEST_F(BufferPoolTest, TmpFileAllocateError) { TestTmpFileAllocateError("", false); } TEST_F(BufferPoolTest, TmpFileAllocateErrorCompression) { TestTmpFileAllocateError("lz4", true); } void BufferPoolTest::TestTmpFileAllocateError( const string& compression, bool punch_holes) { const int MAX_NUM_BUFFERS = 2; const int64_t TOTAL_MEM = TEST_BUFFER_LEN * MAX_NUM_BUFFERS; BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM); global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM); ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, nullptr, TOTAL_MEM, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM)); vector<PageHandle> pages; CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages); // Unpin a page, which will trigger a write. pool.Unpin(&client, pages.data()); WaitForAllWrites(&client); // Remove permissions to the temporary files - subsequent operations will fail. ASSERT_GT(RemoveScratchPerms(test_env_->tmp_file_mgr()->GetTmpDirPath(0)), 0); // The write error will happen asynchronously. pool.Unpin(&client, &pages[1]); // Write failure causes future operations like Pin() to fail. WaitForAllWrites(&client); Status error = pool.Pin(&client, pages.data()); EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, error.code()); EXPECT_FALSE(pages[0].is_pinned()); DestroyAll(&pool, &client, &pages); pool.DeregisterClient(&client); } TEST_F(BufferPoolTest, WriteErrorBlacklist) { TestWriteErrorBlacklist("", false); } TEST_F(BufferPoolTest, WriteErrorBlacklistHolepunch) { TestWriteErrorBlacklist("", true); } TEST_F(BufferPoolTest, WriteErrorBlacklistCompression) { TestWriteErrorBlacklist("lz4", true); } // Test that scratch devices are blacklisted after a write error. The query that // encountered the write error should not allocate more pages on that device, but // existing pages on the device will remain in use and future queries will use the device. void BufferPoolTest::TestWriteErrorBlacklist( const string& compression, bool punch_holes) { // Set up two file groups with two temporary dirs. vector<string> tmp_dirs = InitTmpFileMgr(2, compression, punch_holes); // Simulate two concurrent queries. const int TOTAL_QUERIES = 3; const int INITIAL_QUERIES = 2; const int MAX_NUM_PAGES = 6; const int PAGES_PER_QUERY = MAX_NUM_PAGES / TOTAL_QUERIES; const int64_t TOTAL_MEM = MAX_NUM_PAGES * TEST_BUFFER_LEN; const int64_t MEM_PER_QUERY = PAGES_PER_QUERY * TEST_BUFFER_LEN; BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM); global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM); vector<TmpFileGroup*> file_groups; vector<ClientHandle> clients(TOTAL_QUERIES); for (int i = 0; i < INITIAL_QUERIES; ++i) { file_groups.push_back(NewFileGroup()); ASSERT_OK(pool.RegisterClient("test client", file_groups[i], &global_reservations_, nullptr, MEM_PER_QUERY, NewProfile(), &clients[i])); ASSERT_TRUE(clients[i].IncreaseReservation(MEM_PER_QUERY)); } // Allocate files for all 2x2 combinations by unpinning pages. vector<vector<PageHandle>> pages(TOTAL_QUERIES); for (int i = 0; i < INITIAL_QUERIES; ++i) { CreatePages(&pool, &clients[i], TEST_BUFFER_LEN, MEM_PER_QUERY, &pages[i]); WriteData(pages[i], 0); UnpinAll(&pool, &clients[i], &pages[i]); for (int j = 0; j < PAGES_PER_QUERY; ++j) { LOG(INFO) << "Manager " << i << " Block " << j << " backed by file " << TmpFilePath(&pages[i][j]); } } for (int i = 0; i < INITIAL_QUERIES; ++i) WaitForAllWrites(&clients[i]); const int ERROR_QUERY = 0; const int NO_ERROR_QUERY = 1; const string& error_dir = tmp_dirs[0]; const string& good_dir = tmp_dirs[1]; // Delete one file from first scratch dir for first query to trigger an error. PageHandle* error_page = FindPageInDir(pages[ERROR_QUERY], error_dir); ASSERT_TRUE(error_page != NULL) << TmpFilePaths(pages[ERROR_QUERY]) << " not in " << DumpScratchDir(error_dir); const string& error_file_path = TmpFilePath(error_page); for (int i = 0; i < INITIAL_QUERIES; ++i) { ASSERT_OK(PinAll(&pool, &clients[i], &pages[i])); } DisableBackingFile(error_file_path); for (int i = 0; i < INITIAL_QUERIES; ++i) UnpinAll(&pool, &clients[i], &pages[i]); // At least one write should hit an error, but it should be recoverable. for (int i = 0; i < INITIAL_QUERIES; ++i) WaitForAllWrites(&clients[i]); // Both clients should still be usable - test the API. for (int i = 0; i < INITIAL_QUERIES; ++i) { ASSERT_OK(PinAll(&pool, &clients[i], &pages[i])); VerifyData(pages[i], 0); UnpinAll(&pool, &clients[i], &pages[i]); ASSERT_OK(AllocateAndFree(&pool, &clients[i], TEST_BUFFER_LEN)); } // Temporary device with error should still be active. vector<TmpFileMgr::DeviceId> active_tmp_devices = test_env_->tmp_file_mgr()->ActiveTmpDevices(); ASSERT_EQ(tmp_dirs.size(), active_tmp_devices.size()); for (int i = 0; i < active_tmp_devices.size(); ++i) { const string& device_path = test_env_->tmp_file_mgr()->GetTmpDirPath(active_tmp_devices[i]); ASSERT_EQ(string::npos, error_dir.find(device_path)); } // The query that hit the error should only allocate from the device that had no error. // The other one should continue using both devices, since it didn't encounter a write // error itself. vector<PageHandle> error_new_pages; CreatePages( &pool, &clients[ERROR_QUERY], TEST_BUFFER_LEN, MEM_PER_QUERY, &error_new_pages); UnpinAll(&pool, &clients[ERROR_QUERY], &error_new_pages); WaitForAllWrites(&clients[ERROR_QUERY]); EXPECT_TRUE(FindPageInDir(error_new_pages, good_dir) != NULL) << TmpFilePaths(error_new_pages) << " not in " << DumpScratchDir(good_dir); EXPECT_TRUE(FindPageInDir(error_new_pages, error_dir) == NULL) << TmpFilePaths(error_new_pages) << " in " << DumpScratchDir(error_dir); for (PageHandle& error_new_page : error_new_pages) { LOG(INFO) << "Newly created page backed by file " << TmpFilePath(&error_new_page); EXPECT_TRUE(PageInDir(&error_new_page, good_dir)) << TmpFilePath(&error_new_page) << " not in " << DumpScratchDir(good_dir); } DestroyAll(&pool, &clients[ERROR_QUERY], &error_new_pages); ASSERT_OK(PinAll(&pool, &clients[NO_ERROR_QUERY], &pages[NO_ERROR_QUERY])); // IMPALA-10216: Verify data to force Pin to complete before unpinning. VerifyData(pages[NO_ERROR_QUERY], 0); UnpinAll(&pool, &clients[NO_ERROR_QUERY], &pages[NO_ERROR_QUERY]); WaitForAllWrites(&clients[NO_ERROR_QUERY]); EXPECT_TRUE(FindPageInDir(pages[NO_ERROR_QUERY], good_dir) != NULL) << TmpFilePaths(pages[NO_ERROR_QUERY]) << " not in " << DumpScratchDir(good_dir); EXPECT_TRUE(FindPageInDir(pages[NO_ERROR_QUERY], error_dir) != NULL) << TmpFilePaths(pages[NO_ERROR_QUERY]) << " not in " << DumpScratchDir(error_dir); // The second client should use the bad directory for new pages since // blacklisting is per-query, not global. vector<PageHandle> no_error_new_pages; CreatePages(&pool, &clients[NO_ERROR_QUERY], TEST_BUFFER_LEN, MEM_PER_QUERY, &no_error_new_pages); UnpinAll(&pool, &clients[NO_ERROR_QUERY], &no_error_new_pages); WaitForAllWrites(&clients[NO_ERROR_QUERY]); EXPECT_TRUE(FindPageInDir(no_error_new_pages, good_dir) != NULL) << TmpFilePaths(no_error_new_pages) << " not in " << DumpScratchDir(good_dir); EXPECT_TRUE(FindPageInDir(no_error_new_pages, error_dir) != NULL) << TmpFilePaths(no_error_new_pages) << " not in " << DumpScratchDir(error_dir); DestroyAll(&pool, &clients[NO_ERROR_QUERY], &no_error_new_pages); // A new query should use the both dirs for backing storage. const int NEW_QUERY = 2; ASSERT_OK(pool.RegisterClient("new test client", NewFileGroup(), &global_reservations_, nullptr, MEM_PER_QUERY, NewProfile(), &clients[NEW_QUERY])); ASSERT_TRUE(clients[NEW_QUERY].IncreaseReservation(MEM_PER_QUERY)); CreatePages( &pool, &clients[NEW_QUERY], TEST_BUFFER_LEN, MEM_PER_QUERY, &pages[NEW_QUERY]); UnpinAll(&pool, &clients[NEW_QUERY], &pages[NEW_QUERY]); WaitForAllWrites(&clients[NEW_QUERY]); EXPECT_TRUE(FindPageInDir(pages[NEW_QUERY], good_dir) != NULL) << TmpFilePaths(pages[NEW_QUERY]) << " not in " << DumpScratchDir(good_dir); EXPECT_TRUE(FindPageInDir(pages[NEW_QUERY], error_dir) != NULL) << TmpFilePaths(pages[NEW_QUERY]) << " not in " << DumpScratchDir(error_dir); for (int i = 0; i < TOTAL_QUERIES; ++i) { DestroyAll(&pool, &clients[i], &pages[i]); pool.DeregisterClient(&clients[i]); } } // Test error handling when on-disk data is corrupted and the read fails. TEST_F(BufferPoolTest, ScratchReadError) { // Only allow one buffer in memory. const int64_t TOTAL_MEM = TEST_BUFFER_LEN; BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM); global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM); // Simulate different types of error. enum ErrType { CORRUPT_DATA, // Overwrite real spilled data with bogus data. NO_PERMS, // Remove permissions on the scratch file. TRUNCATE // Truncate the scratch file, destroying spilled data. }; for (ErrType error_type : {CORRUPT_DATA, NO_PERMS, TRUNCATE}) { ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, nullptr, TOTAL_MEM, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM)); PageHandle page; ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page)); // Unpin a page, which will trigger a write. pool.Unpin(&client, &page); WaitForAllWrites(&client); // Force eviction of the page. ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN)); string tmp_file = TmpFilePath(&page); if (error_type == CORRUPT_DATA) { CorruptBackingFile(tmp_file); } else if (error_type == NO_PERMS) { DisableBackingFile(tmp_file); } else { DCHECK_EQ(error_type, TRUNCATE); TruncateBackingFile(tmp_file); } ASSERT_OK(pool.Pin(&client, &page)); // The read is async, so won't bubble up until we block on it with GetBuffer(). const BufferHandle* page_buffer; Status status = page.GetBuffer(&page_buffer); if (error_type == CORRUPT_DATA && !FLAGS_disk_spill_encryption) { // Without encryption we can't detect that the data changed. EXPECT_OK(status); } else { // Otherwise the read should fail. EXPECT_FALSE(status.ok()); } // Should be able to destroy the page, even though we hit an error. pool.DestroyPage(&client, &page); // If the backing file is still enabled, we should still be able to pin and unpin // pages as normal. ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page)); WriteData(page, 1); pool.Unpin(&client, &page); WaitForAllWrites(&client); if (error_type == NO_PERMS) { // The error prevents read/write of scratch files - this will fail. EXPECT_FALSE(pool.Pin(&client, &page).ok()); } else { // The error does not prevent read/write of scratch files. ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN)); ASSERT_OK(pool.Pin(&client, &page)); VerifyData(page, 1); } pool.DestroyPage(&client, &page); pool.DeregisterClient(&client); } } /// Test that the buffer pool fails cleanly when all scratch directories are inaccessible /// at runtime. TEST_F(BufferPoolTest, NoDirsAllocationError) { vector<string> tmp_dirs = InitMultipleTmpDirs(2); int MAX_NUM_BUFFERS = 2; int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN; BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM); global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM); ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, nullptr, TOTAL_MEM, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM)); vector<PageHandle> pages; CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages); for (int i = 0; i < tmp_dirs.size(); ++i) { const string& tmp_scratch_subdir = tmp_dirs[i] + SCRATCH_SUFFIX; chmod(tmp_scratch_subdir.c_str(), 0); } // The error will happen asynchronously. UnpinAll(&pool, &client, &pages); WaitForAllWrites(&client); // Write failure should results in an error getting propagated back to Pin(). for (PageHandle& page : pages) { Status status = pool.Pin(&client, &page); EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, status.code()); } DestroyAll(&pool, &client, &pages); pool.DeregisterClient(&client); } // Test that the buffer pool can still create pages when no scratch is present. TEST_F(BufferPoolTest, NoTmpDirs) { InitMultipleTmpDirs(0); const int MAX_NUM_BUFFERS = 3; const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN; BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM); global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM); ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, nullptr, TOTAL_MEM, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM)); vector<PageHandle> pages; CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages); // Unpinning is allowed by the BufferPool interface but we won't start any writes to // disk because the flushing heuristic does not eagerly start writes when there are no // active scratch devices. UnpinAll(&pool, &client, &pages); WaitForAllWrites(&client); ASSERT_OK(pool.Pin(&client, pages.data())); // Allocating another buffer will force a write, which will fail. BufferHandle tmp_buffer; Status status = pool.AllocateBuffer(&client, TEST_BUFFER_LEN, &tmp_buffer); ASSERT_FALSE(status.ok()); ASSERT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, status.code()) << status.msg().msg(); DestroyAll(&pool, &client, &pages); pool.DeregisterClient(&client); } // Test that the buffer pool can still create pages when spilling is disabled by // setting scratch_limit = 0. TEST_F(BufferPoolTest, ScratchLimitZero) { const int QUERY_BUFFERS = 3; const int64_t TOTAL_MEM = 100 * TEST_BUFFER_LEN; const int64_t QUERY_MEM = QUERY_BUFFERS * TEST_BUFFER_LEN; // Set up a query state with the scratch_limit option in the TestEnv. test_env_.reset(new TestEnv()); test_env_->SetBufferPoolArgs(TEST_BUFFER_LEN, TOTAL_MEM); ASSERT_OK(test_env_->Init()); BufferPool* pool = test_env_->exec_env()->buffer_pool(); RuntimeState* state; TQueryOptions query_options; query_options.scratch_limit = 0; ASSERT_OK(test_env_->CreateQueryState(0, &query_options, &state)); ClientHandle client; ASSERT_OK(pool->RegisterClient("test client", state->query_state()->file_group(), state->instance_buffer_reservation(), obj_pool_.Add(new MemTracker(-1, "", state->instance_mem_tracker())), QUERY_MEM, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservation(QUERY_MEM)); vector<PageHandle> pages; CreatePages(pool, &client, TEST_BUFFER_LEN, QUERY_MEM, &pages); // Spilling is disabled by the QueryState when scratch_limit is 0, so trying to unpin // will cause a DCHECK. IMPALA_ASSERT_DEBUG_DEATH(pool->Unpin(&client, pages.data()), ""); DestroyAll(pool, &client, &pages); pool->DeregisterClient(&client); } TEST_F(BufferPoolTest, SingleRandom) { TestRandomInternalSingle(8 * 1024, true); TestRandomInternalSingle(8 * 1024, false); } TEST_F(BufferPoolTest, Multi2Random) { TestRandomInternalMulti(2, 8 * 1024, true); TestRandomInternalMulti(2, 8 * 1024, false); } TEST_F(BufferPoolTest, Multi4Random) { TestRandomInternalMulti(4, 8 * 1024, true); TestRandomInternalMulti(4, 8 * 1024, false); } TEST_F(BufferPoolTest, Multi8Random) { TestRandomInternalMulti(8, 8 * 1024, true); TestRandomInternalMulti(8, 8 * 1024, false); } // Test with remote scratch space mixed with two local dirs. TEST_F(BufferPoolTest, Multi8RandomSpillToRemoteMix) { // 4M buffer size. InitTmpFileMgrSpillToRemote(2, 8 * 1024, 4 * 1024 * 1024); TestRandomInternalMulti(8, 8 * 1024, true); TestRandomInternalMulti(8, 8 * 1024, false); } // Test with remote scratch space only. TEST_F(BufferPoolTest, Multi8RandomSpillToRemote) { // 4M buffer size. InitTmpFileMgrSpillToRemote(0, -1, 4 * 1024 * 1024); TestRandomInternalMulti(8, 8 * 1024, true); TestRandomInternalMulti(8, 8 * 1024, false); } // Sanity test with hole punching and no compression. // This will be run with and without encryption because those flags are toggled for the // entire test suite. TEST_F(BufferPoolTest, RandomHolePunch) { InitTmpFileMgr(2, "", false); TestRandomInternalSingle(8 * 1024, true); TestRandomInternalMulti(4, 8 * 1024, true); } // Sanity test with compression and hole punching. // This will be run with and without encryption because those flags are toggled for the // entire test suite. TEST_F(BufferPoolTest, RandomCompressionHolePunch) { InitTmpFileMgr(2, "lz4", true); TestRandomInternalSingle(8 * 1024, true); TestRandomInternalMulti(4, 8 * 1024, true); } // Single-threaded execution of the TestRandomInternalImpl. void BufferPoolTest::TestRandomInternalSingle( int64_t min_buffer_len, bool multiple_pins) { const int MAX_NUM_BUFFERS = 200; const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * min_buffer_len; MetricGroup tmp_metrics("test-metrics"); BufferPool pool(&tmp_metrics, min_buffer_len, TOTAL_MEM, TOTAL_MEM); global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM); MemTracker global_tracker(TOTAL_MEM); TestRandomInternalImpl( &pool, NewFileGroup(), &global_tracker, &rng_, SINGLE_THREADED_TID, multiple_pins); global_reservations_.Close(); } // Multi-threaded execution of the TestRandomInternalImpl. void BufferPoolTest::TestRandomInternalMulti( int num_threads, int64_t min_buffer_len, bool multiple_pins) { const int MAX_NUM_BUFFERS_PER_THREAD = 200; const int64_t TOTAL_MEM = num_threads * MAX_NUM_BUFFERS_PER_THREAD * min_buffer_len; MetricGroup tmp_metrics("test-metrics"); BufferPool pool(&tmp_metrics, min_buffer_len, TOTAL_MEM, TOTAL_MEM); global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM); MemTracker global_tracker(TOTAL_MEM); TmpFileGroup* shared_file_group = NewFileGroup(); thread_group workers; vector<mt19937> rngs = RandTestUtil::CreateThreadLocalRngs(num_threads, &rng_); for (int i = 0; i < num_threads; ++i) { workers.add_thread(new thread( [this, &pool, shared_file_group, &global_tracker, &rngs, i, multiple_pins]() { TestRandomInternalImpl( &pool, shared_file_group, &global_tracker, &rngs[i], i, multiple_pins); })); } AtomicInt32 stop_maintenance(0); thread maintenance_thread([&pool, &stop_maintenance]() { while (stop_maintenance.Load() == 0) { pool.Maintenance(); SleepForMs(50); } }); workers.join_all(); stop_maintenance.Add(1); maintenance_thread.join(); global_reservations_.Close(); } /// Randomly issue AllocateBuffer(), FreeBuffer(), CreatePage(), Pin(), Unpin(), and /// DestroyPage() calls. All calls made are legal - error conditions are not expected. /// When executed in single-threaded mode 'tid' should be SINGLE_THREADED_TID. If /// 'multiple_pins' is true, pages can be pinned multiple times (useful to test this /// functionality). Otherwise they are only pinned once (useful to test the case when /// memory is more committed). void BufferPoolTest::TestRandomInternalImpl(BufferPool* pool, TmpFileGroup* file_group, MemTracker* parent_mem_tracker, mt19937* rng, int tid, bool multiple_pins) { // Encrypting and decrypting is expensive - reduce iterations when encryption is on. int num_iterations = FLAGS_disk_spill_encryption ? 5000 : 50000; // All the existing pages and buffers along with the sentinel values written to them. vector<pair<PageHandle, int>> pages; vector<pair<BufferHandle, int>> buffers; /// Pick a power-of-two buffer sizes that are up to 2^4 times the minimum buffer length. uniform_int_distribution<int> buffer_exponent_dist(0, 4); ClientHandle client; ASSERT_OK(pool->RegisterClient(Substitute("$0", tid), file_group, &global_reservations_, obj_pool_.Add(new MemTracker(-1, "", parent_mem_tracker)), 1L << 48, NewProfile(), &client)); for (int i = 0; i < num_iterations; ++i) { if ((i % 10000) == 0) LOG(ERROR) << " Iteration " << i << endl; // Pick an operation. // New page: 15% // Pin a page and block waiting for the result: 20% // Pin a page and let it continue asynchronously: 10% // Unpin a pinned page: 25% (< Pin prob. so that memory consumption increases). // Destroy page: 10% (< New page prob. so that number of pages grows over time). // Allocate buffer: 10% // Free buffer: 9.9% // Switch core that the thread is executing on: 0.1% double p = uniform_real_distribution<double>(0.0, 1.0)(*rng); if (p < 0.15) { // Create a new page. int64_t page_len = pool->min_buffer_len() << (buffer_exponent_dist)(*rng); if (!client.IncreaseReservationToFit(page_len)) continue; PageHandle new_page; ASSERT_OK(pool->CreatePage(&client, page_len, &new_page)); int data = (*rng)(); WriteData(new_page, data); pages.emplace_back(move(new_page), data); } else if (p < 0.45) { // Pin a page asynchronously. if (pages.empty()) continue; int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng); PageHandle* page = &pages[rand_pick].first; if (!client.IncreaseReservationToFit(page->len())) continue; if (!page->is_pinned() || multiple_pins) { ASSERT_OK(pool->Pin(&client, page)); } // Block on the pin and verify data for sync pins. if (p < 0.35) VerifyData(*page, pages[rand_pick].second); } else if (p < 0.70) { // Unpin a pinned page. if (pages.empty()) continue; int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng); PageHandle* page = &pages[rand_pick].first; if (page->is_pinned()) { VerifyData(*page, pages[rand_pick].second); pool->Unpin(&client, page); } } else if (p < 0.80) { // Destroy a page. if (pages.empty()) continue; int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng); auto page_data = move(pages[rand_pick]); if (page_data.first.is_pinned()) VerifyData(page_data.first, page_data.second); pages[rand_pick] = move(pages.back()); pages.pop_back(); pool->DestroyPage(&client, &page_data.first); } else if (p < 0.90) { // Allocate a buffer. Pick a random power-of-two size that is up to 2^4 // times the minimum buffer length. int64_t buffer_len = pool->min_buffer_len() << (buffer_exponent_dist)(*rng); if (!client.IncreaseReservationToFit(buffer_len)) continue; BufferHandle new_buffer; ASSERT_OK(pool->AllocateBuffer(&client, buffer_len, &new_buffer)); int data = (*rng)(); WriteData(new_buffer, data); buffers.emplace_back(move(new_buffer), data); } else if (p < 0.999) { // Free a buffer. if (buffers.empty()) continue; int rand_pick = uniform_int_distribution<int>(0, buffers.size() - 1)(*rng); auto buffer_data = move(buffers[rand_pick]); buffers[rand_pick] = move(buffers.back()); buffers.pop_back(); pool->FreeBuffer(&client, &buffer_data.first); } else { CpuTestUtil::PinToRandomCore(rng); } } // The client needs to delete all its pages. for (auto& page : pages) pool->DestroyPage(&client, &page.first); for (auto& buffer : buffers) pool->FreeBuffer(&client, &buffer.first); pool->DeregisterClient(&client); } /// Test basic SubReservation functionality. TEST_F(BufferPoolTest, SubReservation) { const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 10; global_reservations_.InitRootTracker(NULL, TOTAL_MEM); BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM); BufferPool::ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL, TOTAL_MEM, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservationToFit(TEST_BUFFER_LEN)); BufferPool::SubReservation subreservation(&client); BufferPool::BufferHandle buffer; // Save and check that the reservation moved as expected. client.SaveReservation(&subreservation, TEST_BUFFER_LEN); EXPECT_EQ(0, client.GetUnusedReservation()); EXPECT_EQ(TEST_BUFFER_LEN, subreservation.GetReservation()); // Should not be able to allocate from client since the reservation was moved. IMPALA_ASSERT_DEBUG_DEATH(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN), ""); // Restore and check that the reservation moved as expected. client.RestoreReservation(&subreservation, TEST_BUFFER_LEN); EXPECT_EQ(TEST_BUFFER_LEN, client.GetUnusedReservation()); EXPECT_EQ(0, subreservation.GetReservation()); // Should be able to allocate from the client after restoring. ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN)); EXPECT_EQ(TEST_BUFFER_LEN, client.GetUnusedReservation()); subreservation.Close(); pool.DeregisterClient(&client); } // Check that we can decrease reservation without violating any buffer pool invariants. TEST_F(BufferPoolTest, DecreaseReservation) { const int MAX_NUM_BUFFERS = 4; const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN; global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM); BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM); ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, nullptr, TOTAL_MEM, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM)); vector<PageHandle> pages; CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages); WriteData(pages, 0); // Unpin pages and decrease reservation while the writes are in flight. UnpinAll(&pool, &client, &pages); ASSERT_OK(client.DecreaseReservationTo( numeric_limits<int64_t>::max(), 2 * TEST_BUFFER_LEN)); // Two pages must be clean to stay within reservation EXPECT_GE(pool.GetNumCleanPages(), 2); EXPECT_EQ(2 * TEST_BUFFER_LEN, client.GetReservation()); // Decrease it further after the pages are evicted. WaitForAllWrites(&client); ASSERT_OK(client.DecreaseReservationTo( numeric_limits<int64_t>::max(), TEST_BUFFER_LEN)); EXPECT_GE(pool.GetNumCleanPages(), 3); EXPECT_EQ(TEST_BUFFER_LEN, client.GetReservation()); // Check that we can still use the reservation. ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN)); EXPECT_EQ(1, NumEvicted(pages)); // Check that we can decrease it to zero, with the max decrease applied. const int64_t MAX_DECREASE = 123; ASSERT_OK(client.DecreaseReservationTo(MAX_DECREASE, 0)); EXPECT_EQ(TEST_BUFFER_LEN - MAX_DECREASE, client.GetReservation()); ASSERT_OK(client.DecreaseReservationTo(numeric_limits<int64_t>::max(), 0)); EXPECT_EQ(0, client.GetReservation()); // Test concurrent increases and decreases do not race. All increases go through // and each decrease reduces the reservation by DECREASE_SIZE or less. const int NUM_CONCURRENT_INCREASES = 50; const int NUM_CONCURRENT_DECREASES = 50; const int64_t INCREASE_SIZE = 13; const int64_t DECREASE_SIZE = 7; const int64_t START_RESERVATION = 1000; const int64_t MIN_RESERVATION = 500; ASSERT_TRUE(client.IncreaseReservation(START_RESERVATION)); thread increaser([&] { for (int i = 0; i < NUM_CONCURRENT_INCREASES; ++i) { ASSERT_TRUE(client.IncreaseReservation(INCREASE_SIZE)); SleepForMs(0); } }); for (int i = 0; i < NUM_CONCURRENT_DECREASES; ++i) { ASSERT_OK(client.DecreaseReservationTo(DECREASE_SIZE, MIN_RESERVATION)); } increaser.join(); // All increases and decreased should have been applied. EXPECT_EQ(START_RESERVATION + INCREASE_SIZE * NUM_CONCURRENT_INCREASES - DECREASE_SIZE * NUM_CONCURRENT_DECREASES, client.GetReservation()); ASSERT_OK(client.DecreaseReservationTo(numeric_limits<int64_t>::max(), 0)); EXPECT_EQ(0, client.GetReservation()); DestroyAll(&pool, &client, &pages); pool.DeregisterClient(&client); global_reservations_.Close(); } // Test concurrent operations using the same client and different buffers. TEST_F(BufferPoolTest, ConcurrentBufferOperations) { const int DELETE_THREADS = 2; const int ALLOCATE_THREADS = 2; const int NUM_ALLOCATIONS_PER_THREAD = 128; const int MAX_NUM_BUFFERS = 16; const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN; global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM); BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM); BufferPool::ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", nullptr, &global_reservations_, nullptr, TOTAL_MEM, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservationToFit(TOTAL_MEM)); thread_group allocate_threads; thread_group delete_threads; AtomicInt64 available_reservation(TOTAL_MEM); // Queue of buffers to be deleted, along with the first byte of the data in // the buffer, for validation purposes. BlockingQueue<pair<uint8_t, BufferHandle>> delete_queue(MAX_NUM_BUFFERS); // Allocate threads allocate buffers whenever able and enqueue them. for (int i = 0; i < ALLOCATE_THREADS; ++i) { allocate_threads.add_thread(new thread([&] { for (int j = 0; j < NUM_ALLOCATIONS_PER_THREAD; ++j) { // Try to deduct reservation. while (true) { int64_t val = available_reservation.Load(); if (val >= TEST_BUFFER_LEN && available_reservation.CompareAndSwap(val, val - TEST_BUFFER_LEN)) { break; } } BufferHandle buffer; ASSERT_OK(pool.AllocateBuffer(&client, TEST_BUFFER_LEN, &buffer)); uint8_t first_byte = static_cast<uint8_t>(j % 256); buffer.data()[0] = first_byte; delete_queue.BlockingPut(pair<uint8_t, BufferHandle>(first_byte, move(buffer))); } })); } // Delete threads pull buffers off the queue and free them. for (int i = 0; i < DELETE_THREADS; ++i) { delete_threads.add_thread(new thread([&] { pair<uint8_t, BufferHandle> item; while (delete_queue.BlockingGet(&item)) { ASSERT_EQ(item.first, item.second.data()[0]); pool.FreeBuffer(&client, &item.second); available_reservation.Add(TEST_BUFFER_LEN); } })); } allocate_threads.join_all(); delete_queue.Shutdown(); delete_threads.join_all(); pool.DeregisterClient(&client); global_reservations_.Close(); } // IMPALA-7446: the buffer pool GC hook that's set up in ExecEnv should // free cached buffers. TEST_F(BufferPoolTest, BufferPoolGc) { const int64_t BUFFER_SIZE = 1024L * 1024L * 1024L; // Set up a small buffer pool and process mem limit that fits only a single buffer. // A large buffer size is used so that untracked memory is small relative to the // buffer. test_env_.reset(new TestEnv); test_env_->SetBufferPoolArgs(1024, BUFFER_SIZE); // Make sure we have a process memory tracker that uses TCMalloc metrics to match // GC behaviour of a real impalad and reproduce IMPALA-7446. We need to add some // extra headroom for other allocations and overhead. test_env_->SetProcessMemTrackerArgs(BUFFER_SIZE * 3 / 2, true); ASSERT_OK(test_env_->Init()); BufferPool* buffer_pool = test_env_->exec_env()->buffer_pool(); // Set up a client with unlimited reservation. MemTracker* client_tracker = obj_pool_.Add( new MemTracker(-1, "client", test_env_->exec_env()->process_mem_tracker())); BufferPool::ClientHandle client; ASSERT_OK(buffer_pool->RegisterClient("", nullptr, test_env_->exec_env()->buffer_reservation(), client_tracker, numeric_limits<int>::max(), NewProfile(), &client)); BufferPool::BufferHandle buffer; ASSERT_TRUE(client.IncreaseReservation(BUFFER_SIZE)); // Make sure that buffers/pages were gc'ed and/or recycled. EXPECT_EQ(0, buffer_pool->GetSystemBytesAllocated()); ASSERT_OK(buffer_pool->AllocateBuffer(&client, BUFFER_SIZE, &buffer)); buffer_pool->FreeBuffer(&client, &buffer); ASSERT_OK(client.DecreaseReservationTo(numeric_limits<int64_t>::max(), 0)); // Before IMPALA-7446 was fixed, this reservation increase would fail because the // free buffer counted against the process memory limit. ASSERT_TRUE(client.IncreaseReservation(BUFFER_SIZE)); ASSERT_OK(buffer_pool->AllocateBuffer(&client, BUFFER_SIZE, &buffer)); buffer_pool->FreeBuffer(&client, &buffer); buffer_pool->DeregisterClient(&client); } /// IMPALA-9851: Cap the number of pages that can be printed at /// BufferPool::Client::DebugString(). TEST_F(BufferPoolTest, ShortDebugString) { // Allocate pages more than BufferPool::MAX_PAGE_ITER_DEBUG. int num_pages = 105; int64_t max_page_len = TEST_BUFFER_LEN; int64_t total_mem = num_pages * max_page_len; global_reservations_.InitRootTracker(NULL, total_mem); BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, total_mem, total_mem); BufferPool::ClientHandle client; ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL, total_mem, NewProfile(), &client)); ASSERT_TRUE(client.IncreaseReservation(total_mem)); vector<BufferPool::PageHandle> handles(num_pages); // Create pages of various valid sizes. for (int i = 0; i < num_pages; ++i) { int64_t page_len = TEST_BUFFER_LEN; int64_t used_before = client.GetUsedReservation(); ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i])); ASSERT_TRUE(handles[i].is_open()); ASSERT_TRUE(handles[i].is_pinned()); const BufferHandle* buffer; ASSERT_OK(handles[i].GetBuffer(&buffer)); ASSERT_TRUE(buffer->data() != NULL); ASSERT_EQ(handles[i].len(), page_len); ASSERT_EQ(buffer->len(), page_len); ASSERT_EQ(client.GetUsedReservation(), used_before + page_len); } // Verify that only subset of pages are included in DebugString(). string page_count_substr = Substitute( "$0 out of $1 pinned pages:", BufferPool::MAX_PAGE_ITER_DEBUG, num_pages); string debug_string = client.DebugString(); ASSERT_NE(debug_string.find(page_count_substr), string::npos) << page_count_substr << " not found at BufferPool::Client::DebugString(). " << debug_string; // Close the handles and check memory consumption. for (int i = 0; i < num_pages; ++i) { int64_t used_before = client.GetUsedReservation(); int page_len = handles[i].len(); pool.DestroyPage(&client, &handles[i]); ASSERT_EQ(client.GetUsedReservation(), used_before - page_len); } pool.DeregisterClient(&client); // All the reservations should be released at this point. ASSERT_EQ(global_reservations_.GetReservation(), 0); global_reservations_.Close(); } } // namespace impala int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST); impala::InitFeSupport(); ABORT_IF_ERROR(impala::LlvmCodeGen::InitializeLlvm()); int result = 0; for (bool encryption : {false, true}) { for (bool numa : {false, true}) { if (!numa && encryption) continue; // Not an interesting combination. impala::CpuTestUtil::SetupFakeNuma(numa); FLAGS_disk_spill_encryption = encryption; std::cerr << "+==================================================" << std::endl << "| Running tests with encryption=" << encryption << " numa=" << numa << std::endl << "+==================================================" << std::endl; if (RUN_ALL_TESTS() != 0) result = 1; } } return result; }