common/protobuf/kudu/util/threadpool-test.cc (914 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "kudu/util/threadpool.h" #include <unistd.h> #include <algorithm> #include <atomic> #include <cstdint> #include <functional> #include <iterator> #include <limits> #include <memory> #include <mutex> #include <ostream> #include <string> #include <thread> #include <vector> #include <boost/smart_ptr/shared_ptr.hpp> #include <gflags/gflags_declare.h> #include <glog/logging.h> #include <gtest/gtest.h> #include "kudu/gutil/atomicops.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/gutil/sysinfo.h" #include "kudu/util/barrier.h" #include "kudu/util/countdown_latch.h" #include "kudu/util/locks.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" #include "kudu/util/promise.h" #include "kudu/util/random.h" #include "kudu/util/scoped_cleanup.h" #include "kudu/util/status.h" #include "kudu/util/stopwatch.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" #include "kudu/util/trace.h" using std::atomic; using std::make_shared; using std::shared_ptr; using std::string; using std::thread; using std::unique_ptr; using std::vector; using strings::Substitute; DECLARE_int32(thread_inject_start_latency_ms); namespace 
kudu {

static const char* kDefaultPoolName = "test";

// Common fixture: owns the ThreadPool under test in 'pool_' and offers
// helpers to rebuild it with scenario-specific settings.
class ThreadPoolTest : public KuduTest {
 public:
  virtual void SetUp() override {
    KuduTest::SetUp();
    ASSERT_OK(ThreadPoolBuilder(kDefaultPoolName).Build(&pool_));
  }

  // Replace 'pool_' with a pool built from the given builder.
  Status RebuildPoolWithBuilder(const ThreadPoolBuilder& builder) {
    return builder.Build(&pool_);
  }

  // Replace 'pool_' with a pool bounded by the given thread counts.
  Status RebuildPoolWithMinMax(int min_threads, int max_threads) {
    return ThreadPoolBuilder(kDefaultPoolName)
        .set_min_threads(min_threads)
        .set_max_threads(max_threads)
        .Build(&pool_);
  }

 protected:
  unique_ptr<ThreadPool> pool_;
};

// A pool with no submitted work should open and shut down cleanly.
TEST_F(ThreadPoolTest, TestNoTaskOpenClose) {
  ASSERT_OK(RebuildPoolWithMinMax(4, 4));
  pool_->Shutdown();
}

// Atomically bump '*counter' 'n' times, yielding between increments so that
// concurrently running tasks are likely to interleave.
static void SimpleTaskMethod(int n, Atomic32* counter) {
  for (int remaining = n - 1; remaining >= 0; --remaining) {
    base::subtle::NoBarrier_AtomicIncrement(counter, 1);
    boost::detail::yield(remaining);
  }
}

// Functor-style wrapper around SimpleTaskMethod() with bound arguments.
class SimpleTask {
 public:
  SimpleTask(int n, Atomic32* counter)
      : num_increments_(n),
        counter_(counter) {
  }

  void Run() {
    SimpleTaskMethod(num_increments_, counter_);
  }

 private:
  int num_increments_;
  Atomic32* counter_;
};

// Submit a mix of one-shot lambdas and a reusable task object, then verify
// that the shared counter reflects every submitted increment.
TEST_F(ThreadPoolTest, TestSimpleTasks) {
  ASSERT_OK(RebuildPoolWithMinMax(4, 4));

  Atomic32 total(0);
  SimpleTask fifteen_task(15, &total);

  ASSERT_OK(pool_->Submit([&total]() { SimpleTaskMethod(10, &total); }));
  ASSERT_OK(pool_->Submit([&fifteen_task]() { fifteen_task.Run(); }));
  ASSERT_OK(pool_->Submit([&total]() { SimpleTaskMethod(20, &total); }));
  ASSERT_OK(pool_->Submit([&fifteen_task]() { fifteen_task.Run(); }));
  ASSERT_OK(pool_->Submit([&total]() { SimpleTaskMethod(123, &total); }));
  pool_->Wait();
  ASSERT_EQ(10 + 15 + 20 + 15 + 123, base::subtle::NoBarrier_Load(&total));
}

static void IssueTraceStatement() {
  TRACE("hello from task");
}

// Test that the thread-local trace is propagated to tasks
// submitted to the threadpool.
TEST_F(ThreadPoolTest, TestTracePropagation) {
  ASSERT_OK(RebuildPoolWithMinMax(1, 1));
  scoped_refptr<Trace> t(new Trace);
  {
    // The trace adopted here should follow the task into the worker thread.
    ADOPT_TRACE(t.get());
    ASSERT_OK(pool_->Submit(&IssueTraceStatement));
  }
  pool_->Wait();
  ASSERT_STR_CONTAINS(t->DumpToString(), "hello from task");
}

// Submission after Shutdown() must be rejected with ServiceUnavailable.
TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) {
  ASSERT_OK(RebuildPoolWithMinMax(1, 1));
  pool_->Shutdown();
  Status s = pool_->Submit(&IssueTraceStatement);
  ASSERT_EQ("Service unavailable: The pool has been shut down.", s.ToString());
}

// With min_threads == 0, threads are created on demand and torn down again
// once they sit idle past the idle timeout.
TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
  constexpr int kIdleTimeoutMs = 1;
  ASSERT_OK(RebuildPoolWithBuilder(
      ThreadPoolBuilder(kDefaultPoolName)
      .set_min_threads(0)
      .set_max_threads(3)
      .set_idle_timeout(MonoDelta::FromMilliseconds(kIdleTimeoutMs))));
  // There are no threads to start with.
  ASSERT_EQ(0, pool_->num_threads());
  // We get up to 3 threads when submitting work.
  CountDownLatch latch(1);
  SCOPED_CLEANUP({
    latch.CountDown();
  });
  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
  ASSERT_EQ(2, pool_->num_threads());
  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
  ASSERT_EQ(3, pool_->num_threads());
  // The 4th piece of work gets queued.
  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
  ASSERT_EQ(3, pool_->num_threads());
  // Finish all work
  latch.CountDown();
  pool_->Wait();
  ASSERT_EQ(0, pool_->active_threads_);
  // Wait for more than idle timeout: so threads should be gone since
  // min_threads is set to 0.
  SleepFor(MonoDelta::FromMilliseconds(10 * kIdleTimeoutMs));
  ASSERT_EQ(0, pool_->num_threads());
  ASSERT_EQ(0, pool_->active_threads_);
  pool_->Shutdown();
  ASSERT_EQ(0, pool_->num_threads());
}

TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) {
  // By default a threadpool's max_threads is set to the number of CPUs, so
  // this test submits more tasks than that to ensure that the number of CPUs
  // isn't some kind of upper bound.
  const int kNumCPUs = base::NumCPUs();

  // Build a threadpool with no limit on the maximum number of threads.
  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
                                   .set_max_threads(std::numeric_limits<int>::max())));
  CountDownLatch latch(1);
  auto cleanup_latch = MakeScopedCleanup([&]() {
    latch.CountDown();
  });

  // Submit tokenless tasks. Each should create a new thread.
  for (int i = 0; i < kNumCPUs * 2; i++) {
    ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
  }
  ASSERT_EQ((kNumCPUs * 2), pool_->num_threads());

  // Submit tasks on two tokens. Only two threads should be created.
  unique_ptr<ThreadPoolToken> t1 = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
  unique_ptr<ThreadPoolToken> t2 = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
  for (int i = 0; i < kNumCPUs * 2; i++) {
    ThreadPoolToken* t = (i % 2 == 0) ? t1.get() : t2.get();
    ASSERT_OK(t->Submit([&latch]() { latch.Wait(); }));
  }
  ASSERT_EQ((kNumCPUs * 2) + 2, pool_->num_threads());

  // Submit more tokenless tasks. Each should create a new thread.
  for (int i = 0; i < kNumCPUs; i++) {
    ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
  }
  ASSERT_EQ((kNumCPUs * 3) + 2, pool_->num_threads());

  latch.CountDown();
  pool_->Wait();
}

// Regression test for a bug where a task is submitted exactly
// as a thread is about to exit. Previously this could hang forever.
TEST_F(ThreadPoolTest, TestRace) {
  alarm(60);
  auto cleanup = MakeScopedCleanup([]() {
    alarm(0); // Disable alarm on test exit.
  });
  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
                                   .set_min_threads(0)
                                   .set_max_threads(1)
                                   .set_idle_timeout(MonoDelta::FromMicroseconds(1))));
  for (int i = 0; i < 500; i++) {
    CountDownLatch l(1);
    ASSERT_OK(pool_->Submit([&l]() { l.CountDown(); }));
    l.Wait();
    // Sleeping a different amount in each iteration makes it more likely to hit
    // the bug.
    SleepFor(MonoDelta::FromMicroseconds(i));
  }
}

// With min_threads == 1, one thread survives idle timeouts while the pool
// still grows up to max_threads under load.
TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) {
  constexpr int kIdleTimeoutMs = 1;
  ASSERT_OK(RebuildPoolWithBuilder(
      ThreadPoolBuilder(kDefaultPoolName)
      .set_min_threads(1)
      .set_max_threads(4)
      .set_idle_timeout(MonoDelta::FromMilliseconds(kIdleTimeoutMs))));
  // There is 1 thread to start with.
  ASSERT_EQ(1, pool_->num_threads());
  // We get up to 4 threads when submitting work.
  CountDownLatch latch(1);
  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
  ASSERT_EQ(1, pool_->num_threads());
  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
  ASSERT_EQ(2, pool_->num_threads());
  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
  ASSERT_EQ(3, pool_->num_threads());
  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
  ASSERT_EQ(4, pool_->num_threads());
  // The 5th piece of work gets queued.
  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
  ASSERT_EQ(4, pool_->num_threads());
  // Finish all work
  latch.CountDown();
  pool_->Wait();
  ASSERT_EQ(0, pool_->active_threads_);
  SleepFor(MonoDelta::FromMilliseconds(10 * kIdleTimeoutMs));
  ASSERT_EQ(0, pool_->active_threads_);
  ASSERT_EQ(1, pool_->num_threads());
  pool_->Shutdown();
  ASSERT_EQ(0, pool_->num_threads());
}

// A full queue must cause further submissions to fail fast.
TEST_F(ThreadPoolTest, TestMaxQueueSize) {
  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
                                   .set_min_threads(1)
                                   .set_max_threads(1)
                                   .set_max_queue_size(1)));

  CountDownLatch latch(1);
  // We will be able to submit two tasks: one for max_threads == 1 and one for
  // max_queue_size == 1.
  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
  ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
  Status s = pool_->Submit([&latch]() { latch.Wait(); });
  CHECK(s.IsServiceUnavailable()) << "Expected failure due to queue blowout:" << s.ToString();
  latch.CountDown();
  pool_->Wait();
}

// Test that when we specify a zero-sized queue, the maximum number of threads
// running is used for enforcement.
TEST_F(ThreadPoolTest, TestZeroQueueSize) {
  const int kMaxThreads = 4;
  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
                                   .set_max_queue_size(0)
                                   .set_max_threads(kMaxThreads)));
  CountDownLatch latch(1);
  // Saturate all worker threads; with a zero-sized queue, the next
  // submission has nowhere to go.
  for (int i = 0; i < kMaxThreads; i++) {
    ASSERT_OK(pool_->Submit([&latch]() { latch.Wait(); }));
  }
  Status s = pool_->Submit([&latch]() { latch.Wait(); });
  ASSERT_TRUE(s.IsServiceUnavailable()) << s.ToString();
  ASSERT_STR_CONTAINS(s.ToString(), "Thread pool is at capacity");
  latch.CountDown();
  pool_->Wait();
}

