common/protobuf/kudu/rpc/reactor.cc (712 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "kudu/rpc/reactor.h" #include <openssl/crypto.h> #include <openssl/err.h> // IWYU pragma: keep #include <sys/socket.h> #include <cerrno> #include <functional> #include <memory> #include <mutex> #include <ostream> #include <string> #include <utility> #include <boost/intrusive/list.hpp> #include <ev++.h> #include <ev.h> #include <gflags/gflags.h> #include <glog/logging.h> #include "kudu/gutil/port.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/stringprintf.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/gutil/sysinfo.h" #include "kudu/gutil/walltime.h" #include "kudu/rpc/connection.h" #include "kudu/rpc/messenger.h" #include "kudu/rpc/negotiation.h" #include "kudu/rpc/outbound_call.h" #include "kudu/rpc/rpc_controller.h" #include "kudu/rpc/rpc_introspection.pb.h" #include "kudu/util/countdown_latch.h" #include "kudu/util/debug/sanitizer_scopes.h" #include "kudu/util/flag_tags.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" #include "kudu/util/net/sockaddr.h" #include "kudu/util/net/socket.h" #include "kudu/util/random_util.h" #include "kudu/util/status.h" #include "kudu/util/thread.h" #include "kudu/util/thread_restrictions.h" #include "kudu/util/threadpool.h" #include "kudu/util/trace.h" // When compiling on Mac OS X, use 'kqueue' instead of the default, 'select', for the event loop. // Otherwise we run into problems because 'select' can't handle connections when more than 1024 // file descriptors are open by the process. #if defined(__APPLE__) static const int kDefaultLibEvFlags = ev::KQUEUE; #else static const int kDefaultLibEvFlags = ev::AUTO; #endif using std::string; using std::shared_ptr; using std::unique_ptr; using strings::Substitute; DEFINE_bool(rpc_reopen_outbound_connections, false, "Open a new connection to the server for every RPC call. " "If not enabled, an already existing connection to a " "server is reused upon making another call to the same server. " "When this flag is enabled, an already existing _idle_ connection " "to the server is closed upon making another RPC call which would " "reuse the connection otherwise. " "Used by tests only."); TAG_FLAG(rpc_reopen_outbound_connections, unsafe); TAG_FLAG(rpc_reopen_outbound_connections, runtime); DEFINE_int32(tcp_keepalive_probe_period_s, 60, "The duration in seconds after an outbound connection has gone idle " "before a TCP keepalive probe is sent to the peer. Set to 0 to disable " "TCP keepalive probes from being sent."); DEFINE_int32(tcp_keepalive_retry_period_s, 3, "The duration in seconds between successive keepalive probes from an " "outbound connection if the previous probes are not acknowledged. " "Effective only if --tcp_keepalive_probe_period_s is not 0."); DEFINE_int32(tcp_keepalive_retry_count, 10, "The maximum number of keepalive probes sent before declaring the remote " "end as dead. Effective only if --tcp_keepalive_probe_period_s is not 0."); TAG_FLAG(tcp_keepalive_probe_period_s, advanced); TAG_FLAG(tcp_keepalive_retry_period_s, advanced); TAG_FLAG(tcp_keepalive_retry_count, advanced); METRIC_DEFINE_histogram(server, reactor_load_percent, "Reactor Thread Load Percentage", kudu::MetricUnit::kUnits, "The percentage of time that the reactor is busy " "(not blocked awaiting network activity). If this metric " "shows significant samples nears 100%, increasing the " "number of reactors may be beneficial.", kudu::MetricLevel::kInfo, 100, 2); METRIC_DEFINE_histogram(server, reactor_active_latency_us, "Reactor Thread Active Latency", kudu::MetricUnit::kMicroseconds, "Histogram of the wall clock time for reactor thread wake-ups. " "The reactor thread is responsible for all network I/O and " "therefore outliers in this latency histogram directly contribute " "to the latency of both inbound and outbound RPCs.", kudu::MetricLevel::kInfo, 1000000, 2); namespace kudu { namespace rpc { namespace { Status ShutdownError(bool aborted) { const char* msg = "reactor is shutting down"; return aborted ? Status::Aborted(msg, "", ESHUTDOWN) : Status::ServiceUnavailable(msg, "", ESHUTDOWN); } // Callback for libev fatal errors (eg running out of file descriptors). // Unfortunately libev doesn't plumb these back through to the caller, but // instead just expects the callback to abort. // // This implementation is slightly preferable to the built-in one since // it uses a FATAL log message instead of printing to stderr, which might // not end up anywhere useful in a daemonized context. void LibevSysErr(const char* msg) throw() { PLOG(FATAL) << "LibEV fatal error: " << msg; } void DoInitLibEv() { ev::set_syserr_cb(LibevSysErr); } } // anonymous namespace ReactorThread::ReactorThread(Reactor* reactor, const MessengerBuilder& bld) : loop_(kDefaultLibEvFlags), cur_time_(MonoTime::Now()), last_unused_tcp_scan_(cur_time_), reactor_(reactor), connection_keepalive_time_(bld.connection_keepalive_time_), coarse_timer_granularity_(bld.coarse_timer_granularity_), total_client_conns_cnt_(0), total_server_conns_cnt_(0), rng_(GetRandomSeed32()) { if (bld.metric_entity_) { invoke_us_histogram_ = METRIC_reactor_active_latency_us.Instantiate(bld.metric_entity_); load_percent_histogram_ = METRIC_reactor_load_percent.Instantiate(bld.metric_entity_); } } Status ReactorThread::Init() { DCHECK(thread_.get() == nullptr) << "Already started"; DVLOG(6) << "Called ReactorThread::Init()"; // Register to get async notifications in our epoll loop. async_.set(loop_); async_.set<ReactorThread, &ReactorThread::AsyncHandler>(this); // NOLINT(*) async_.start(); // Register the timer watcher. // The timer is used for closing old TCP connections and applying // backpressure. timer_.set(loop_); timer_.set<ReactorThread, &ReactorThread::TimerHandler>(this); // NOLINT(*) timer_.start(coarse_timer_granularity_.ToSeconds(), coarse_timer_granularity_.ToSeconds()); // Register our callbacks. ev++ doesn't provide handy wrappers for these. ev_set_userdata(loop_, this); ev_set_loop_release_cb(loop_, &ReactorThread::AboutToPollCb, &ReactorThread::PollCompleteCb); ev_set_invoke_pending_cb(loop_, &ReactorThread::InvokePendingCb); // Create Reactor thread. return kudu::Thread::Create("reactor", "rpc reactor", [this]() { this->RunThread(); }, &thread_); } void ReactorThread::InvokePendingCb(struct ev_loop* loop) { // Calculate the number of cycles spent calling our callbacks. // This is called quite frequently so we use CycleClock rather than MonoTime // since it's a bit faster. int64_t start = CycleClock::Now(); ev_invoke_pending(loop); int64_t dur_cycles = CycleClock::Now() - start; // Contribute this to our histogram. ReactorThread* thr = static_cast<ReactorThread*>(ev_userdata(loop)); if (thr->invoke_us_histogram_) { thr->invoke_us_histogram_->Increment(dur_cycles * 1000000 / base::CyclesPerSecond()); } } void ReactorThread::AboutToPollCb(struct ev_loop* loop) noexcept { // Store the current time in a member variable to be picked up below // in PollCompleteCb. ReactorThread* thr = static_cast<ReactorThread*>(ev_userdata(loop)); thr->cycle_clock_before_poll_ = CycleClock::Now(); } void ReactorThread::PollCompleteCb(struct ev_loop* loop) noexcept { // First things first, capture the time, so that this is as accurate as possible int64_t cycle_clock_after_poll = CycleClock::Now(); // Record it in our accounting. ReactorThread* thr = static_cast<ReactorThread*>(ev_userdata(loop)); DCHECK_NE(thr->cycle_clock_before_poll_, -1) << "PollCompleteCb called without corresponding AboutToPollCb"; int64_t poll_cycles = cycle_clock_after_poll - thr->cycle_clock_before_poll_; thr->cycle_clock_before_poll_ = -1; thr->total_poll_cycles_ += poll_cycles; } void ReactorThread::Shutdown(Messenger::ShutdownMode mode) { CHECK(reactor_->closing()) << "Should be called after setting closing_ flag"; VLOG(1) << name() << ": shutting down Reactor thread."; WakeThread(); if (mode == Messenger::ShutdownMode::SYNC) { // Join() will return a bad status if asked to join on the currently // running thread. CHECK_OK(ThreadJoiner(thread_.get()).Join()); } } void ReactorThread::ShutdownInternal() { DCHECK(IsCurrentThread()); // Tear down any outbound TCP connections. Status service_unavailable = ShutdownError(false); VLOG(1) << name() << ": tearing down outbound TCP connections..."; for (const auto& elem : client_conns_) { const auto& conn = elem.second; VLOG(1) << name() << ": shutting down " << conn->ToString(); conn->Shutdown(service_unavailable); } client_conns_.clear(); // Tear down any inbound TCP connections. VLOG(1) << name() << ": tearing down inbound TCP connections..."; for (const auto& conn : server_conns_) { VLOG(1) << name() << ": shutting down " << conn->ToString(); conn->Shutdown(service_unavailable); } server_conns_.clear(); // Abort any scheduled tasks. // // These won't be found in the ReactorThread's list of pending tasks // because they've been "run" (that is, they've been scheduled). Status aborted = ShutdownError(true); // aborted while (!scheduled_tasks_.empty()) { DelayedTask* t = &scheduled_tasks_.front(); scheduled_tasks_.pop_front(); t->Abort(aborted); // should also free the task. } // Remove the OpenSSL thread state. // // As of OpenSSL 1.1, this [1] is a no-op and can be ignored. // // 1. https://www.openssl.org/docs/man1.1.0/crypto/ERR_remove_thread_state.html #if OPENSSL_VERSION_NUMBER < 0x10100000L ERR_remove_thread_state(nullptr); #endif } ReactorTask::ReactorTask() { } ReactorTask::~ReactorTask() { } Status ReactorThread::GetMetrics(ReactorMetrics* metrics) { DCHECK(IsCurrentThread()); metrics->num_client_connections_ = client_conns_.size(); metrics->num_server_connections_ = server_conns_.size(); metrics->total_client_connections_ = total_client_conns_cnt_; metrics->total_server_connections_ = total_server_conns_cnt_; return Status::OK(); } Status ReactorThread::DumpConnections(const DumpConnectionsRequestPB& req, DumpConnectionsResponsePB* resp) { DCHECK(IsCurrentThread()); for (const scoped_refptr<Connection>& conn : server_conns_) { RETURN_NOT_OK(conn->DumpPB(req, resp->add_inbound_connections())); } for (const conn_multimap_t::value_type& entry : client_conns_) { Connection* conn = entry.second.get(); RETURN_NOT_OK(conn->DumpPB(req, resp->add_outbound_connections())); } return Status::OK(); } void ReactorThread::WakeThread() { // libev uses some lock-free synchronization, but doesn't have TSAN annotations. // See http://lists.schmorp.de/pipermail/libev/2013q2/002178.html or KUDU-366 // for examples. debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; async_.send(); } // Handle async events. These events are sent to the reactor by other // threads that want to bring something to our attention, like the fact that // we're shutting down, or the fact that there is a new outbound Transfer // ready to send. void ReactorThread::AsyncHandler(ev::async& /*watcher*/, int /*revents*/) { DCHECK(IsCurrentThread()); if (PREDICT_FALSE(reactor_->closing())) { ShutdownInternal(); loop_.break_loop(); // break the epoll loop and terminate the thread return; } boost::intrusive::list<ReactorTask> tasks; reactor_->DrainTaskQueue(&tasks); while (!tasks.empty()) { ReactorTask& task = tasks.front(); tasks.pop_front(); task.Run(this); } } void ReactorThread::RegisterConnection(scoped_refptr<Connection> conn) { DCHECK(IsCurrentThread()); Status s = StartConnectionNegotiation(conn); if (PREDICT_FALSE(!s.ok())) { LOG(ERROR) << "Server connection negotiation failed: " << s.ToString(); DestroyConnection(conn.get(), s); return; } ++total_server_conns_cnt_; server_conns_.emplace_back(std::move(conn)); } void ReactorThread::AssignOutboundCall(shared_ptr<OutboundCall> call) { DCHECK(IsCurrentThread()); // Skip if the outbound has been cancelled already. if (PREDICT_FALSE(call->IsCancelled())) { return; } scoped_refptr<Connection> conn; Status s = FindOrStartConnection(call->conn_id(), call->controller()->credentials_policy(), &conn); if (PREDICT_FALSE(!s.ok())) { call->SetFailed(std::move(s), OutboundCall::Phase::CONNECTION_NEGOTIATION); return; } conn->QueueOutboundCall(std::move(call)); } void ReactorThread::CancelOutboundCall(const shared_ptr<OutboundCall>& call) { DCHECK(IsCurrentThread()); // If the callback has been invoked already, the cancellation is a no-op. // The controller may be gone already if the callback has been invoked. if (call->IsFinished()) { return; } scoped_refptr<Connection> conn; if (FindConnection(call->conn_id(), call->controller()->credentials_policy(), &conn)) { conn->CancelOutboundCall(call); } call->Cancel(); } // // Handles timer events. The periodic timer: // // 1. updates Reactor::cur_time_ // 2. every tcp_conn_timeo_ seconds, close down connections older than // tcp_conn_timeo_ seconds. // void ReactorThread::TimerHandler(ev::timer& /*watcher*/, int revents) { DCHECK(IsCurrentThread()); if (EV_ERROR & revents) { LOG(WARNING) << "Reactor " << name() << " got an error in " "the timer handler."; return; } cur_time_ = MonoTime::Now(); // Compute load percentage. int64_t now_cycles = CycleClock::Now(); if (last_load_measurement_.time_cycles != -1) { int64_t cycles_delta = (now_cycles - last_load_measurement_.time_cycles); int64_t poll_cycles_delta = total_poll_cycles_ - last_load_measurement_.poll_cycles; double poll_fraction = static_cast<double>(poll_cycles_delta) / cycles_delta; double active_fraction = 1 - poll_fraction; if (load_percent_histogram_) { load_percent_histogram_->Increment(static_cast<int>(active_fraction * 100)); } } last_load_measurement_.time_cycles = now_cycles; last_load_measurement_.poll_cycles = total_poll_cycles_; ScanIdleConnections(); } void ReactorThread::RegisterTimeout(ev::timer* watcher) { watcher->set(loop_); } void ReactorThread::ScanIdleConnections() { DCHECK(IsCurrentThread()); // Enforce TCP connection timeouts: server-side connections. const auto server_conns_end = server_conns_.end(); uint64_t timed_out = 0; // Scan for idle server connections if it's enabled. if (connection_keepalive_time_ >= MonoDelta::FromMilliseconds(0)) { for (auto it = server_conns_.begin(); it != server_conns_end; ) { Connection* conn = it->get(); if (!conn->Idle()) { VLOG(10) << "Connection " << conn->ToString() << " not idle"; ++it; continue; } const MonoDelta connection_delta(cur_time_ - conn->last_activity_time()); if (connection_delta <= connection_keepalive_time_) { ++it; continue; } conn->Shutdown(Status::NetworkError( Substitute("connection timed out after $0", connection_keepalive_time_.ToString()))); VLOG(1) << "Timing out connection " << conn->ToString() << " - it has been idle for " << connection_delta.ToString(); ++timed_out; it = server_conns_.erase(it); } } // Take care of idle client-side connections marked for shutdown. uint64_t shutdown = 0; for (auto it = client_conns_.begin(); it != client_conns_.end();) { Connection* conn = it->second.get(); if (conn->scheduled_for_shutdown() && conn->Idle()) { conn->Shutdown(Status::NetworkError( "connection has been marked for shutdown")); it = client_conns_.erase(it); ++shutdown; } else { ++it; } } // TODO(aserbin): clients may want to set their keepalive timeout for idle // but not scheduled for shutdown connections. VLOG_IF(1, timed_out > 0) << name() << ": timed out " << timed_out << " TCP connections."; VLOG_IF(1, shutdown > 0) << name() << ": shutdown " << shutdown << " TCP connections."; } const string& ReactorThread::name() const { return reactor_->name(); } MonoTime ReactorThread::cur_time() const { return cur_time_; } Reactor* ReactorThread::reactor() { return reactor_; } bool ReactorThread::IsCurrentThread() const { return thread_.get() == kudu::Thread::current_thread(); } void ReactorThread::RunThread() { ThreadRestrictions::SetWaitAllowed(false); ThreadRestrictions::SetIOAllowed(false); DVLOG(6) << "Calling ReactorThread::RunThread()..."; loop_.run(0); VLOG(1) << name() << " thread exiting."; // No longer need the messenger. This causes the messenger to // get deleted when all the reactors exit. reactor_->messenger_.reset(); } bool ReactorThread::FindConnection(const ConnectionId& conn_id, CredentialsPolicy cred_policy, scoped_refptr<Connection>* conn) { DCHECK(IsCurrentThread()); const auto range = client_conns_.equal_range(conn_id); scoped_refptr<Connection> found_conn; for (auto it = range.first; it != range.second;) { const auto& c = it->second.get(); // * Do not use connections scheduled for shutdown to place new calls. // // * Do not use a connection with a non-compliant credentials policy. // Instead, open a new one, while marking the former as scheduled for // shutdown. This process converges: any connection that satisfies the // PRIMARY_CREDENTIALS policy automatically satisfies the ANY_CREDENTIALS // policy as well. The idea is to keep only one usable connection // identified by the specified 'conn_id'. // // * If the test-only 'one-connection-per-RPC' mode is enabled, connections // are re-established at every RPC call. if (c->scheduled_for_shutdown() || !c->SatisfiesCredentialsPolicy(cred_policy) || PREDICT_FALSE(FLAGS_rpc_reopen_outbound_connections)) { if (c->Idle()) { // Shutdown idle connections to the target destination. Non-idle ones // will be taken care of later by the idle connection scanner. DCHECK_EQ(Connection::CLIENT, c->direction()); c->Shutdown(Status::NetworkError("connection is closed due to non-reuse policy")); it = client_conns_.erase(it); continue; } c->set_scheduled_for_shutdown(); } else { DCHECK(!found_conn); found_conn = c; // Appropriate connection is found; continue further to take care of the // rest of connections to mark them for shutdown if they are not // satisfying the policy. } ++it; } if (found_conn) { // Found matching not-to-be-shutdown connection: return it as the result. conn->swap(found_conn); return true; } return false; } Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id, CredentialsPolicy cred_policy, scoped_refptr<Connection>* conn) { DCHECK(IsCurrentThread()); if (FindConnection(conn_id, cred_policy, conn)) { return Status::OK(); } // No connection to this remote. Need to create one. VLOG(2) << name() << " FindOrStartConnection: creating " << "new connection for " << conn_id.remote().ToString(); // Create a new socket and start connecting to the remote. Socket sock; RETURN_NOT_OK(CreateClientSocket(conn_id.remote().family(), &sock)); RETURN_NOT_OK(StartConnect(&sock, conn_id.remote())); unique_ptr<Socket> new_socket(new Socket(sock.Release())); // Register the new connection in our map. *conn = new Connection( this, conn_id.remote(), std::move(new_socket), Connection::CLIENT, cred_policy); (*conn)->set_outbound_connection_id(conn_id); // Kick off blocking client connection negotiation. Status s = StartConnectionNegotiation(*conn); if (s.IsIllegalState()) { // Return a nicer error message to the user indicating -- if we just // forward the status we'd get something generic like "ThreadPool is closing". return Status::ServiceUnavailable("Client RPC Messenger shutting down"); } // Propagate any other errors as-is. RETURN_NOT_OK_PREPEND(s, "Unable to start connection negotiation thread"); // Insert into the client connection map to avoid duplicate connection requests. client_conns_.emplace(conn_id, *conn); ++total_client_conns_cnt_; return Status::OK(); } Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection>& conn) { DCHECK(IsCurrentThread()); // Set a limit on how long the server will negotiate with a new client. MonoTime deadline = MonoTime::Now() + MonoDelta::FromMilliseconds(reactor()->messenger()->rpc_negotiation_timeout_ms()); scoped_refptr<Trace> trace(new Trace()); ADOPT_TRACE(trace.get()); TRACE("Submitting negotiation task for $0", conn->ToString()); auto authentication = reactor()->messenger()->authentication(); auto encryption = reactor()->messenger()->encryption(); auto loopback_encryption = reactor()->messenger()->loopback_encryption(); ThreadPool* negotiation_pool = reactor()->messenger()->negotiation_pool(conn->direction()); RETURN_NOT_OK(negotiation_pool->Submit([conn, authentication, encryption, loopback_encryption, deadline]() { Negotiation::RunNegotiation(conn, authentication, encryption, loopback_encryption, deadline); })); return Status::OK(); } void ReactorThread::CompleteConnectionNegotiation( const scoped_refptr<Connection>& conn, const Status& status, unique_ptr<ErrorStatusPB> rpc_error) { DCHECK(IsCurrentThread()); if (PREDICT_FALSE(!status.ok())) { DestroyConnection(conn.get(), status, std::move(rpc_error)); return; } // Switch the socket back to non-blocking mode after negotiation. Status s = conn->SetNonBlocking(true); if (PREDICT_FALSE(!s.ok())) { LOG(DFATAL) << "Unable to set connection to non-blocking mode: " << s.ToString(); DestroyConnection(conn.get(), s, std::move(rpc_error)); return; } if (conn->remote().is_ip() && FLAGS_tcp_keepalive_probe_period_s > 0) { // Try spreading out the idle poll period to avoid thundering herd in case connections // are all created at the same time (e.g. after a cluster is restarted). Status keepalive_status = conn->SetTcpKeepAlive( FLAGS_tcp_keepalive_probe_period_s + rng_.Uniform32(4), FLAGS_tcp_keepalive_retry_period_s, FLAGS_tcp_keepalive_retry_count); if (PREDICT_FALSE(!keepalive_status.ok())) { LOG(DFATAL) << "Unable to set TCP keepalive for connection: " << keepalive_status.ToString(); DestroyConnection(conn.get(), keepalive_status, std::move(rpc_error)); return; } } conn->MarkNegotiationComplete(); conn->EpollRegister(loop_); } Status ReactorThread::CreateClientSocket(int family, Socket* sock) { Status ret = sock->Init(family, Socket::FLAG_NONBLOCKING); if (ret.ok() && family == AF_INET) { ret = sock->SetNoDelay(true); } LOG_IF(WARNING, !ret.ok()) << "failed to create an outbound connection because a new socket could not be created: " << ret.ToString(); return ret; } Status ReactorThread::StartConnect(Socket* sock, const Sockaddr& remote) { const Status ret = sock->Connect(remote); if (ret.ok()) { VLOG(3) << "StartConnect: connect finished immediately for " << remote.ToString(); return Status::OK(); } int posix_code = ret.posix_code(); if (Socket::IsTemporarySocketError(posix_code) || posix_code == EINPROGRESS) { VLOG(3) << "StartConnect: connect in progress for " << remote.ToString(); return Status::OK(); } LOG(WARNING) << "Failed to create an outbound connection to " << remote.ToString() << " because connect() failed: " << ret.ToString(); return ret; } void ReactorThread::DestroyConnection(Connection* conn, const Status& conn_status, unique_ptr<ErrorStatusPB> rpc_error) { DCHECK(IsCurrentThread()); conn->Shutdown(conn_status, std::move(rpc_error)); // Unlink connection from lists. if (conn->direction() == Connection::CLIENT) { const auto range = client_conns_.equal_range(conn->outbound_connection_id()); CHECK(range.first != range.second) << "Couldn't find connection " << conn->ToString(); // The client_conns_ container is a multi-map. for (auto it = range.first; it != range.second;) { if (it->second.get() == conn) { it = client_conns_.erase(it); break; } ++it; } } else if (conn->direction() == Connection::SERVER) { auto it = server_conns_.begin(); while (it != server_conns_.end()) { if ((*it).get() == conn) { server_conns_.erase(it); break; } ++it; } } } DelayedTask::DelayedTask(std::function<void(const Status&)> func, MonoDelta when) : func_(std::move(func)), when_(when), thread_(nullptr) { } void DelayedTask::Run(ReactorThread* thread) { DCHECK(thread_ == nullptr) << "Task has already been scheduled"; DCHECK(thread->IsCurrentThread()); DCHECK(!is_linked()) << "Should not be linked on pending_tasks_ anymore"; // Schedule the task to run later. thread_ = thread; timer_.set(thread->loop_); timer_.set<DelayedTask, &DelayedTask::TimerHandler>(this); // NOLINT(*) timer_.start(when_.ToSeconds(), // after 0); // repeat thread_->scheduled_tasks_.push_back(*this); } void DelayedTask::Abort(const Status& abort_status) { func_(abort_status); delete this; } void DelayedTask::TimerHandler(ev::timer& /*watcher*/, int revents) { DCHECK(is_linked()) << "should be linked on scheduled_tasks_"; // We will free this task's memory. thread_->scheduled_tasks_.erase(thread_->scheduled_tasks_.iterator_to(*this)); if (EV_ERROR & revents) { string msg = "Delayed task got an error in its timer handler"; LOG(WARNING) << msg; Abort(Status::Aborted(msg)); // Will delete 'this'. } else { func_(Status::OK()); delete this; } } Reactor::Reactor(shared_ptr<Messenger> messenger, int index, const MessengerBuilder& bld) : messenger_(std::move(messenger)), name_(StringPrintf("%s_R%03d", messenger_->name().c_str(), index)), closing_(false), thread_(this, bld) { static std::once_flag libev_once; std::call_once(libev_once, DoInitLibEv); } Status Reactor::Init() { DVLOG(6) << "Called Reactor::Init()"; return thread_.Init(); } void Reactor::Shutdown(Messenger::ShutdownMode mode) { { std::lock_guard<LockType> l(lock_); if (closing_) { return; } closing_ = true; } thread_.Shutdown(mode); // Abort all pending tasks. No new tasks can get scheduled after this // because ScheduleReactorTask() tests the closing_ flag set above. Status aborted = ShutdownError(true); while (!pending_tasks_.empty()) { ReactorTask& task = pending_tasks_.front(); pending_tasks_.pop_front(); task.Abort(aborted); } } Reactor::~Reactor() { Shutdown(Messenger::ShutdownMode::ASYNC); } const string& Reactor::name() const { return name_; } bool Reactor::closing() const { std::lock_guard<LockType> l(lock_); return closing_; } // Task to call an arbitrary function within the reactor thread. class RunFunctionTask : public ReactorTask { public: explicit RunFunctionTask(std::function<Status()> f) : function_(std::move(f)), latch_(1) {} void Run(ReactorThread* /*reactor*/) override { status_ = function_(); latch_.CountDown(); } void Abort(const Status& status) override { status_ = status; latch_.CountDown(); } // Wait until the function has completed, and return the Status // returned by the function. Status Wait() { latch_.Wait(); return status_; } private: const std::function<Status()> function_; Status status_; CountDownLatch latch_; }; Status Reactor::GetMetrics(ReactorMetrics* metrics) { return RunOnReactorThread([&]() { return this->thread_.GetMetrics(metrics); }); } Status Reactor::RunOnReactorThread(std::function<Status()> f) { RunFunctionTask task(std::move(f)); ScheduleReactorTask(&task); return task.Wait(); } Status Reactor::DumpConnections(const DumpConnectionsRequestPB& req, DumpConnectionsResponsePB* resp) { return RunOnReactorThread([&]() { return this->thread_.DumpConnections(req, resp); }); } class RegisterConnectionTask : public ReactorTask { public: explicit RegisterConnectionTask(scoped_refptr<Connection> conn) : conn_(std::move(conn)) { } void Run(ReactorThread* reactor) override { reactor->RegisterConnection(std::move(conn_)); delete this; } void Abort(const Status& /*status*/) override { // We don't need to Shutdown the connection since it was never registered. // This is only used for inbound connections, and inbound connections will // never have any calls added to them until they've been registered. delete this; } private: const scoped_refptr<Connection> conn_; }; void Reactor::RegisterInboundSocket(Socket* socket, const Sockaddr& remote) { VLOG(3) << name_ << ": new inbound connection to " << remote.ToString(); unique_ptr<Socket> new_socket(new Socket(socket->Release())); auto task = new RegisterConnectionTask( new Connection(&thread_, remote, std::move(new_socket), Connection::SERVER)); ScheduleReactorTask(task); } // Task which runs in the reactor thread to assign an outbound call // to a connection. class AssignOutboundCallTask : public ReactorTask { public: explicit AssignOutboundCallTask(shared_ptr<OutboundCall> call) : call_(std::move(call)) {} void Run(ReactorThread* reactor) override { reactor->AssignOutboundCall(std::move(call_)); delete this; } void Abort(const Status& status) override { // It doesn't matter what is the actual phase of the OutboundCall: just set // it to Phase::REMOTE_CALL to finalize the state of the call. call_->SetFailed(status, OutboundCall::Phase::REMOTE_CALL); delete this; } private: const shared_ptr<OutboundCall> call_; }; void Reactor::QueueOutboundCall(shared_ptr<OutboundCall> call) { DVLOG(3) << name_ << ": queueing outbound call " << call->ToString() << " to remote " << call->conn_id().remote().ToString(); // Test cancellation when 'call_' is in 'READY' state. if (PREDICT_FALSE(call->ShouldInjectCancellation())) { QueueCancellation(call); } ScheduleReactorTask(new AssignOutboundCallTask(std::move(call))); } class CancellationTask : public ReactorTask { public: explicit CancellationTask(shared_ptr<OutboundCall> call) : call_(std::move(call)) {} void Run(ReactorThread* reactor) override { reactor->CancelOutboundCall(call_); delete this; } void Abort(const Status& /*status*/) override { delete this; } private: const shared_ptr<OutboundCall> call_; }; void Reactor::QueueCancellation(shared_ptr<OutboundCall> call) { ScheduleReactorTask(new CancellationTask(std::move(call))); } void Reactor::ScheduleReactorTask(ReactorTask* task) { bool was_empty; { std::unique_lock<LockType> l(lock_); if (closing_) { // We guarantee the reactor lock is not taken when calling Abort(). l.unlock(); task->Abort(ShutdownError(false)); return; } was_empty = pending_tasks_.empty(); pending_tasks_.push_back(*task); } if (was_empty) { thread_.WakeThread(); } } bool Reactor::DrainTaskQueue(boost::intrusive::list<ReactorTask>* tasks) { // NOLINT(*) std::lock_guard<LockType> l(lock_); if (closing_) { return false; } tasks->swap(pending_tasks_); return true; } } // namespace rpc } // namespace kudu