common/protobuf/kudu/util/thread.cc (495 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // // Copied from Impala and adapted to Kudu. #include "kudu/util/thread.h" #if defined(__linux__) #include <sys/prctl.h> #endif // defined(__linux__) #include <sys/resource.h> #include <sys/time.h> #include <unistd.h> #include <algorithm> #include <cerrno> #include <cstring> #include <memory> #include <mutex> #include <sstream> #include <unordered_map> #include <utility> #include <vector> #include <boost/smart_ptr/shared_ptr.hpp> #include <gflags/gflags.h> #include <glog/logging.h> #include "kudu/gutil/atomicops.h" #include "kudu/gutil/dynamic_annotations.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/mathlimits.h" #include "kudu/gutil/once.h" #include "kudu/gutil/port.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/util/easy_json.h" #include "kudu/util/env.h" #include "kudu/util/flag_tags.h" #include "kudu/util/kernel_stack_watchdog.h" #include "kudu/util/locks.h" #include "kudu/util/logging.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" #include "kudu/util/os-util.h" #include "kudu/util/scoped_cleanup.h" #include "kudu/util/stopwatch.h" #include "kudu/util/trace.h" #include "kudu/util/url-coding.h" #include "kudu/util/web_callback_registry.h" using std::ostringstream; using std::pair; using std::shared_ptr; using std::string; using std::vector; using std::unordered_map; using strings::Substitute; METRIC_DEFINE_gauge_uint64(server, threads_started, "Threads Started", kudu::MetricUnit::kThreads, "Total number of threads started on this server", kudu::MetricLevel::kDebug, kudu::EXPOSE_AS_COUNTER); METRIC_DEFINE_gauge_uint64(server, threads_running, "Threads Running", kudu::MetricUnit::kThreads, "Current number of running threads", kudu::MetricLevel::kInfo); METRIC_DEFINE_gauge_uint64(server, cpu_utime, "User CPU Time", kudu::MetricUnit::kMilliseconds, "Total user CPU time of the process", kudu::MetricLevel::kInfo, kudu::EXPOSE_AS_COUNTER); METRIC_DEFINE_gauge_uint64(server, cpu_stime, "System CPU Time", kudu::MetricUnit::kMilliseconds, "Total system CPU time of the process", kudu::MetricLevel::kInfo, kudu::EXPOSE_AS_COUNTER); METRIC_DEFINE_gauge_uint64(server, voluntary_context_switches, "Voluntary Context Switches", kudu::MetricUnit::kContextSwitches, "Total voluntary context switches", kudu::MetricLevel::kInfo, kudu::EXPOSE_AS_COUNTER); METRIC_DEFINE_gauge_uint64(server, involuntary_context_switches, "Involuntary Context Switches", kudu::MetricUnit::kContextSwitches, "Total involuntary context switches", kudu::MetricLevel::kInfo, kudu::EXPOSE_AS_COUNTER); METRIC_DEFINE_gauge_int32(server, scheduling_priority, "Scheduling Priority", kudu::MetricUnit::kState, "The scheduling priority of the process", kudu::MetricLevel::kDebug); DEFINE_int32(thread_inject_start_latency_ms, 0, "Number of ms to sleep when starting a new thread. (For tests)."); TAG_FLAG(thread_inject_start_latency_ms, hidden); TAG_FLAG(thread_inject_start_latency_ms, unsafe); namespace kudu { static uint64_t GetCpuUTime() { struct rusage ru; CHECK_ERR(getrusage(RUSAGE_SELF, &ru)); return ru.ru_utime.tv_sec * 1000UL + ru.ru_utime.tv_usec / 1000UL; } static uint64_t GetCpuSTime() { struct rusage ru; CHECK_ERR(getrusage(RUSAGE_SELF, &ru)); return ru.ru_stime.tv_sec * 1000UL + ru.ru_stime.tv_usec / 1000UL; } static uint64_t GetVoluntaryContextSwitches() { struct rusage ru; CHECK_ERR(getrusage(RUSAGE_SELF, &ru)); return ru.ru_nvcsw; } static uint64_t GetInVoluntaryContextSwitches() { struct rusage ru; CHECK_ERR(getrusage(RUSAGE_SELF, &ru)); return ru.ru_nivcsw; } static int32_t GetSchedulingPriority() { const int saved_errno = errno; SCOPED_CLEANUP({ errno = saved_errno; }); errno = 0; int prio = getpriority(PRIO_PROCESS, getpid()); // No errors expected here per the manual page for the getpriority() syscall: // see https://man7.org/linux/man-pages/man2/setpriority.2.html. PCHECK(errno == 0); return prio; } class ThreadMgr; __thread Thread* Thread::tls_ = nullptr; // Singleton instance of ThreadMgr. Only visible in this file, used only by Thread. // The Thread class adds a reference to thread_manager while it is supervising a thread so // that a race between the end of the process's main thread (and therefore the destruction // of thread_manager) and the end of a thread that tries to remove itself from the // manager after the destruction can be avoided. static shared_ptr<ThreadMgr> thread_manager; // Controls the single (lazy) initialization of thread_manager. static GoogleOnceType once = GOOGLE_ONCE_INIT; // A singleton class that tracks all live threads, and groups them together for easy // auditing. Used only by Thread. class ThreadMgr { public: ThreadMgr() : threads_started_metric_(0), threads_running_metric_(0) { } ~ThreadMgr() { thread_categories_.clear(); } static void SetThreadName(const string& name, int64_t tid); Status StartInstrumentation(const scoped_refptr<MetricEntity>& metrics, WebCallbackRegistry* web) const; // Registers a thread to the supplied category. The key is a pthread_t, // not the system TID, since pthread_t is less prone to being recycled. void AddThread(const pthread_t& pthread_id, const string& name, const string& category, int64_t tid); // Removes a thread from the supplied category. If the thread has // already been removed, this is a no-op. void RemoveThread(const pthread_t& pthread_id, const string& category); // Metric callback for number of threads running. Also used for error messages. uint64_t ReadThreadsRunning() const; private: // Container class for any details we want to capture about a thread // TODO: Add start-time. // TODO: Track fragment ID. class ThreadDescriptor { public: ThreadDescriptor() { } ThreadDescriptor(string category, string name, int64_t thread_id) : name_(std::move(name)), category_(std::move(category)), thread_id_(thread_id) {} const string& name() const { return name_; } const string& category() const { return category_; } int64_t thread_id() const { return thread_id_; } struct Comparator { bool operator()(const ThreadDescriptor& rhs, const ThreadDescriptor& lhs) const { return rhs.name() < lhs.name(); } }; private: string name_; string category_; int64_t thread_id_; }; struct ThreadIdHash { size_t operator()(pthread_t thread_id) const noexcept { return std::hash<pthread_t>()(thread_id); } }; struct ThreadIdEqual { bool operator()(pthread_t lhs, pthread_t rhs) const { return pthread_equal(lhs, rhs) != 0; } }; // A ThreadCategory is a set of threads that are logically related. typedef unordered_map<const pthread_t, ThreadDescriptor, ThreadIdHash, ThreadIdEqual> ThreadCategory; // All thread categories, keyed on the category name. typedef unordered_map<string, ThreadCategory> ThreadCategoryMap; // Protects thread_categories_ and thread metrics. mutable rw_spinlock lock_; // All thread categories that ever contained a thread, even if empty. ThreadCategoryMap thread_categories_; // Counters to track all-time total number of threads, and the // current number of running threads. uint64_t threads_started_metric_; uint64_t threads_running_metric_; // Metric callback for number of threads started. uint64_t ReadThreadsStarted() const; // Webpage callback; prints all threads by category. void ThreadPathHandler(const WebCallbackRegistry::WebRequest& req, WebCallbackRegistry::WebResponse* resp) const; void SummarizeThreadDescriptor(const ThreadDescriptor& desc, EasyJson* output) const; }; void ThreadMgr::SetThreadName(const string& name, int64_t tid) { // On linux we can get the thread names to show up in the debugger by setting // the process name for the LWP. We don't want to do this for the main // thread because that would rename the process, causing tools like killall // to stop working. if (tid == getpid()) { return; } #if defined(__linux__) // http://0pointer.de/blog/projects/name-your-threads.html // Set the name for the LWP (which gets truncated to 15 characters). // Note that glibc also has a 'pthread_setname_np' api, but it may not be // available everywhere and it's only benefit over using prctl directly is // that it can set the name of threads other than the current thread. int err = prctl(PR_SET_NAME, name.c_str()); #else int err = pthread_setname_np(name.c_str()); #endif // defined(__linux__) // We expect EPERM failures in sandboxed processes, just ignore those. if (err < 0 && errno != EPERM) { PLOG(ERROR) << "SetThreadName"; } } Status ThreadMgr::StartInstrumentation(const scoped_refptr<MetricEntity>& metrics, WebCallbackRegistry* web) const { // Use function gauges here so that we can register a unique copy of these metrics in // multiple tservers, even though the ThreadMgr is itself a singleton. metrics->NeverRetire( METRIC_threads_started.InstantiateFunctionGauge( metrics, [this]() { return this->ReadThreadsStarted(); })); metrics->NeverRetire( METRIC_threads_running.InstantiateFunctionGauge( metrics, [this]() { return this->ReadThreadsRunning(); })); metrics->NeverRetire( METRIC_cpu_utime.InstantiateFunctionGauge( metrics, []() { return GetCpuUTime(); })); metrics->NeverRetire( METRIC_cpu_stime.InstantiateFunctionGauge( metrics, []() { return GetCpuSTime(); })); metrics->NeverRetire( METRIC_voluntary_context_switches.InstantiateFunctionGauge( metrics, []() { return GetVoluntaryContextSwitches(); })); metrics->NeverRetire( METRIC_involuntary_context_switches.InstantiateFunctionGauge( metrics, []() { return GetInVoluntaryContextSwitches(); })); metrics->NeverRetire( METRIC_scheduling_priority.InstantiateFunctionGauge( metrics, []() { return GetSchedulingPriority(); })); if (web) { DCHECK_NOTNULL(web)->RegisterPathHandler( "/threadz", "Threads", [this](const WebCallbackRegistry::WebRequest& req, WebCallbackRegistry::WebResponse* resp) { this->ThreadPathHandler(req, resp); }, /* is_styled= */ true, /* is_on_nav_bar= */ true); } return Status::OK(); } uint64_t ThreadMgr::ReadThreadsStarted() const { shared_lock<decltype(lock_)> l(lock_); return threads_started_metric_; } uint64_t ThreadMgr::ReadThreadsRunning() const { shared_lock<decltype(lock_)> l(lock_); return threads_running_metric_; } void ThreadMgr::AddThread(const pthread_t& pthread_id, const string& name, const string& category, int64_t tid) { // These annotations cause TSAN to ignore the synchronization on lock_ // without causing the subsequent mutations to be treated as data races // in and of themselves (that's what IGNORE_READS_AND_WRITES does). // // Why do we need them here and in SuperviseThread()? TSAN operates by // observing synchronization events and using them to establish "happens // before" relationships between threads. Where these relationships are // not built, shared state access constitutes a data race. The // synchronization events here, in RemoveThread(), and in // SuperviseThread() may cause TSAN to establish a "happens before" // relationship between thread functors, ignoring potential data races. // The annotations prevent this from happening. ANNOTATE_IGNORE_SYNC_BEGIN(); ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN(); { // NOTE: Not using EmplaceOrDie() here -- that's because in environments // where fork() is called after some threads have been spawned, child // processes will inadvertently inherit the contents of the thread // registry (i.e. the entries in the thread_categories_ container). // For some platforms, pthread_t handles for threads in different // processes might be the same, so using EmplaceOrDie() would induce // a crash when ThreadMgr::AddThread() is called for a new thread // in the child process. // // TODO(aserbin): maybe, keep the thread_categories_ registry not in a // global static container, but bind the container with the life cycle // of some top-level object that uses the ThreadMgr as a singleton. std::lock_guard<decltype(lock_)> l(lock_); thread_categories_[category][pthread_id] = ThreadDescriptor(category, name, tid); ++threads_running_metric_; ++threads_started_metric_; } ANNOTATE_IGNORE_SYNC_END(); ANNOTATE_IGNORE_READS_AND_WRITES_END(); } void ThreadMgr::RemoveThread(const pthread_t& pthread_id, const string& category) { ANNOTATE_IGNORE_SYNC_BEGIN(); ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN(); { std::lock_guard<decltype(lock_)> l(lock_); auto& threads = FindOrDie(thread_categories_, category); auto num_erased = threads.erase(pthread_id); CHECK_EQ(1, num_erased); --threads_running_metric_; } ANNOTATE_IGNORE_SYNC_END(); ANNOTATE_IGNORE_READS_AND_WRITES_END(); } void ThreadMgr::SummarizeThreadDescriptor(const ThreadDescriptor& desc, EasyJson* output) const { ThreadStats stats; Status status = GetThreadStats(desc.thread_id(), &stats); if (!status.ok()) { KLOG_EVERY_N(INFO, 100) << "Could not get per-thread statistics: " << status.ToString(); } EasyJson thr = output->PushBack(EasyJson::kObject); thr["thread_name"] = desc.name(); thr["user_sec"] = static_cast<double>(stats.user_ns) / 1e9; thr["kernel_sec"] = static_cast<double>(stats.kernel_ns) / 1e9; thr["iowait_sec"] = static_cast<double>(stats.iowait_ns) / 1e9; } void ThreadMgr::ThreadPathHandler(const WebCallbackRegistry::WebRequest& req, WebCallbackRegistry::WebResponse* resp) const { EasyJson& output = resp->output; const auto* category_name = FindOrNull(req.parsed_args, "group"); if (category_name) { // List all threads belonging to the desired thread group. bool requested_all = *category_name == "all"; EasyJson rtg = output.Set("requested_thread_group", EasyJson::kObject); rtg["group_name"] = EscapeForHtmlToString(*category_name); rtg["requested_all"] = requested_all; // The critical section is as short as possible so as to minimize the delay // imposed on new threads that acquire the lock in write mode. vector<ThreadDescriptor> descriptors_to_print; if (!requested_all) { shared_lock<decltype(lock_)> l(lock_); const auto* category = FindOrNull(thread_categories_, *category_name); if (!category) { return; } for (const auto& elem : *category) { descriptors_to_print.emplace_back(elem.second); } } else { shared_lock<decltype(lock_)> l(lock_); for (const auto& category : thread_categories_) { for (const auto& elem : category.second) { descriptors_to_print.emplace_back(elem.second); } } } EasyJson found = rtg.Set("found", EasyJson::kObject); EasyJson threads = found.Set("threads", EasyJson::kArray); for (const auto& desc : descriptors_to_print) { SummarizeThreadDescriptor(desc, &threads); } } else { // List all thread groups and the number of threads running in each. vector<pair<string, uint64_t>> thread_categories_info; uint64_t running; { // See comment above regarding short critical sections. shared_lock<decltype(lock_)> l(lock_); running = threads_running_metric_; thread_categories_info.reserve(thread_categories_.size()); for (const auto& category : thread_categories_) { thread_categories_info.emplace_back(category.first, category.second.size()); } } output["total_threads_running"] = running; EasyJson groups = output.Set("groups", EasyJson::kArray); for (const auto& elem : thread_categories_info) { string category_arg; if (WebCallbackRegistry::IsProxiedViaKnox(req)) { // Knox encodes query parameter values when it rewrites HTTP responses. // If we also encoded, we'd end up with broken URLs. For example, we'd // encode the query parameter 'group=service pool' to // 'group=service%20pool', then Knox would encode it again to // 'group=service%2520pool'. category_arg = elem.first; } else { UrlEncode(elem.first, &category_arg); } EasyJson g = groups.PushBack(EasyJson::kObject); g["encoded_group_name"] = category_arg; g["group_name"] = elem.first; g["threads_running"] = elem.second; } } } static void InitThreading() { thread_manager.reset(new ThreadMgr()); } Status StartThreadInstrumentation(const scoped_refptr<MetricEntity>& server_metrics, WebCallbackRegistry* web) { GoogleOnceInit(&once, &InitThreading); return thread_manager->StartInstrumentation(server_metrics, web); } ThreadJoiner::ThreadJoiner(Thread* thr) : thread_(CHECK_NOTNULL(thr)), warn_after_ms_(kDefaultWarnAfterMs), warn_every_ms_(kDefaultWarnEveryMs), give_up_after_ms_(kDefaultGiveUpAfterMs) { } ThreadJoiner& ThreadJoiner::warn_after_ms(int ms) { warn_after_ms_ = ms; return *this; } ThreadJoiner& ThreadJoiner::warn_every_ms(int ms) { warn_every_ms_ = ms; return *this; } ThreadJoiner& ThreadJoiner::give_up_after_ms(int ms) { give_up_after_ms_ = ms; return *this; } Status ThreadJoiner::Join() { if (Thread::current_thread() && Thread::current_thread()->tid() == thread_->tid()) { return Status::InvalidArgument("Can't join on own thread", thread_->name_); } // Early exit: double join is a no-op. if (!thread_->joinable_) { return Status::OK(); } int waited_ms = 0; bool keep_trying = true; while (keep_trying) { if (waited_ms >= warn_after_ms_) { LOG(WARNING) << Substitute("Waited for $0ms trying to join with $1 (tid $2)", waited_ms, thread_->name_, thread_->tid_); } int remaining_before_giveup = MathLimits<int>::kMax; if (give_up_after_ms_ != -1) { remaining_before_giveup = give_up_after_ms_ - waited_ms; } int remaining_before_next_warn = warn_every_ms_; if (waited_ms < warn_after_ms_) { remaining_before_next_warn = warn_after_ms_ - waited_ms; } if (remaining_before_giveup < remaining_before_next_warn) { keep_trying = false; } int wait_for = std::min(remaining_before_giveup, remaining_before_next_warn); if (thread_->done_.WaitFor(MonoDelta::FromMilliseconds(wait_for))) { // Unconditionally join before returning, to guarantee that any TLS // has been destroyed (pthread_key_create() destructors only run // after a pthread's user method has returned). int ret = pthread_join(thread_->thread_, nullptr); CHECK_EQ(ret, 0); thread_->joinable_ = false; return Status::OK(); } waited_ms += wait_for; } return Status::Aborted(strings::Substitute("Timed out after $0ms joining on $1", waited_ms, thread_->name_)); } Thread::~Thread() { if (joinable_) { int ret = pthread_detach(thread_); CHECK_EQ(ret, 0); } } string Thread::ToString() const { return Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), name_, category_); } int64_t Thread::WaitForTid() const { const string log_prefix = Substitute("$0 ($1) ", name_, category_); SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "waiting for new thread to publish its TID"); int loop_count = 0; while (true) { int64_t t = Acquire_Load(&tid_); if (t != PARENT_WAITING_TID) return t; boost::detail::yield(loop_count++); } } Status Thread::StartThread(string category, string name, std::function<void()> functor, uint64_t flags, scoped_refptr<Thread>* holder) { TRACE_COUNTER_INCREMENT("threads_started", 1); TRACE_COUNTER_SCOPE_LATENCY_US("thread_start_us"); GoogleOnceInit(&once, &InitThreading); const string log_prefix = Substitute("$0 ($1) ", name, category); SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "starting thread"); // Temporary reference for the duration of this function. scoped_refptr<Thread> t(new Thread( std::move(category), std::move(name), std::move(functor))); // Optional, and only set if the thread was successfully created. // // We have to set this before we even start the thread because it's // allowed for the thread functor to access 'holder'. if (holder) { *holder = t; } t->tid_ = PARENT_WAITING_TID; // Add a reference count to the thread since SuperviseThread() needs to // access the thread object, and we have no guarantee that our caller // won't drop the reference as soon as we return. This is dereferenced // in FinishThread(). t->AddRef(); auto cleanup = MakeScopedCleanup([&]() { // If we failed to create the thread, we need to undo all of our prep work. t->tid_ = INVALID_TID; t->Release(); }); if (PREDICT_FALSE(FLAGS_thread_inject_start_latency_ms > 0)) { LOG(INFO) << "Injecting " << FLAGS_thread_inject_start_latency_ms << "ms sleep on thread start"; SleepFor(MonoDelta::FromMilliseconds(FLAGS_thread_inject_start_latency_ms)); } { SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "creating pthread"); SCOPED_WATCH_STACK((flags & NO_STACK_WATCHDOG) ? 0 : 250); int ret = pthread_create(&t->thread_, nullptr, &Thread::SuperviseThread, t.get()); if (ret) { string msg = ""; if (ret == EAGAIN) { uint64_t rlimit_nproc = Env::Default()->GetResourceLimit( Env::ResourceLimitType::RUNNING_THREADS_PER_EUID); uint64_t num_threads = thread_manager->ReadThreadsRunning(); msg = Substitute(" ($0 Kudu-managed threads running in this process, " "$1 max processes allowed for current user)", num_threads, rlimit_nproc); } return Status::RuntimeError(Substitute("Could not create thread$0", msg), strerror(ret), ret); } } // The thread has been created and is now joinable. // // Why set this in the parent and not the child? Because only the parent // (or someone communicating with the parent) can join, so joinable must // be set before the parent returns. t->joinable_ = true; cleanup.cancel(); VLOG(2) << Substitute("Started thread $0 - $1: $2", t->tid(), t->category(), t->name()); return Status::OK(); } void* Thread::SuperviseThread(void* arg) { Thread* t = static_cast<Thread*>(arg); int64_t system_tid = Thread::CurrentThreadId(); PCHECK(system_tid != -1); // Take an additional reference to the thread manager, which we'll need below. ANNOTATE_IGNORE_SYNC_BEGIN(); shared_ptr<ThreadMgr> thread_mgr_ref = thread_manager; ANNOTATE_IGNORE_SYNC_END(); // Set up the TLS. // // We could store a scoped_refptr in the TLS itself, but as its // lifecycle is poorly defined, we'll use a bare pointer. We // already incremented the reference count in StartThread. Thread::tls_ = t; // Publish our tid to 'tid_', which unblocks any callers waiting in // WaitForTid(). Release_Store(&t->tid_, system_tid); string name = strings::Substitute("$0-$1", t->name(), system_tid); thread_manager->SetThreadName(name, t->tid_); thread_manager->AddThread(pthread_self(), name, t->category(), t->tid_); // FinishThread() is guaranteed to run (even if functor_ throws an // exception) because pthread_cleanup_push() creates a scoped object // whose destructor invokes the provided callback. pthread_cleanup_push(&Thread::FinishThread, t); t->functor_(); pthread_cleanup_pop(true); return nullptr; } void Thread::FinishThread(void* arg) { Thread* t = static_cast<Thread*>(arg); // We're here either because of the explicit pthread_cleanup_pop() in // SuperviseThread() or through pthread_exit(). In either case, // thread_manager is guaranteed to be live because thread_mgr_ref in // SuperviseThread() is still live. thread_manager->RemoveThread(pthread_self(), t->category()); // Signal any Joiner that we're done. t->done_.CountDown(); VLOG(2) << "Ended thread " << t->tid_ << " - " << t->category() << ":" << t->name(); t->Release(); // NOTE: the above 'Release' call could be the last reference to 'this', // so 'this' could be destructed at this point. Do not add any code // following here! } } // namespace kudu