// Regression test for KUDU-2187:
//
// If a threadpool thread is slow to start up, it shouldn't block progress of
// other tasks on the same pool.
TEST_F(ThreadPoolTest, TestSlowThreadStart) {
  // Start a pool of threads from which we'll submit tasks.
  unique_ptr<ThreadPool> submitter_pool;
  ASSERT_OK(ThreadPoolBuilder("submitter")
            .set_min_threads(5)
            .set_max_threads(5)
            .Build(&submitter_pool));

  // Start the actual test pool, which starts with one thread
  // but will start a second one on-demand.
  ASSERT_OK(RebuildPoolWithMinMax(1, 2));
  // Ensure that the second thread will take a long time to start.
  FLAGS_thread_inject_start_latency_ms = 3000;

  // Now submit 10 tasks to the 'submitter' pool, each of which
  // submits a single task to 'pool_'. The 'pool_' task sleeps
  // for 10ms.
  //
  // Because the 'submitter' tasks submit faster than they can be
  // processed on a single thread (due to the sleep), we expect that
  // this will trigger 'pool_' to start up its second worker thread.
  // The thread startup will have some latency injected.
  //
  // We expect that the thread startup will block only one of the
  // tasks in the 'submitter' pool after it submits its task. Other
  // tasks will continue to be processed by the other (already-running)
  // thread on 'pool_'.
  std::atomic<int32_t> total_queue_time_ms(0);
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(submitter_pool->Submit([&]() {
      auto submit_time = MonoTime::Now();
      // Capture 'submit_time' by value: the outer task returns before the
      // inner one runs.
      CHECK_OK(pool_->Submit([&,submit_time]() {
        auto queue_time = MonoTime::Now() - submit_time;
        total_queue_time_ms += queue_time.ToMilliseconds();
        SleepFor(MonoDelta::FromMilliseconds(10));
      }));
    }));
  }
  submitter_pool->Wait();
  pool_->Wait();

  // Since the total amount of work submitted was only 100ms, we expect
  // that the performance would be equivalent to a single-threaded
  // threadpool. So, we expect the total queue time to be approximately
  // 0 + 10 + 20 ... + 80 + 90 = 450ms.
  //
  // If, instead, throughput had been blocked while starting threads,
  // we'd get something closer to 18000ms (3000ms delay * 5 submitter threads).
  ASSERT_GE(total_queue_time_ms, 400);
  ASSERT_LE(total_queue_time_ms, 10000);
}

// Test that setting a promise from another thread yields
// a value on the current thread.
TEST_F(ThreadPoolTest, TestPromises) {
  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
                                   .set_min_threads(1)
                                   .set_max_threads(1)
                                   .set_max_queue_size(1)));
  Promise<int> my_promise;
  ASSERT_OK(pool_->Submit([&my_promise]() { my_promise.Set(5); }));
  ASSERT_EQ(5, my_promise.Get());
}

// Histogram metrics used by TestMetrics below.
METRIC_DEFINE_entity(test_entity);
METRIC_DEFINE_histogram(test_entity, queue_length, "queue length",
                        MetricUnit::kTasks, "queue length",
                        kudu::MetricLevel::kInfo,
                        1000, 1);
METRIC_DEFINE_histogram(test_entity, queue_time, "queue time",
                        MetricUnit::kMicroseconds, "queue time",
                        kudu::MetricLevel::kInfo,
                        1000000, 1);
METRIC_DEFINE_histogram(test_entity, run_time, "run time",
                        MetricUnit::kMicroseconds, "run time",
                        kudu::MetricLevel::kInfo,
                        1000, 1);

// Per-pool and per-token metrics should each count exactly the submissions
// routed through them.
TEST_F(ThreadPoolTest, TestMetrics) {
  MetricRegistry registry;
  vector<ThreadPoolMetrics> all_metrics;
  for (int i = 0; i < 3; i++) {
    scoped_refptr<MetricEntity> entity = METRIC_ENTITY_test_entity.Instantiate(
        &registry, Substitute("test $0", i));
    all_metrics.emplace_back(ThreadPoolMetrics{
        METRIC_queue_length.Instantiate(entity),
        METRIC_queue_time.Instantiate(entity),
        METRIC_run_time.Instantiate(entity)
    });
  }

  // Enable metrics for the thread pool.
  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
                                   .set_min_threads(1)
                                   .set_max_threads(1)
                                   .set_metrics(all_metrics[0])));

  unique_ptr<ThreadPoolToken> t1 = pool_->NewTokenWithMetrics(
      ThreadPool::ExecutionMode::SERIAL, all_metrics[1]);
  unique_ptr<ThreadPoolToken> t2 = pool_->NewTokenWithMetrics(
      ThreadPool::ExecutionMode::SERIAL, all_metrics[2]);

  // Submit once to t1, twice to t2, and three times without a token.
  ASSERT_OK(t1->Submit([](){}));
  ASSERT_OK(t2->Submit([](){}));
  ASSERT_OK(t2->Submit([](){}));
  ASSERT_OK(pool_->Submit([](){}));
  ASSERT_OK(pool_->Submit([](){}));
  ASSERT_OK(pool_->Submit([](){}));
  pool_->Wait();

  // The total counts should reflect the number of submissions to each token.
  ASSERT_EQ(1, all_metrics[1].queue_length_histogram->TotalCount());
  ASSERT_EQ(1, all_metrics[1].queue_time_us_histogram->TotalCount());
  ASSERT_EQ(1, all_metrics[1].run_time_us_histogram->TotalCount());
  ASSERT_EQ(2, all_metrics[2].queue_length_histogram->TotalCount());
  ASSERT_EQ(2, all_metrics[2].queue_time_us_histogram->TotalCount());
  ASSERT_EQ(2, all_metrics[2].run_time_us_histogram->TotalCount());

  // And the counts on the pool-wide metrics should reflect all submissions.
  ASSERT_EQ(6, all_metrics[0].queue_length_histogram->TotalCount());
  ASSERT_EQ(6, all_metrics[0].queue_time_us_histogram->TotalCount());
  ASSERT_EQ(6, all_metrics[0].run_time_us_histogram->TotalCount());
}

// Test scenario to verify the functionality of the QueueLoadMeter.
TEST_F(ThreadPoolTest, QueueLoadMeter) {
  const auto kQueueTimeThresholdMs = 100;
  const auto kIdleThreadTimeoutMs = 200;
  constexpr auto kMaxThreads = 3;
  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
      .set_min_threads(0)
      .set_max_threads(kMaxThreads)
      .set_queue_overload_threshold(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs))
      .set_idle_timeout(MonoDelta::FromMilliseconds(kIdleThreadTimeoutMs))));

  // An idle pool must not have its queue overloaded.
  ASSERT_FALSE(pool_->QueueOverloaded());

  // One instant task cannot make the pool's queue overloaded.
  ASSERT_OK(pool_->Submit([](){}));
  ASSERT_FALSE(pool_->QueueOverloaded());
  pool_->Wait();
  ASSERT_FALSE(pool_->QueueOverloaded());

  // As many long tasks as there are worker threads: nothing queues, so
  // the meter must stay quiet.
  for (auto i = 0; i < kMaxThreads; ++i) {
    ASSERT_OK(pool_->Submit([](){
      SleepFor(MonoDelta::FromMilliseconds(2 * kQueueTimeThresholdMs));
    }));
  }
  ASSERT_FALSE(pool_->QueueOverloaded());
  pool_->Wait();
  ASSERT_FALSE(pool_->QueueOverloaded());

  // Twice as many long tasks as worker threads: the second wave sits in the
  // queue past the threshold, so the meter should trip.
  for (auto i = 0; i < 2 * kMaxThreads; ++i) {
    ASSERT_OK(pool_->Submit([](){
      SleepFor(MonoDelta::FromMilliseconds(2 * kQueueTimeThresholdMs));
    }));
  }
  ASSERT_FALSE(pool_->QueueOverloaded());
  SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs + 10));
  ASSERT_TRUE(pool_->QueueOverloaded());
  // Should still be overloaded after first kMaxThreads tasks are processed.
  SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs + 10));
  ASSERT_TRUE(pool_->QueueOverloaded());
  pool_->Wait();
  ASSERT_FALSE(pool_->QueueOverloaded());

  // Many instant tasks cannot make pool overloaded.
  for (auto i = 0; i < kMaxThreads; ++i) {
    ASSERT_OK(pool_->Submit([](){}));
  }
  ASSERT_FALSE(pool_->QueueOverloaded());
  pool_->Wait();

  // Wait for the threads to be shut down due to inactivity.
  SleepFor(MonoDelta::FromMilliseconds(2 * kIdleThreadTimeoutMs));
  // Even if all threads are shutdown, an idle pool with empty queue should not
  // be overloaded.
  ASSERT_FALSE(pool_->QueueOverloaded());

  // Shovel some light tasks once again: this should not overload the queue.
  for (auto i = 0; i < 10 * kMaxThreads; ++i) {
    ASSERT_OK(pool_->Submit([](){
      SleepFor(MonoDelta::FromMilliseconds(1));
    }));
  }
  ASSERT_FALSE(pool_->QueueOverloaded());
  pool_->Wait();
  ASSERT_FALSE(pool_->QueueOverloaded());

  // Submit a bunch of instant tasks via a single token: the queue should not
  // become overloaded.
  {
    unique_ptr<ThreadPoolToken> t = pool_->NewToken(
        ThreadPool::ExecutionMode::SERIAL);
    ASSERT_OK(t->Submit([](){}));
    ASSERT_FALSE(pool_->QueueOverloaded());
    pool_->Wait();
    ASSERT_FALSE(pool_->QueueOverloaded());
    for (auto i = 0; i < 100; ++i) {
      ASSERT_OK(t->Submit([](){}));
    }
    ASSERT_FALSE(pool_->QueueOverloaded());
    SleepFor(MonoDelta::FromMilliseconds(1));
    ASSERT_FALSE(pool_->QueueOverloaded());
    pool_->Wait();
    ASSERT_FALSE(pool_->QueueOverloaded());
  }

  // Submit many instant tasks via multiple tokens (more than the maximum
  // number of worker threads in a pool) and many lightweight tasks which can
  // run concurrently: the queue should not become overloaded.
  {
    constexpr auto kNumTokens = 2 * kMaxThreads;
    vector<unique_ptr<ThreadPoolToken>> tokens;
    tokens.reserve(kNumTokens);
    for (auto i = 0; i < kNumTokens; ++i) {
      tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
    }
    for (auto& t : tokens) {
      for (auto i = 0; i < 50; ++i) {
        ASSERT_OK(t->Submit([](){}));
      }
      for (auto i = 0; i < 10; ++i) {
        ASSERT_OK(pool_->Submit([](){}));
      }
    }
    ASSERT_FALSE(pool_->QueueOverloaded());
    SleepFor(MonoDelta::FromMilliseconds(1));
    ASSERT_FALSE(pool_->QueueOverloaded());
    pool_->Wait();
    ASSERT_FALSE(pool_->QueueOverloaded());
  }

  // Submit many long running tasks via serial tokens where the number of tokens
  // is less than the maximum number of worker threads in the pool. The queue
  // of the pool should not become overloaded since the pool has one spare
  // thread to spawn.
  {
    constexpr auto kNumTokens = kMaxThreads - 1;
    ASSERT_GT(kNumTokens, 0);
    vector<unique_ptr<ThreadPoolToken>> tokens;
    tokens.reserve(kNumTokens);
    for (auto i = 0; i < kNumTokens; ++i) {
      tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
    }
    ASSERT_FALSE(pool_->QueueOverloaded());
    for (auto& t : tokens) {
      for (auto i = 0; i < kMaxThreads; ++i) {
        ASSERT_OK(t->Submit([](){
          SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
        }));
      }
    }
    ASSERT_FALSE(pool_->QueueOverloaded());
    SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
    ASSERT_FALSE(pool_->QueueOverloaded());
    pool_->Wait();
    ASSERT_FALSE(pool_->QueueOverloaded());
  }

  // Submit many long running tasks via serial tokens where the number of tokens
  // is greater or equal to the maximum number of worker threads in the pool.
  // The queue of the pool should become overloaded since the pool is running
  // at its capacity and queue times are over the threshold.
  {
    constexpr auto kNumTokens = kMaxThreads;
    vector<unique_ptr<ThreadPoolToken>> tokens;
    tokens.reserve(kNumTokens);
    for (auto i = 0; i < kNumTokens; ++i) {
      tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
    }
    ASSERT_FALSE(pool_->QueueOverloaded());
    for (auto& t : tokens) {
      for (auto i = 0; i < kMaxThreads; ++i) {
        ASSERT_OK(t->Submit([](){
          SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
        }));
      }
    }
    // Since there is exactly kMaxThreads serial pool tokens with tasks,
    // the queue is empty most of the time. This is because active serial tokens
    // are not kept in the queue. So, the status of the queue cannot be reliably
    // determined by peeking at the submission times of the elements in the
    // queue. Then the only way to detect overload of the queue is the history
    // of queue times. The latter will reflect long queue times only after
    // processing two tasks in each of the serial tokens. So, it's expected
    // to get a stable report on the queue status only after two
    // kQueueTimeThresholdMs time intervals.
    SleepFor(MonoDelta::FromMilliseconds(2 * kQueueTimeThresholdMs));
    ASSERT_TRUE(pool_->QueueOverloaded());
    pool_->Wait();
    ASSERT_FALSE(pool_->QueueOverloaded());
  }

  // A mixed case: submit many long running tasks via serial tokens where the
  // number of tokens is less than the maximum number of worker threads in the
  // pool and submit many instant tasks that can run concurrently.
  {
    constexpr auto kNumTokens = kMaxThreads - 1;
    ASSERT_GT(kNumTokens, 0);
    vector<unique_ptr<ThreadPoolToken>> tokens;
    tokens.reserve(kNumTokens);
    for (auto i = 0; i < kNumTokens; ++i) {
      tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
    }
    ASSERT_FALSE(pool_->QueueOverloaded());
    for (auto& t : tokens) {
      for (auto i = 0; i < kMaxThreads; ++i) {
        ASSERT_OK(t->Submit([](){
          SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
        }));
      }
    }
    ASSERT_FALSE(pool_->QueueOverloaded());
    // Add several light tasks in addition to the scheduled serial ones. This
    // should not overload the queue.
    for (auto i = 0; i < 10; ++i) {
      ASSERT_OK(pool_->Submit([](){
        SleepFor(MonoDelta::FromMilliseconds(1));
      }));
    }
    ASSERT_FALSE(pool_->QueueOverloaded());
    SleepFor(MonoDelta::FromMilliseconds(1));
    ASSERT_FALSE(pool_->QueueOverloaded());
    SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
    ASSERT_FALSE(pool_->QueueOverloaded());
    pool_->Wait();
    ASSERT_FALSE(pool_->QueueOverloaded());
  }

  // Another mixed case: submit many long running tasks via a serial token
  // and many long running tasks that can run concurrently. The queue should
  // become overloaded once the tasks in the head of the queue is kept there
  // for longer than kQueueTimeThresholdMs time interval.
  {
    constexpr auto kNumTokens = 1;
    vector<unique_ptr<ThreadPoolToken>> tokens;
    tokens.reserve(kNumTokens);
    for (auto i = 0; i < kNumTokens; ++i) {
      tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
    }
    ASSERT_FALSE(pool_->QueueOverloaded());
    for (auto& t : tokens) {
      for (auto i = 0; i < kMaxThreads; ++i) {
        ASSERT_OK(t->Submit([](){
          SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
        }));
      }
    }
    ASSERT_FALSE(pool_->QueueOverloaded());
    // Add the heavy tasks in addition to the scheduled serial ones. The queue
    // should become overloaded after kQueueTimeThresholdMs time interval.
    for (auto i = 0; i < 2 * kMaxThreads; ++i) {
      ASSERT_OK(pool_->Submit([](){
        SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
      }));
    }
    SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
    ASSERT_TRUE(pool_->QueueOverloaded());
    pool_->Wait();
    ASSERT_FALSE(pool_->QueueOverloaded());
  }
}

// A test for various scenarios to assess ThreadPool's performance.
class ThreadPoolPerformanceTest :
    public ThreadPoolTest,
    public testing::WithParamInterface<bool> {
};
INSTANTIATE_TEST_SUITE_P(LoadMeterPresence, ThreadPoolPerformanceTest,
                         ::testing::Values(false, true));

// A scenario to assess ThreadPool's performance in the absence/presence
// of the QueueLoadMeter. The scenario uses a mix of serial and concurrent
// task tokens.
TEST_P(ThreadPoolPerformanceTest, ConcurrentAndSerialTasksMix) {
  SKIP_IF_SLOW_NOT_ALLOWED();

  constexpr auto kNumTasksPerSchedulerThread = 25000;
  const auto kNumCPUs = base::NumCPUs();
  const auto kMaxThreads = std::max(1, kNumCPUs / 2);
  const auto kNumSchedulerThreads = std::max(2, kNumCPUs / 2);
  const auto kNumSerialTokens = kNumSchedulerThreads / 4;
  const auto load_meter_enabled = GetParam();

  ThreadPoolBuilder builder(kDefaultPoolName);
  builder.set_min_threads(kMaxThreads);
  builder.set_max_threads(kMaxThreads);
  if (load_meter_enabled) {
    // The exact value of the queue overload threshold isn't important in this
    // test scenario. With low enough setting and huge number of scheduled
    // tasks, this guarantees that the queue becomes overloaded and all code
    // paths in QueueLoadMeter are covered.
    builder.set_queue_overload_threshold(MonoDelta::FromMilliseconds(1));
  }
  ASSERT_OK(RebuildPoolWithBuilder(builder));

  // NOTE(review): 'tokens' is populated but never submitted to -- each
  // scheduler thread below creates its own token. These look like extra
  // live tokens held for the duration of the test; confirm intent.
  vector<unique_ptr<ThreadPoolToken>> tokens;
  tokens.reserve(kNumSchedulerThreads);
  for (auto i = 0; i < kNumSchedulerThreads; ++i) {
    tokens.emplace_back(pool_->NewToken(
        i < kNumSerialTokens ? ThreadPool::ExecutionMode::SERIAL
                             : ThreadPool::ExecutionMode::CONCURRENT));
  }

  vector<thread> threads;
  threads.reserve(kNumSchedulerThreads);
  // One extra barrier participant: the main thread, which starts the
  // stopwatch as soon as all scheduler threads are ready.
  Barrier b(kNumSchedulerThreads + 1);
  for (auto si = 0; si < kNumSchedulerThreads; ++si) {
    threads.emplace_back([&, si]() {
      unique_ptr<ThreadPoolToken> token(pool_->NewToken(
          (si < kNumSerialTokens) ? ThreadPool::ExecutionMode::SERIAL
                                  : ThreadPool::ExecutionMode::CONCURRENT));
      b.Wait();
      for (auto i = 0; i < kNumTasksPerSchedulerThread; ++i) {
        CHECK_OK(token->Submit([](){}));
      }
    });
  }

  Stopwatch sw(Stopwatch::ALL_THREADS);
  b.Wait();
  sw.start();
  pool_->Wait();
  sw.stop();
  for (auto& t : threads) {
    t.join();
  }

  const auto time_elapsed = sw.elapsed();
  LOG(INFO) << Substitute("Processed $0 tasks in $1",
                          kNumSchedulerThreads * kNumTasksPerSchedulerThread,
                          time_elapsed.ToString());
  LOG(INFO) << Substitute(
      "Processing rate (QueueLoadMeter $0): $1 tasks/sec",
      load_meter_enabled ? " enabled" : "disabled",
      static_cast<double>(kNumSchedulerThreads * kNumTasksPerSchedulerThread) /
          time_elapsed.wall_seconds());
}

// Test that a thread pool will crash if asked to run its own blocking
// functions in a pool thread.
//
// In a multi-threaded application, TSAN is unsafe to use following a fork().
// After a fork(), TSAN will:
// 1. Disable verification, expecting an exec() soon anyway, and
// 2. Die on future thread creation.
// For some reason, this test triggers behavior #2.
// We could disable it with the TSAN option die_after_fork=0, but this can
// (supposedly) lead to deadlocks, so we'll disable the entire test instead.
#ifndef THREAD_SANITIZER
TEST_F(ThreadPoolTest, TestDeadlocks) {
  const char* death_msg = "called pool function that would result in deadlock";
  // Calling Shutdown() from inside a pool task must crash rather than hang.
  ASSERT_DEATH({
    ASSERT_OK(RebuildPoolWithMinMax(1, 1));
    ASSERT_OK(pool_->Submit([this]() {
      this->pool_->Shutdown();
    } ));
    pool_->Wait();
  }, death_msg);

  // Likewise for Wait() called from a pool task.
  ASSERT_DEATH({
    ASSERT_OK(RebuildPoolWithMinMax(1, 1));
    ASSERT_OK(pool_->Submit([this]() {
      this->pool_->Wait();
    } ));
    pool_->Wait();
  }, death_msg);
}
#endif

// Task whose destructor sleeps 100ms, used to check that slow task teardown
// doesn't serialize the queue.
class SlowDestructorRunnable {
 public:
  void Run() {}

  ~SlowDestructorRunnable() {
    SleepFor(MonoDelta::FromMilliseconds(100));
  }
};

// Test that if a task's destructor is slow, it doesn't cause serialization of the tasks
// in the queue.
TEST_F(ThreadPoolTest, TestSlowDestructor) {
  ASSERT_OK(RebuildPoolWithMinMax(1, 20));
  MonoTime start = MonoTime::Now();
  for (int i = 0; i < 100; i++) {
    // In this particular test, it's important that the task's destructor (and
    // thus the last ref of 'task') be dropped by the threadpool worker thread
    // itself, so that the delay is incurred by that thread and not the task
    // submission thread. Without C++14 capture-by-move semantics we have to
    // work a little bit harder to accomplish that.
    auto task = make_shared<SlowDestructorRunnable>();
    auto wrapper = [task]() { task->Run(); };
    task.reset();
    ASSERT_OK(pool_->Submit(std::move(wrapper)));
  }
  pool_->Wait();
  // 100 tasks x 100ms destructors would take >=10s if serialized; with 20
  // threads it should finish well under 5s.
  ASSERT_LT((MonoTime::Now() - start).ToSeconds(), 5);
}

// For test cases that should run with both kinds of tokens.
// Parameterized fixture: runs each TEST_P once per token execution mode
// (SERIAL and CONCURRENT).
class ThreadPoolTestTokenTypes : public ThreadPoolTest,
                                 public testing::WithParamInterface<ThreadPool::ExecutionMode> {};

INSTANTIATE_TEST_SUITE_P(Tokens, ThreadPoolTestTokenTypes,
                         ::testing::Values(ThreadPool::ExecutionMode::SERIAL,
                                           ThreadPool::ExecutionMode::CONCURRENT));

// Basic sanity check: a task submitted via a token runs to completion once
// Wait() on the token returns.
TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitAndWait) {
  unique_ptr<ThreadPoolToken> t = pool_->NewToken(GetParam());
  int i = 0;
  ASSERT_OK(t->Submit([&]() {
    SleepFor(MonoDelta::FromMilliseconds(1));
    i++;
  }));
  t->Wait();
  ASSERT_EQ(1, i);
}

// Verifies that tasks submitted via a SERIAL token execute one at a time, in
// submission order.
TEST_F(ThreadPoolTest, TestTokenSubmitsProcessedSerially) {
  unique_ptr<ThreadPoolToken> t = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
  Random r(SeedRandom());
  string result;
  for (char c = 'a'; c < 'f'; c++) {
    // Sleep a little first so that there's a higher chance of out-of-order
    // appends if the submissions did execute in parallel.
    int sleep_ms = static_cast<int>(r.Uniform(5));
    ASSERT_OK(t->Submit([&result, c, sleep_ms]() {
      SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
      result += c;
    }));
  }
  t->Wait();
  ASSERT_EQ("abcde", result);
}

// Verifies that tasks submitted via different tokens can run concurrently:
// one barrier-blocked task per token, and the test thread's own Wait() on the
// shared barrier only returns if all of them are running at once.
TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitsProcessedConcurrently) {
  const int kNumTokens = 5;
  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
                                       .set_max_threads(kNumTokens)));
  vector<unique_ptr<ThreadPoolToken>> tokens;

  // A violation to the tested invariant would yield a deadlock, so let's set
  // up an alarm to bail us out.
  alarm(60);
  SCOPED_CLEANUP({
    alarm(0); // Disable alarm on test exit.
  });

  auto b = make_shared<Barrier>(kNumTokens + 1);
  for (int i = 0; i < kNumTokens; i++) {
    tokens.emplace_back(pool_->NewToken(GetParam()));
    ASSERT_OK(tokens.back()->Submit([b]() {
      b->Wait();
    }));
  }

  // This will deadlock if the above tasks weren't all running concurrently.
  b->Wait();
}

// Verifies that multiple tasks submitted via a single CONCURRENT token can
// all run at the same time (i.e. the token imposes no serialization).
TEST_F(ThreadPoolTest, TestTokenSubmitsNonSequential) {
  const int kNumSubmissions = 5;
  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
                                       .set_max_threads(kNumSubmissions)));

  // A violation to the tested invariant would yield a deadlock, so let's set
  // up an alarm to bail us out.
  alarm(60);
  SCOPED_CLEANUP({
    alarm(0); // Disable alarm on test exit.
  });

  auto b = make_shared<Barrier>(kNumSubmissions + 1);
  unique_ptr<ThreadPoolToken> t = pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
  for (int i = 0; i < kNumSubmissions; i++) {
    ASSERT_OK(t->Submit([b]() {
      b->Wait();
    }));
  }

  // This will deadlock if the above tasks weren't all running concurrently.
  b->Wait();
}

// Verifies that shutting down one token waits only for that token's tasks,
// and that a shut down token rejects new submissions with ServiceUnavailable
// while other tokens remain usable.
TEST_P(ThreadPoolTestTokenTypes, TestTokenShutdown) {
  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
                                       .set_max_threads(4)));

  unique_ptr<ThreadPoolToken> t1(pool_->NewToken(GetParam()));
  unique_ptr<ThreadPoolToken> t2(pool_->NewToken(GetParam()));
  CountDownLatch l1(1);
  CountDownLatch l2(1);

  // A violation to the tested invariant would yield a deadlock, so let's set
  // up an alarm to bail us out.
  alarm(60);
  SCOPED_CLEANUP({
    alarm(0); // Disable alarm on test exit.
  });

  for (int i = 0; i < 3; i++) {
    ASSERT_OK(t1->Submit([&]() {
      l1.Wait();
    }));
  }
  for (int i = 0; i < 3; i++) {
    ASSERT_OK(t2->Submit([&]() {
      l2.Wait();
    }));
  }

  // Unblock all of t1's tasks, but not t2's tasks.
  l1.CountDown();

  // If this also waited for t2's tasks, it would deadlock.
  t1->Shutdown();

  // We can no longer submit to t1 but we can still submit to t2.
  ASSERT_TRUE(t1->Submit([](){}).IsServiceUnavailable());
  ASSERT_OK(t2->Submit([](){}));

  // Unblock t2's tasks.
  l2.CountDown();
  t2->Shutdown();
}

// Verifies that pool_->Wait() waits for all outstanding tasks regardless of
// whether they were submitted directly to the pool or through a token.
TEST_P(ThreadPoolTestTokenTypes, TestTokenWaitForAll) {
  const int kNumTokens = 3;
  const int kNumSubmissions = 20;
  Random r(SeedRandom());
  vector<unique_ptr<ThreadPoolToken>> tokens;
  for (int i = 0; i < kNumTokens; i++) {
    tokens.emplace_back(pool_->NewToken(GetParam()));
  }

  atomic<int32_t> v(0);
  for (int i = 0; i < kNumSubmissions; i++) {
    // Sleep a little first to raise the likelihood of the test thread
    // reaching Wait() before the submissions finish.
    int sleep_ms = static_cast<int>(r.Uniform(5));
    auto task = [&v, sleep_ms]() {
      SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
      v++;
    };

    // Half of the submissions will be token-less, and half will use a token.
    if (i % 2 == 0) {
      ASSERT_OK(pool_->Submit(task));
    } else {
      int token_idx = static_cast<int>(r.Uniform(tokens.size()));
      ASSERT_OK(tokens[token_idx]->Submit(task));
    }
  }
  pool_->Wait();
  ASSERT_EQ(kNumSubmissions, v);
}

// Randomized stress test interleaving submissions, token allocation,
// token Wait()/Shutdown()/deallocation, and pool-wide waits.
TEST_F(ThreadPoolTest, TestFuzz) {
  const int kNumOperations = 1000;
  Random r(SeedRandom());
  vector<unique_ptr<ThreadPoolToken>> tokens;

  for (int i = 0; i < kNumOperations; i++) {
    // Operation distribution:
    //
    // - Submit without a token: 40%
    // - Submit with a randomly selected token: 35%
    // - Allocate a new token: 10%
    // - Wait on a randomly selected token: 7%
    // - Shutdown a randomly selected token: 4%
    // - Deallocate a randomly selected token: 2%
    // - Wait for all submissions: 2%
    int op = static_cast<int>(r.Uniform(100));
    if (op < 40) {
      // Submit without a token.
      int sleep_ms = static_cast<int>(r.Uniform(5));
      ASSERT_OK(pool_->Submit([sleep_ms]() {
        // Sleep a little first to increase task overlap.
        SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
      }));
    } else if (op < 75) {
      // Submit with a randomly selected token.
      if (tokens.empty()) {
        continue;
      }
      int sleep_ms = static_cast<int>(r.Uniform(5));
      int token_idx = static_cast<int>(r.Uniform(tokens.size()));
      Status s = tokens[token_idx]->Submit([sleep_ms]() {
        // Sleep a little first to increase task overlap.
        SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
      });
      // The token may have been shut down by an earlier random op.
      ASSERT_TRUE(s.ok() || s.IsServiceUnavailable());
    } else if (op < 85) {
      // Allocate a token with a randomly selected policy.
      ThreadPool::ExecutionMode mode = r.OneIn(2) ?
          ThreadPool::ExecutionMode::SERIAL :
          ThreadPool::ExecutionMode::CONCURRENT;
      tokens.emplace_back(pool_->NewToken(mode));
    } else if (op < 92) {
      // Wait on a randomly selected token.
      if (tokens.empty()) {
        continue;
      }
      int token_idx = static_cast<int>(r.Uniform(tokens.size()));
      tokens[token_idx]->Wait();
    } else if (op < 96) {
      // Shutdown a randomly selected token.
      if (tokens.empty()) {
        continue;
      }
      int token_idx = static_cast<int>(r.Uniform(tokens.size()));
      tokens[token_idx]->Shutdown();
    } else if (op < 98) {
      // Deallocate a randomly selected token.
      if (tokens.empty()) {
        continue;
      }
      auto it = tokens.begin();
      int token_idx = static_cast<int>(r.Uniform(tokens.size()));
      std::advance(it, token_idx);
      tokens.erase(it);
    } else {
      // Wait on everything.
      ASSERT_LT(op, 100);
      ASSERT_GE(op, 98);
      pool_->Wait();
    }
  }

  // Some test runs will shut down the pool before the tokens, and some won't.
  // Either way should be safe.
  if (r.OneIn(2)) {
    pool_->Shutdown();
  }
}

// Verifies that the pool's max_queue_size limit applies to token submissions
// just as it does to direct submissions.
TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmissionsAdhereToMaxQueueSize) {
  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
                                       .set_min_threads(1)
                                       .set_max_threads(1)
                                       .set_max_queue_size(1)));

  CountDownLatch latch(1);
  unique_ptr<ThreadPoolToken> t = pool_->NewToken(GetParam());
  // Unblock the submitted tasks on test exit so the pool can shut down.
  SCOPED_CLEANUP({
    latch.CountDown();
  });
  // We will be able to submit two tasks: one for max_threads == 1 and one for
  // max_queue_size == 1.
  ASSERT_OK(t->Submit([&latch]() { latch.Wait(); }));
  ASSERT_OK(t->Submit([&latch]() { latch.Wait(); }));
  Status s = t->Submit([&latch]() { latch.Wait(); });
  ASSERT_TRUE(s.IsServiceUnavailable());
}

// Multi-threaded stress test: threads concurrently cycle (replace), shut
// down, wait on, and submit to a shared set of tokens for a fixed duration.
TEST_F(ThreadPoolTest, TestTokenConcurrency) {
  const int kNumTokens = 20;
  const int kTestRuntimeSecs = 1;
  const int kCycleThreads = 2;
  const int kShutdownThreads = 2;
  const int kWaitThreads = 2;
  const int kSubmitThreads = 8;

  vector<shared_ptr<ThreadPoolToken>> tokens;
  Random rng(SeedRandom());

  // Protects 'tokens' and 'rng'.
  simple_spinlock lock;

  // Fetch a token from 'tokens' at random.
  auto GetRandomToken = [&]() {
    std::lock_guard<simple_spinlock> l(lock);
    int idx = rng.Uniform(kNumTokens);
    return tokens[idx];
  };

  // Preallocate all of the tokens.
  for (int i = 0; i < kNumTokens; i++) {
    ThreadPool::ExecutionMode mode;
    {
      std::lock_guard<simple_spinlock> l(lock);
      mode = rng.OneIn(2) ?
          ThreadPool::ExecutionMode::SERIAL :
          ThreadPool::ExecutionMode::CONCURRENT;
    }
    tokens.emplace_back(pool_->NewToken(mode).release());
  }

  atomic<int64_t> total_num_tokens_cycled(0);
  atomic<int64_t> total_num_tokens_shutdown(0);
  atomic<int64_t> total_num_tokens_waited(0);
  atomic<int64_t> total_num_tokens_submitted(0);

  // Counted down to signal all worker threads to stop.
  CountDownLatch latch(1);
  vector<thread> threads;

  for (int i = 0; i < kCycleThreads; i++) {
    // Pick a token at random and replace it.
    //
    // The replaced token is only destroyed when the last ref is dropped,
    // possibly by another thread.
    threads.emplace_back([&]() {
      int num_tokens_cycled = 0;
      while (latch.count()) {
        {
          std::lock_guard<simple_spinlock> l(lock);
          int idx = rng.Uniform(kNumTokens);
          ThreadPool::ExecutionMode mode = rng.OneIn(2) ?
              ThreadPool::ExecutionMode::SERIAL :
              ThreadPool::ExecutionMode::CONCURRENT;
          tokens[idx] = shared_ptr<ThreadPoolToken>(pool_->NewToken(mode).release());
        }
        num_tokens_cycled++;

        // Sleep a bit, otherwise this thread outpaces the other threads and
        // nothing interesting happens to most tokens.
        SleepFor(MonoDelta::FromMicroseconds(10));
      }
      total_num_tokens_cycled += num_tokens_cycled;
    });
  }

  for (int i = 0; i < kShutdownThreads; i++) {
    // Pick a token at random and shut it down. Submitting a task to a shut
    // down token will return a ServiceUnavailable error.
    threads.emplace_back([&]() {
      int num_tokens_shutdown = 0;
      while (latch.count()) {
        GetRandomToken()->Shutdown();
        num_tokens_shutdown++;
      }
      total_num_tokens_shutdown += num_tokens_shutdown;
    });
  }

  for (int i = 0; i < kWaitThreads; i++) {
    // Pick a token at random and wait for any outstanding tasks.
    threads.emplace_back([&]() {
      int num_tokens_waited = 0;
      while (latch.count()) {
        GetRandomToken()->Wait();
        num_tokens_waited++;
      }
      total_num_tokens_waited += num_tokens_waited;
    });
  }

  for (int i = 0; i < kSubmitThreads; i++) {
    // Pick a token at random and submit a task to it.
    threads.emplace_back([&]() {
      int num_tokens_submitted = 0;
      // Thread-local RNG: sleep durations don't need the shared lock.
      Random rng(SeedRandom());
      while (latch.count()) {
        int sleep_ms = static_cast<int>(rng.Uniform(5));
        Status s = GetRandomToken()->Submit([sleep_ms]() {
          // Sleep a little first so that tasks are running during other events.
          SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
        });
        CHECK(s.ok() || s.IsServiceUnavailable());
        num_tokens_submitted++;
      }
      total_num_tokens_submitted += num_tokens_submitted;
    });
  }

  SleepFor(MonoDelta::FromSeconds(kTestRuntimeSecs));
  latch.CountDown();
  for (auto& t : threads) {
    t.join();
  }

  LOG(INFO) << Substitute("Tokens cycled ($0 threads): $1",
                          kCycleThreads, total_num_tokens_cycled.load());
  LOG(INFO) << Substitute("Tokens shutdown ($0 threads): $1",
                          kShutdownThreads, total_num_tokens_shutdown.load());
  LOG(INFO) << Substitute("Tokens waited ($0 threads): $1",
                          kWaitThreads, total_num_tokens_waited.load());
  LOG(INFO) << Substitute("Tokens submitted ($0 threads): $1",
                          kSubmitThreads, total_num_tokens_submitted.load());
}

// Verifies that idle worker threads are woken in LIFO order, so a slow
// trickle of fast tasks reuses one thread while the rest idle out.
TEST_F(ThreadPoolTest, TestLIFOThreadWakeUps) {
  const int kNumThreads = 10;

  // Test with a pool that allows for kNumThreads concurrent threads.
  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
                                       .set_max_threads(kNumThreads)));

  // Submit kNumThreads slow tasks and unblock them, in order to produce
  // kNumThreads worker threads.
  CountDownLatch latch(1);
  SCOPED_CLEANUP({
    latch.CountDown();
  });
  for (int i = 0; i < kNumThreads; i++) {
    ASSERT_OK(pool_->Submit([&latch]() {
      latch.Wait();
    }));
  }
  ASSERT_EQ(kNumThreads, pool_->num_threads());
  latch.CountDown();
  pool_->Wait();

  // The kNumThreads threads are idle and waiting for the idle timeout.

  // Submit a slow trickle of lightning fast tasks.
  //
  // If the threads are woken up in FIFO order, this trickle is enough to
  // prevent all of them from idling and the AssertEventually will time out.
  //
  // If LIFO order is used, the same thread will be reused for each task and
  // the other threads will eventually time out.
  AssertEventually([&]() {
    ASSERT_OK(pool_->Submit([](){}));
    SleepFor(MonoDelta::FromMilliseconds(10));
    ASSERT_EQ(1, pool_->num_threads());
  }, MonoDelta::FromSeconds(10), AssertBackoff::NONE);
  NO_PENDING_FATALS();
}

} // namespace kudu