thrift/lib/cpp2/server/ThriftServer.cpp (1,120 lines of code) (raw):

/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <thrift/lib/cpp2/server/ThriftServer.h> #include <fcntl.h> #include <signal.h> #include <iostream> #include <random> #include <glog/logging.h> #include <folly/Conv.h> #include <folly/Memory.h> #include <folly/ScopeGuard.h> #include <folly/executors/CPUThreadPoolExecutor.h> #include <folly/executors/IOThreadPoolDeadlockDetectorObserver.h> #include <folly/executors/thread_factory/NamedThreadFactory.h> #include <folly/executors/thread_factory/PriorityThreadFactory.h> #include <folly/experimental/coro/BlockingWait.h> #include <folly/experimental/coro/CurrentExecutor.h> #include <folly/experimental/coro/Invoke.h> #include <folly/io/GlobalShutdownSocketSet.h> #include <folly/portability/Sockets.h> #include <folly/system/Pid.h> #include <thrift/lib/cpp/concurrency/PosixThreadFactory.h> #include <thrift/lib/cpp/concurrency/Thread.h> #include <thrift/lib/cpp/concurrency/ThreadManager.h> #include <thrift/lib/cpp/server/TServerObserver.h> #include <thrift/lib/cpp2/Flags.h> #include <thrift/lib/cpp2/async/MultiplexAsyncProcessor.h> #include <thrift/lib/cpp2/server/Cpp2Connection.h> #include <thrift/lib/cpp2/server/Cpp2Worker.h> #include <thrift/lib/cpp2/server/LoggingEvent.h> #include <thrift/lib/cpp2/server/ParallelConcurrencyController.h> #include <thrift/lib/cpp2/server/RoundRobinRequestPile.h> #include <thrift/lib/cpp2/server/ServerFlags.h> #include <thrift/lib/cpp2/server/ServerInstrumentation.h> #include <thrift/lib/cpp2/server/ThriftProcessor.h> #include <thrift/lib/cpp2/transport/core/ManagedConnectionIf.h> #include <thrift/lib/cpp2/transport/rocket/server/RocketRoutingHandler.h> #include <thrift/lib/cpp2/transport/rocket/server/RocketServerConnection.h> #include <wangle/acceptor/FizzConfigUtil.h> #include <wangle/acceptor/SharedSSLContextManager.h> using namespace std::literals::chrono_literals; DEFINE_bool( thrift_abort_if_exceeds_shutdown_deadline, true, "Abort the server if failed to drain active requests within deadline"); DEFINE_string( thrift_ssl_policy, "disabled", "SSL required / permitted / disabled"); DEFINE_string( service_identity, "", "The name of the service. Associates the service with ACLs and keys"); THRIFT_FLAG_DEFINE_bool(server_alpn_prefer_rocket, true); THRIFT_FLAG_DEFINE_bool(server_enable_stoptls, false); THRIFT_FLAG_DEFINE_bool(ssl_policy_default_required, true); THRIFT_FLAG_DEFINE_bool(alpn_allow_mismatch, true); THRIFT_FLAG_DEFINE_bool(dump_snapshot_on_long_shutdown, true); THRIFT_FLAG_DEFINE_bool(server_check_unimplemented_extra_interfaces, true); THRIFT_FLAG_DEFINE_bool(enable_on_stop_serving, true); THRIFT_FLAG_DEFINE_bool(enable_io_queue_lag_detection, true); namespace apache::thrift::detail { THRIFT_PLUGGABLE_FUNC_REGISTER( apache::thrift::ThriftServer::DumpSnapshotOnLongShutdownResult, dumpSnapshotOnLongShutdown) { return {folly::makeSemiFuture(folly::unit), 0ms}; } THRIFT_PLUGGABLE_FUNC_REGISTER( apache::thrift::ThriftServer::ExtraInterfaces, createDefaultExtraInterfaces) { return { nullptr /* monitoring */, nullptr /* status */, nullptr /* control */}; } THRIFT_PLUGGABLE_FUNC_REGISTER( ThriftServer::UnimplementedExtraInterfacesResult, serviceHasUnimplementedExtraInterfaces, AsyncProcessorFactory& /* service */) { return ThriftServer::UnimplementedExtraInterfacesResult::UNRECOGNIZED; } } // namespace apache::thrift::detail namespace { [[noreturn]] void try_quick_exit(int code) { #if defined(_GLIBCXX_HAVE_AT_QUICK_EXIT) std::quick_exit(code); #else std::exit(code); #endif } } // namespace namespace apache { namespace thrift { using namespace apache::thrift::server; using namespace std; using apache::thrift::concurrency::PriorityThreadManager; using apache::thrift::concurrency::Runnable; using apache::thrift::concurrency::ThreadManager; using wangle::TLSCredProcessor; namespace { folly::Synchronized<std::vector<ThriftServer::IOObserverFactory>> ioObserverFactories{}; /** * Multiplexes the user-service (set via setProcessorFactory) with the * monitoring interface (set via setMonitoringInterface). */ std::unique_ptr<AsyncProcessorFactory> createDecoratedProcessorFactory( std::shared_ptr<AsyncProcessorFactory> processorFactory, std::shared_ptr<StatusServerInterface> statusProcessorFactory, std::shared_ptr<MonitoringServerInterface> monitoringProcessorFactory, std::shared_ptr<ControlServerInterface> controlProcessorFactory, bool shouldCheckForUnimplementedExtraInterfaces) { std::vector<std::shared_ptr<AsyncProcessorFactory>> servicesToMultiplex; CHECK(processorFactory != nullptr); if (statusProcessorFactory != nullptr) { servicesToMultiplex.emplace_back(std::move(statusProcessorFactory)); } if (monitoringProcessorFactory != nullptr) { servicesToMultiplex.emplace_back(std::move(monitoringProcessorFactory)); } if (controlProcessorFactory != nullptr) { servicesToMultiplex.emplace_back(std::move(controlProcessorFactory)); } const bool shouldPlaceExtraInterfacesInFront = shouldCheckForUnimplementedExtraInterfaces && apache::thrift::detail::serviceHasUnimplementedExtraInterfaces( *processorFactory) == ThriftServer::UnimplementedExtraInterfacesResult::UNIMPLEMENTED; auto userServicePosition = shouldPlaceExtraInterfacesInFront ? servicesToMultiplex.end() : servicesToMultiplex.begin(); servicesToMultiplex.insert(userServicePosition, std::move(processorFactory)); return std::make_unique<MultiplexAsyncProcessorFactory>( std::move(servicesToMultiplex)); } } // namespace // HACK: To avoid circular header includes, we define these in ThriftServer.h // instead of AsyncProcessor.h #if FOLLY_HAS_COROUTINES folly::coro::CancellableAsyncScope* ServiceHandlerBase::getAsyncScope() { return server_->getAsyncScope(); } #endif void ServiceHandlerBase::attachServer(ThriftServer& server) { server_ = &server; serverStopController_.lock()->emplace(server.getStopController()); } void ServiceHandlerBase::detachServer() { server_ = nullptr; serverStopController_.lock()->reset(); } void ServiceHandlerBase::shutdownServer() { // shutdownServer should be idempotent -- this means that it can race with // detachServer. Thus we should sychronize access to it. serverStopController_.withLock([](auto& stopController) { if (!stopController.has_value()) { return; } if (auto lockedPtr = stopController->lock()) { lockedPtr->stop(); } }); } TLSCredentialWatcher::TLSCredentialWatcher(ThriftServer* server) : credProcessor_() { credProcessor_.addCertCallback([server] { server->updateTLSCert(); }); credProcessor_.addTicketCallback([server](wangle::TLSTicketKeySeeds seeds) { server->updateTicketSeeds(std::move(seeds)); }); } ThriftServer::ThriftServer() : BaseThriftServer(), wShutdownSocketSet_(folly::tryGetShutdownSocketSet()), lastRequestTime_( std::chrono::steady_clock::now().time_since_epoch().count()) { if (FLAGS_thrift_ssl_policy == "required") { sslPolicy_ = SSLPolicy::REQUIRED; } else if (FLAGS_thrift_ssl_policy == "permitted") { sslPolicy_ = SSLPolicy::PERMITTED; } metadata().wrapper = "ThriftServer-cpp"; auto extraInterfaces = apache::thrift::detail::createDefaultExtraInterfaces(); setMonitoringInterface(std::move(extraInterfaces.monitoring)); setStatusInterface(std::move(extraInterfaces.status)); setControlInterface(std::move(extraInterfaces.control)); } ThriftServer::ThriftServer( const std::shared_ptr<HeaderServerChannel>& serverChannel) : ThriftServer() { serverChannel_ = serverChannel; setNumIOWorkerThreads(1); setIdleTimeout(std::chrono::milliseconds(0)); } ThriftServer::~ThriftServer() { tracker_.reset(); if (duplexWorker_) { // usually ServerBootstrap::stop drains the workers, but ServerBootstrap // doesn't know about duplexWorker_ duplexWorker_->drainAllConnections(); LOG_IF(ERROR, !duplexWorker_.unique()) << getActiveRequests() << " active Requests while in destructing" << " duplex ThriftServer. Consider using startDuplex & stopDuplex"; } SCOPE_EXIT { stopController_.join(); }; if (stopWorkersOnStopListening_) { // Everything is already taken care of. return; } // If the flag is false, neither i/o nor CPU workers are stopped at this // point. Stop them now. if (!joinRequestsWhenServerStops_) { stopAcceptingAndJoinOutstandingRequests(); } stopCPUWorkers(); stopWorkers(); } SSLPolicy ThriftServer::getSSLPolicy() const { // If it's explicitly set in constructor through gflags or through the // setSSLPolicy setter, then use that value. if (sslPolicy_.has_value()) { return *sslPolicy_; } // Otherwise, fallback to default (currently defined through a ThriftFlag). // REQUIRED is the new default we're migrating to. We use ThriftFlags to // opt-out services that still need to use PERMITTED. return THRIFT_FLAG(ssl_policy_default_required) ? SSLPolicy::REQUIRED : SSLPolicy::PERMITTED; } void ThriftServer::setProcessorFactory( std::shared_ptr<AsyncProcessorFactory> pFac) { CHECK(configMutable()); BaseThriftServer::setProcessorFactory(pFac); thriftProcessor_.reset(new ThriftProcessor(*this)); } std::unique_ptr<AsyncProcessor> ThriftServer::getDecoratedProcessorWithoutEventHandlers() const { return static_cast<MultiplexAsyncProcessorFactory&>( getDecoratedProcessorFactory()) .getProcessorWithUnderlyingModifications( [](AsyncProcessor& processor) { processor.clearEventHandlers(); }); } void ThriftServer::useExistingSocket( folly::AsyncServerSocket::UniquePtr socket) { socket_ = std::move(socket); } void ThriftServer::useExistingSockets(const std::vector<int>& socketFds) { folly::AsyncServerSocket::UniquePtr socket(new folly::AsyncServerSocket); std::vector<folly::NetworkSocket> sockets; sockets.reserve(socketFds.size()); for (auto s : socketFds) { sockets.push_back(folly::NetworkSocket::fromFd(s)); } socket->useExistingSockets(sockets); useExistingSocket(std::move(socket)); } void ThriftServer::useExistingSocket(int socket) { useExistingSockets({socket}); } std::vector<int> ThriftServer::getListenSockets() const { std::vector<int> sockets; for (const auto& socket : getSockets()) { auto newsockets = socket->getNetworkSockets(); sockets.reserve(sockets.size() + newsockets.size()); for (auto s : newsockets) { sockets.push_back(s.toFd()); } } return sockets; } int ThriftServer::getListenSocket() const { std::vector<int> sockets = getListenSockets(); if (sockets.size() == 0) { return -1; } CHECK(sockets.size() == 1); return sockets[0]; } folly::EventBaseManager* ThriftServer::getEventBaseManager() { return eventBaseManager_; } ThriftServer::IdleServerAction::IdleServerAction( ThriftServer& server, folly::HHWheelTimer& timer, std::chrono::milliseconds timeout) : server_(server), timer_(timer), timeout_(timeout) { timer_.scheduleTimeout(this, timeout_); } void ThriftServer::IdleServerAction::timeoutExpired() noexcept { try { auto const lastRequestTime = server_.lastRequestTime(); auto const elapsed = std::chrono::steady_clock::now() - lastRequestTime; if (elapsed >= timeout_) { LOG(INFO) << "shutting down server due to inactivity after " << std::chrono::duration_cast<std::chrono::milliseconds>( elapsed) .count() << "ms"; server_.stop(); return; } timer_.scheduleTimeout(this, timeout_); } catch (std::exception const& e) { LOG(ERROR) << e.what(); } } std::chrono::steady_clock::time_point ThriftServer::lastRequestTime() const noexcept { return std::chrono::steady_clock::time_point( std::chrono::steady_clock::duration( lastRequestTime_.load(std::memory_order_relaxed))); } void ThriftServer::touchRequestTimestamp() noexcept { if (idleServer_.has_value()) { lastRequestTime_.store( std::chrono::steady_clock::now().time_since_epoch().count(), std::memory_order_relaxed); } } void ThriftServer::setup() { ensureDecoratedProcessorFactoryInitialized(); auto nWorkers = getNumIOWorkerThreads(); DCHECK_GT(nWorkers, 0u); addRoutingHandler( std::make_unique<apache::thrift::RocketRoutingHandler>(*this)); // Initialize event base for this thread auto serveEventBase = eventBaseManager_->getEventBase(); serveEventBase_ = serveEventBase; stopController_.set(std::make_unique<StopController>( folly::badge<ThriftServer>{}, *serveEventBase)); if (idleServerTimeout_.count() > 0) { idleServer_.emplace(*this, serveEventBase->timer(), idleServerTimeout_); } // Print some libevent stats VLOG(1) << "libevent " << folly::EventBase::getLibeventVersion() << " method " << serveEventBase->getLibeventMethod(); try { #ifndef _WIN32 // OpenSSL might try to write to a closed socket if the peer disconnects // abruptly, raising a SIGPIPE signal. By default this will terminate the // process, which we don't want. Hence we need to handle SIGPIPE specially. // // We don't use SIG_IGN here as child processes will inherit that handler. // Instead, we swallow the signal to enable SIGPIPE in children to behave // normally. // Furthermore, setting flags to 0 and using sigaction prevents SA_RESTART // from restarting syscalls after the handler completed. This is important // for code using SIGPIPE to interrupt syscalls in other threads. struct sigaction sa = {}; sa.sa_handler = [](int) {}; sa.sa_flags = 0; sigemptyset(&sa.sa_mask); sigaction(SIGPIPE, &sa, nullptr); #endif if (!getObserver() && server::observerFactory_) { setObserver(server::observerFactory_->getObserver()); } if (!useResourcePoolsFlagsSet()) { // We always need a threadmanager for cpp2. LOG(INFO) << "Using thread manager (resource pools not enabled)"; setupThreadManager(); threadManager_->setExpireCallback([&](std::shared_ptr<Runnable> r) { EventTask* task = dynamic_cast<EventTask*>(r.get()); if (task) { task->expired(); } }); threadManager_->setCodelCallback([&](std::shared_ptr<Runnable>) { auto observer = getObserver(); if (observer) { if (getEnableCodel()) { observer->queueTimeout(); } else { observer->shadowQueueTimeout(); } } }); } else { LOG(INFO) << "Using resource pools"; DCHECK(!threadManager_); ensureResourcePools(); // Keep concurrency controller in sync with max requests for now. resourcePoolSet() .resourcePool(ResourcePoolHandle::defaultAsync()) .concurrencyController() .value() ->setExecutionLimitRequests(getMaxRequests()); } if (!serverChannel_) { ServerBootstrap::socketConfig.acceptBacklog = getListenBacklog(); ServerBootstrap::socketConfig.maxNumPendingConnectionsPerWorker = getMaxNumPendingConnectionsPerWorker(); if (reusePort_.value_or(false)) { ServerBootstrap::setReusePort(true); } if (enableTFO_) { ServerBootstrap::socketConfig.enableTCPFastOpen = *enableTFO_; ServerBootstrap::socketConfig.fastOpenQueueSize = fastOpenQueueSize_; } ioThreadPool_->addObserver( folly::IOThreadPoolDeadlockDetectorObserver::create( ioThreadPool_->getName())); ioObserverFactories.withRLock([this](auto& factories) { for (auto& f : factories) { ioThreadPool_->addObserver(f( ioThreadPool_->getName(), ioThreadPool_->getThreadIdCollector())); } }); // Resize the IO pool ioThreadPool_->setNumThreads(nWorkers); if (!acceptPool_) { acceptPool_ = std::make_shared<folly::IOThreadPoolExecutor>( nAcceptors_, std::make_shared<folly::NamedThreadFactory>("Acceptor Thread")); } auto acceptorFactory = acceptorFactory_ ? acceptorFactory_ : std::make_shared<DefaultThriftAcceptorFactory>(this); if (auto factory = dynamic_cast<wangle::AcceptorFactorySharedSSLContext*>( acceptorFactory.get())) { sharedSSLContextManager_ = factory->initSharedSSLContextManager(); } ServerBootstrap::childHandler(std::move(acceptorFactory)); { std::lock_guard<std::mutex> lock(ioGroupMutex_); ServerBootstrap::group(acceptPool_, ioThreadPool_); } if (socket_) { ServerBootstrap::bind(std::move(socket_)); } else if (!getAddress().isInitialized()) { ServerBootstrap::bind(port_); } else { for (auto& address : addresses_) { ServerBootstrap::bind(address); } } // Update address_ with the address that we are actually bound to. // (This is needed if we were supplied a pre-bound socket, or if // address_'s port was set to 0, so an ephemeral port was chosen by // the kernel.) ServerBootstrap::getSockets()[0]->getAddress(&addresses_.at(0)); // we enable zerocopy for the server socket if the // zeroCopyEnableFunc_ is valid bool useZeroCopy = !!zeroCopyEnableFunc_; for (auto& socket : getSockets()) { auto* evb = socket->getEventBase(); evb->runImmediatelyOrRunInEventBaseThreadAndWait([&] { socket->setShutdownSocketSet(wShutdownSocketSet_); socket->setAcceptRateAdjustSpeed(acceptRateAdjustSpeed_); socket->setZeroCopy(useZeroCopy); socket->setQueueTimeout(getSocketQueueTimeout()); try { socket->setTosReflect(tosReflect_); socket->setListenerTos(listenerTos_); } catch (std::exception const& ex) { LOG(ERROR) << "Got exception setting up TOS settings: " << folly::exceptionStr(ex); } }); } } else { startDuplex(); } #if FOLLY_HAS_COROUTINES asyncScope_ = std::make_unique<folly::coro::CancellableAsyncScope>(); #endif for (auto handler : collectServiceHandlers()) { handler->attachServer(*this); } DCHECK( internalStatus_.load(std::memory_order_relaxed) == ServerStatus::NOT_RUNNING); // The server is not yet ready for the user's service methods but fine to // handle internal methods. See ServerConfigs::getInternalMethods(). internalStatus_.store( ServerStatus::PRE_STARTING, std::memory_order_release); // Notify handler of the preStart event for (const auto& eventHandler : getEventHandlersUnsafe()) { eventHandler->preStart(&addresses_.at(0)); } internalStatus_.store(ServerStatus::STARTING, std::memory_order_release); // Called after setup callOnStartServing(); // After the onStartServing hooks have finished, we are ready to handle // requests, at least from the server's perspective. internalStatus_.store(ServerStatus::RUNNING, std::memory_order_release); #if FOLLY_HAS_COROUTINES // Set up polling for PolledServiceHealth handlers if necessary { DCHECK(!getServiceHealth().has_value()); auto handlers = collectServiceHandlers<PolledServiceHealth>(); if (!handlers.empty()) { auto poll = ServiceHealthPoller::poll( std::move(handlers), getPolledServiceHealthLivenessObserver()); auto loop = folly::coro::co_invoke( [this, poll = std::move(poll)]() mutable -> folly::coro::Task<void> { while (auto value = co_await poll.next()) { co_await folly::coro::co_safe_point; cachedServiceHealth_.store(*value, std::memory_order_relaxed); } }); asyncScope_->add(std::move(loop).scheduleOn(getExecutor())); } } #endif // Notify handler of the preServe event for (const auto& eventHandler : getEventHandlersUnsafe()) { eventHandler->preServe(&addresses_.at(0)); } // Do not allow setters to be called past this point until the IO worker // threads have been joined in stopWorkers(). configMutable_ = false; } catch (std::exception& ex) { // This block allows us to investigate the exception using gdb LOG(ERROR) << "Got an exception while setting up the server: " << ex.what(); handleSetupFailure(); throw; } catch (...) { handleSetupFailure(); throw; } THRIFT_SERVER_EVENT(serve).log(*this); } void ThriftServer::setupThreadManager() { if (!threadManager_) { std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager; switch (threadManagerType_) { case ThreadManagerType::PRIORITY: if (std::any_of( std::begin(threadManagerPoolSizes_), std::end(threadManagerPoolSizes_), [](std::size_t c) { return c != 0; })) { // The priorities were specified using setThreadManagerPoolSizes threadManager = PriorityThreadManager::newPriorityThreadManager( threadManagerPoolSizes_); } else { threadManager = PriorityThreadManager::newPriorityThreadManager( getNumCPUWorkerThreads()); } break; case ThreadManagerType::SIMPLE: threadManager = ThreadManager::newSimpleThreadManager(getNumCPUWorkerThreads()); break; } threadManager->enableCodel(getEnableCodel()); // If a thread factory has been specified, use it. if (threadFactory_) { threadManager->threadFactory(threadFactory_); } auto poolThreadName = getCPUWorkerThreadName(); if (!poolThreadName.empty()) { threadManager->setNamePrefix(poolThreadName); } threadManager->start(); setThreadManagerInternal(threadManager); } } void ThriftServer::ensureResourcePools() { // If the user has supplied resource pools we will believe them. if (!resourcePoolSet().empty()) { LOG(INFO) << "Using non default ResourcePoolSet"; return; } // Create the sync resource pool. resourcePoolSet().setResourcePool( ResourcePoolHandle::defaultSync(), /*requestPile=*/nullptr, /*executor=*/nullptr, /*concurrencyController=*/nullptr); // Now create the HIGH_IMPORTANT, HIGH, IMPORTANT, NORMAL and BEST_EFFORT // pools. NORMAL gets NumCPUWorkerThreads, the rest get two each. struct Pool { std::string_view name; std::string_view suffix; int priority; size_t numThreads; std::optional<ResourcePoolHandle> handle; }; // TODO: T111371879 [thrift][resourcepools] Figure out priorities for default // setup in ensureResourcePool including non-linux // These priority numbers are what thrift currently derives for a nice range // of 19 to -20. Pool pools[] = { {"HIGH_IMPORTANT", "HI", -13, 2, std::nullopt}, {"HIGH", "H", -7, 2, std::nullopt}, {"IMPORTANT", "I", -7, 2, std::nullopt}, {"NORMAL", "", 0, getNumCPUWorkerThreads(), ResourcePoolHandle::defaultAsync()}, {"BEST_EFFORT", "BE", 6, 2, std::nullopt}}; for (auto const& pool : pools) { std::string name = fmt::format("{}.{}", getCPUWorkerThreadName(), pool.suffix); auto factory = std::make_shared<folly::PriorityThreadFactory>( std::make_shared<folly::NamedThreadFactory>(name), pool.priority); auto executor = std::make_shared<folly::CPUThreadPoolExecutor>( pool.numThreads, std::move(factory)); apache::thrift::RoundRobinRequestPile::Options options; auto requestPile = std::make_unique<apache::thrift::RoundRobinRequestPile>( std::move(options)); auto concurrencyController = std::make_unique<apache::thrift::ParallelConcurrencyController>( *requestPile.get(), *executor.get()); if (pool.handle) { resourcePoolSet().setResourcePool( ResourcePoolHandle::defaultAsync(), std::move(requestPile), executor, std::move(concurrencyController)); } else { resourcePoolSet().addResourcePool( pool.name, std::move(requestPile), executor, std::move(concurrencyController)); } } } /** * Preferably use this method in order to start ThriftServer created for * DuplexChannel instead of the serve() method. */ void ThriftServer::startDuplex() { CHECK(configMutable()); ensureDecoratedProcessorFactoryInitialized(); duplexWorker_ = Cpp2Worker::create(this, serverChannel_); // we don't control the EventBase for the duplexWorker, so when we shut // it down, we need to ensure there's no delay duplexWorker_->setGracefulShutdownTimeout(std::chrono::milliseconds(0)); } /** * This method should be used to cleanly stop a ThriftServer created for * DuplexChannel before disposing the ThriftServer. The caller should pass in * a shared_ptr to this ThriftServer since the ThriftServer does not have a * way of getting that (does not inherit from enable_shared_from_this) */ void ThriftServer::stopDuplex(std::shared_ptr<ThriftServer> thisServer) { DCHECK(this == thisServer.get()); DCHECK(duplexWorker_ != nullptr); // Try to stop our Worker but this cannot stop in flight requests // Instead, it will capture a shared_ptr back to us, keeping us alive // until it really goes away (when in-flight requests are gone) duplexWorker_->stopDuplex(thisServer); // Get rid of our reference to the worker to avoid forming a cycle duplexWorker_ = nullptr; } /** * Loop and accept incoming connections. */ void ThriftServer::serve() { setup(); if (serverChannel_ != nullptr) { // A duplex server (the one running on a client) doesn't uses its own EB // since it reuses the client's EB return; } SCOPE_EXIT { this->cleanUp(); }; auto sslContextConfigCallbackHandle = sslContextObserver_ ? getSSLCallbackHandle() : folly::observer::CallbackHandle{}; tracker_.emplace(instrumentation::kThriftServerTrackerKey, *this); eventBaseManager_->getEventBase()->loopForever(); } void ThriftServer::cleanUp() { DCHECK(!serverChannel_); // tlsCredWatcher_ uses a background thread that needs to be joined prior // to any further writes to ThriftServer members. tlsCredWatcher_.withWLock([](auto& credWatcher) { credWatcher.reset(); }); // It is users duty to make sure that setup() call // should have returned before doing this cleanup idleServer_.reset(); serveEventBase_ = nullptr; stopController_.join(); stopListening(); // Stop the routing handlers. for (auto& handler : routingHandlers_) { handler->stopListening(); } if (stopWorkersOnStopListening_) { // Wait on the i/o worker threads to actually stop stopWorkers(); } else if (joinRequestsWhenServerStops_) { stopAcceptingAndJoinOutstandingRequests(); } for (auto handler : getProcessorFactory()->getServiceHandlers()) { handler->detachServer(); } // Now clear all the handlers routingHandlers_.clear(); } uint64_t ThriftServer::getNumDroppedConnections() const { uint64_t droppedConns = 0; for (auto& socket : getSockets()) { droppedConns += socket->getNumDroppedConnections(); } return droppedConns; } void ThriftServerStopController::stop() { folly::call_once(stopped_, [&] { serveEventBase_.terminateLoopSoon(); }); } void ThriftServer::stop() { if (auto s = stopController_.lock()) { s->stop(); } } void ThriftServer::stopListening() { // We have to make sure stopListening() is not called twice when both // stopListening() and cleanUp() are called { auto expected = ServerStatus::RUNNING; if (!internalStatus_.compare_exchange_strong( expected, ServerStatus::PRE_STOPPING, std::memory_order_release, std::memory_order_relaxed)) { // stopListening() was called earlier DCHECK( expected == ServerStatus::PRE_STOPPING || expected == ServerStatus::STOPPING || expected == ServerStatus::DRAINING_UNTIL_STOPPED || expected == ServerStatus::NOT_RUNNING); return; } } #if FOLLY_HAS_COROUTINES asyncScope_->requestCancellation(); #endif { auto sockets = getSockets(); folly::Baton<> done; SCOPE_EXIT { done.wait(); }; std::shared_ptr<folly::Baton<>> doneGuard( &done, [](folly::Baton<>* done) { done->post(); }); for (auto& socket : sockets) { // Stop accepting new connections auto eb = socket->getEventBase(); eb->runInEventBaseThread([socket = std::move(socket), doneGuard] { socket->pauseAccepting(); }); } } if (stopWorkersOnStopListening_) { stopAcceptingAndJoinOutstandingRequests(); stopCPUWorkers(); } } void ThriftServer::stopWorkers() { if (serverChannel_) { return; } DCHECK(!duplexWorker_); ServerBootstrap::stop(); ServerBootstrap::join(); configMutable_ = true; } void ThriftServer::stopAcceptingAndJoinOutstandingRequests() { { auto expected = ServerStatus::PRE_STOPPING; if (!internalStatus_.compare_exchange_strong( expected, ServerStatus::STOPPING, std::memory_order_release, std::memory_order_relaxed)) { // stopListening() was called earlier DCHECK( expected == ServerStatus::STOPPING || expected == ServerStatus::DRAINING_UNTIL_STOPPED || expected == ServerStatus::NOT_RUNNING); return; } } callOnStopRequested(); internalStatus_.store( ServerStatus::DRAINING_UNTIL_STOPPED, std::memory_order_release); forEachWorker([&](wangle::Acceptor* acceptor) { if (auto worker = dynamic_cast<Cpp2Worker*>(acceptor)) { worker->requestStop(); } }); // tlsCredWatcher_ uses a background thread that needs to be joined prior // to any further writes to ThriftServer members. tlsCredWatcher_.withWLock([](auto& credWatcher) { credWatcher.reset(); }); sharedSSLContextManager_ = nullptr; { auto sockets = getSockets(); folly::Baton<> done; SCOPE_EXIT { done.wait(); }; std::shared_ptr<folly::Baton<>> doneGuard( &done, [](folly::Baton<>* done) { done->post(); }); for (auto& socket : sockets) { // We should have already paused accepting new connections. This just // closes the sockets once and for all. auto eb = socket->getEventBase(); eb->runInEventBaseThread([socket = std::move(socket), doneGuard] { // This will also cause the workers to stop socket->stopAccepting(); }); } } auto joinDeadline = std::chrono::steady_clock::now() + getWorkersJoinTimeout(); bool dumpSnapshotFlag = THRIFT_FLAG(dump_snapshot_on_long_shutdown); forEachWorker([&](wangle::Acceptor* acceptor) { if (auto worker = dynamic_cast<Cpp2Worker*>(acceptor)) { if (!worker->waitForStop(joinDeadline)) { // Before we crash, let's dump a snapshot of the server. // We create the CPUThreadPoolExecutor outside the if block so that it // doesn't wait for our task to complete when exiting the block even // after the timeout expires as we can't cancel the task folly::CPUThreadPoolExecutor dumpSnapshotExecutor{1}; if (dumpSnapshotFlag) { // The IO threads may be deadlocked in which case we won't be able to // dump snapshots. It still shouldn't block shutdown indefinitely. auto dumpSnapshotResult = apache::thrift::detail::dumpSnapshotOnLongShutdown(); try { std::move(dumpSnapshotResult.task) .via(folly::getKeepAliveToken(dumpSnapshotExecutor)) .get(dumpSnapshotResult.timeout); } catch (...) { LOG(ERROR) << "Failed to dump server snapshot on long shutdown: " << folly::exceptionStr(std::current_exception()); } } constexpr auto msgTemplate = "Could not drain active requests within allotted deadline. " "Deadline value: {} secs. {} because undefined behavior is possible. " "Underlying reasons could be either requests that have never " "terminated, long running requests, or long queues that could " "not be fully processed."; if (quickExitOnShutdownTimeout_) { LOG(ERROR) << fmt::format( msgTemplate, getWorkersJoinTimeout().count(), "quick_exiting (no coredump)"); // similar to abort but without generating a coredump try_quick_exit(124); } if (FLAGS_thrift_abort_if_exceeds_shutdown_deadline) { LOG(FATAL) << fmt::format( msgTemplate, getWorkersJoinTimeout().count(), "Aborting"); } } } }); // Clear the decorated processor factory so that it's re-created if the server // is restarted. // Note that duplex servers drain connections in the destructor so we need to // keep the AsyncProcessorFactory alive until then. Duplex servers also don't // support restarting the server so extending its lifetime should not cause // issues. if (!isDuplex()) { decoratedProcessorFactory_.reset(); } internalStatus_.store(ServerStatus::NOT_RUNNING, std::memory_order_release); } void ThriftServer::ensureDecoratedProcessorFactoryInitialized() { DCHECK(getProcessorFactory().get()); if (decoratedProcessorFactory_ == nullptr) { decoratedProcessorFactory_ = createDecoratedProcessorFactory( getProcessorFactory(), getStatusInterface(), getMonitoringInterface(), getControlInterface(), isCheckUnimplementedExtraInterfacesAllowed() && THRIFT_FLAG(server_check_unimplemented_extra_interfaces)); } } void ThriftServer::callOnStartServing() { auto handlerList = collectServiceHandlers(); // Exception is handled in setup() std::vector<folly::SemiFuture<folly::Unit>> futures; futures.reserve(handlerList.size()); for (auto handler : handlerList) { futures.emplace_back( folly::makeSemiFuture().deferValue([handler](folly::Unit) { return handler->semifuture_onStartServing(); })); } folly::collectAll(futures).via(getExecutor()).get(); } void ThriftServer::callOnStopRequested() { auto handlerList = collectServiceHandlers(); std::vector<folly::SemiFuture<folly::Unit>> futures; futures.reserve(handlerList.size()); for (auto handler : handlerList) { futures.emplace_back( THRIFT_FLAG(enable_on_stop_serving) ? folly::makeSemiFuture().deferValue([handler](folly::Unit) { return handler->semifuture_onStopRequested(); }) : handler->semifuture_onStopRequested()); } auto results = folly::collectAll(futures).via(getExecutor()).get(); for (auto& result : results) { if (result.hasException()) { LOG(FATAL) << "Exception thrown by onStopRequested(): " << folly::exceptionStr(result.exception()); } } } namespace { ThriftServer* globalServer = nullptr; } #if FOLLY_HAS_COROUTINES folly::coro::CancellableAsyncScope& ThriftServer::getGlobalAsyncScope() { DCHECK(globalServer); auto asyncScope = globalServer->getAsyncScope(); DCHECK(asyncScope); return *asyncScope; } #endif void ThriftServer::setGlobalServer(ThriftServer* server) { globalServer = server; } void ThriftServer::stopCPUWorkers() { // Wait for any tasks currently running on the task queue workers to // finish, then stop the task queue workers. Have to do this now, so // there aren't tasks completing and trying to write to i/o thread // workers after we've stopped the i/o workers. if (threadManager_) { threadManager_->join(); } resourcePoolSet().stopAndJoin(); #if FOLLY_HAS_COROUTINES // Wait for tasks running on AsyncScope to join folly::coro::blockingWait(asyncScope_->joinAsync()); cachedServiceHealth_.store(ServiceHealth{}, std::memory_order_relaxed); #endif // Notify handler of the postStop event for (const auto& eventHandler : getEventHandlersUnsafe()) { eventHandler->postStop(); } #if FOLLY_HAS_COROUTINES asyncScope_.reset(); #endif } void ThriftServer::handleSetupFailure(void) { ServerBootstrap::stop(); // avoid crash on stop() idleServer_.reset(); serveEventBase_ = nullptr; stopController_.join(); } void ThriftServer::updateTicketSeeds(wangle::TLSTicketKeySeeds seeds) { if (sharedSSLContextManager_) { sharedSSLContextManager_->updateTLSTicketKeys(seeds); } else { forEachWorker([&](wangle::Acceptor* acceptor) { if (!acceptor) { return; } auto evb = acceptor->getEventBase(); if (!evb) { return; } evb->runInEventBaseThread([acceptor, seeds] { acceptor->setTLSTicketSecrets( seeds.oldSeeds, seeds.currentSeeds, seeds.newSeeds); }); }); } } void ThriftServer::updateTLSCert() { if (sharedSSLContextManager_) { sharedSSLContextManager_->reloadSSLContextConfigs(); } else { forEachWorker([&](wangle::Acceptor* acceptor) { if (!acceptor) { return; } auto evb = acceptor->getEventBase(); if (!evb) { return; } evb->runInEventBaseThread( [acceptor] { acceptor->resetSSLContextConfigs(); }); }); } } void ThriftServer::updateCertsToWatch() { std::set<std::string> certPaths; if (sslContextObserver_.has_value()) { auto sslContext = *sslContextObserver_->getSnapshot(); if (!sslContext.certificates.empty()) { const auto& cert = sslContext.certificates[0]; certPaths.insert(cert.certPath); certPaths.insert(cert.keyPath); certPaths.insert(cert.passwordPath); } certPaths.insert(sslContext.clientCAFile); } tlsCredWatcher_.withWLock([this, &certPaths](auto& credWatcher) { if (!credWatcher) { credWatcher.emplace(this); } credWatcher->setCertPathsToWatch(std::move(certPaths)); }); } void ThriftServer::watchTicketPathForChanges(const std::string& ticketPath) { auto seeds = TLSCredProcessor::processTLSTickets(ticketPath); if (seeds) { setTicketSeeds(std::move(*seeds)); } tlsCredWatcher_.withWLock([this, &ticketPath](auto& credWatcher) { if (!credWatcher) { credWatcher.emplace(this); } credWatcher->setTicketPathToWatch(ticketPath); }); } PreprocessResult ThriftServer::preprocess( const PreprocessParams& params) const { if (preprocess_) { return preprocess_(params); } return {}; } folly::Optional<std::string> ThriftServer::checkOverload( const transport::THeader::StringToStringMap* readHeaders, const std::string* method) const { if (UNLIKELY(isOverloaded_ && isOverloaded_(readHeaders, method))) { return kAppOverloadedErrorCode; } // only check for request limit if active request tracking is enabled if (!isActiveRequestsTrackingDisabled()) { if (auto maxRequests = getMaxRequests(); maxRequests > 0 && (method == nullptr || !getMethodsBypassMaxRequestsLimit().contains(*method)) && static_cast<uint32_t>(getActiveRequests()) >= maxRequests) { return kOverloadedErrorCode; } } return {}; } std::string ThriftServer::getLoadInfo(int64_t load) const { auto ioGroup = getIOGroupSafe(); auto workerFactory = ioGroup != nullptr ? std::dynamic_pointer_cast<folly::NamedThreadFactory>( ioGroup->getThreadFactory()) : nullptr; if (!workerFactory) { return ""; } std::stringstream stream; stream << workerFactory->getNamePrefix() << " load is: " << load << "% requests, " << getActiveRequests() << " active reqs"; return stream.str(); } void ThriftServer::replaceShutdownSocketSet( const std::shared_ptr<folly::ShutdownSocketSet>& newSSS) { wShutdownSocketSet_ = newSSS; } folly::SemiFuture<ThriftServer::ServerIOMemory> ThriftServer::getUsedIOMemory() { // WorkerIOMemory looks the same as the server, except they are unaggregated using WorkerIOMemory = ServerIOMemory; std::vector<folly::SemiFuture<WorkerIOMemory>> tasks; forEachWorker([&tasks](wangle::Acceptor* acceptor) { auto worker = dynamic_cast<Cpp2Worker*>(acceptor); if (!worker) { return; } auto fut = folly::via(worker->getEventBase(), [worker]() { auto& ingressMemTracker = worker->getIngressMemoryTracker(); auto& egressMemTracker = worker->getEgressMemoryTracker(); return WorkerIOMemory{ ingressMemTracker.getUsage(), egressMemTracker.getUsage()}; }); tasks.emplace_back(std::move(fut)); }); return folly::collect(tasks.begin(), tasks.end()) .deferValue( [](std::vector<WorkerIOMemory> workerIOMems) -> ServerIOMemory { ServerIOMemory ret{0, 0}; // Sum all ingress and egress usages for (const auto& workerIOMem : workerIOMems) { ret.ingress += workerIOMem.ingress; ret.egress += workerIOMem.egress; } return ret; }); } folly::SemiFuture<ThriftServer::ServerSnapshot> ThriftServer::getServerSnapshot( const SnapshotOptions& options) { // WorkerSnapshots look the same as the server, except they are unaggregated using WorkerSnapshot = ServerSnapshot; std::vector<folly::SemiFuture<WorkerSnapshot>> tasks; const auto snapshotTime = std::chrono::steady_clock::now(); forEachWorker([&tasks, snapshotTime, options](wangle::Acceptor* acceptor) { auto worker = dynamic_cast<Cpp2Worker*>(acceptor); if (!worker) { return; } auto fut = folly::via(worker->getEventBase(), [worker, snapshotTime, options]() { auto reqRegistry = worker->getRequestsRegistry(); DCHECK(reqRegistry); RequestSnapshots requestSnapshots; if (reqRegistry != nullptr) { for (const auto& stub : reqRegistry->getActive()) { requestSnapshots.emplace_back(stub); } for (const auto& stub : reqRegistry->getFinished()) { requestSnapshots.emplace_back(stub); } } std::unordered_map<folly::SocketAddress, ConnectionSnapshot> connectionSnapshots; // ConnectionManager can be nullptr if the worker didn't have any open // connections during shutdown if (auto connectionManager = worker->getConnectionManager()) { connectionManager->forEachConnection([&](wangle::ManagedConnection* wangleConnection) { if (auto managedConnection = dynamic_cast<ManagedConnectionIf*>(wangleConnection)) { auto numActiveRequests = managedConnection->getNumActiveRequests(); auto numPendingWrites = managedConnection->getNumPendingWrites(); auto creationTime = managedConnection->getCreationTime(); auto minCreationTime = snapshotTime - options.connectionsAgeMax; if (numActiveRequests > 0 || numPendingWrites > 0 || creationTime > minCreationTime) { connectionSnapshots.emplace( managedConnection->getPeerAddress(), ConnectionSnapshot{ numActiveRequests, numPendingWrites, creationTime}); } } }); } ServerIOMemory serverIOMemory; serverIOMemory.ingress = worker->getIngressMemoryTracker().getUsage(); serverIOMemory.egress = worker->getEgressMemoryTracker().getUsage(); return WorkerSnapshot{ worker->getRequestsRegistry()->getRequestCounter().get(), std::move(requestSnapshots), std::move(connectionSnapshots), std::move(serverIOMemory)}; }); tasks.emplace_back(std::move(fut)); }); return folly::collect(tasks.begin(), tasks.end()) .deferValue( [](std::vector<WorkerSnapshot> workerSnapshots) -> ServerSnapshot { ServerSnapshot ret{}; // Sum all request and connection counts and memory usages size_t numRequests = 0; size_t numConnections = 0; for (const auto& workerSnapshot : workerSnapshots) { for (uint64_t i = 0; i < ret.recentCounters.size(); ++i) { ret.recentCounters[i].first += workerSnapshot.recentCounters[i].first; ret.recentCounters[i].second += workerSnapshot.recentCounters[i].second; } numRequests += workerSnapshot.requests.size(); numConnections += workerSnapshot.connections.size(); ret.memory.ingress += workerSnapshot.memory.ingress; ret.memory.egress += workerSnapshot.memory.egress; } // Move all RequestSnapshots, ServerIOMemory and ConnectionSnapshots // to ServerSnapshot ret.requests.reserve(numRequests); ret.connections.reserve(numConnections); for (auto& workerSnapshot : workerSnapshots) { auto& requests = workerSnapshot.requests; std::move( requests.begin(), requests.end(), std::back_inserter(ret.requests)); auto& connections = workerSnapshot.connections; std::move( connections.begin(), connections.end(), std::inserter(ret.connections, ret.connections.end())); } return ret; }); } folly::observer::Observer<std::list<std::string>> ThriftServer::defaultNextProtocols() { return folly::observer::makeObserver( [rocketPreferredObserver = THRIFT_FLAG_OBSERVE(server_alpn_prefer_rocket)] { const auto rocketPreferred = *rocketPreferredObserver.getSnapshot(); if (rocketPreferred) { return std::list<std::string>{ "rs", "thrift", "h2", // "http" is not a legit specifier but need to include it for // legacy. Thrift's HTTP2RoutingHandler uses this, and clients // may be sending it. "http", // Many clients still send http/1.1 which is handled by the // default handler. "http/1.1"}; } return std::list<std::string>{ "thrift", "h2", // "http" is not a legit specifier but need to include it for // legacy. Thrift's HTTP2RoutingHandler uses this, and clients // may be sending it. "http", // Many clients still send http/1.1 which is handled by the default // handler. "http/1.1", "rs"}; }); } folly::observer::Observer<bool> ThriftServer::enableStopTLS() { return THRIFT_FLAG_OBSERVE(server_enable_stoptls); } folly::observer::CallbackHandle ThriftServer::getSSLCallbackHandle() { auto originalPid = folly::get_cached_pid(); return sslContextObserver_->addCallback([&, originalPid](auto ssl) { // Because we are posting to an EventBase, we need to ensure that this // observer callback is not executing on a fork()'d child. // // The scenario this can happen is if: // 1) The FlagsBackend observer implementation persists in the child. (e.g. // a custom atfork handler reinitializes and resubscribes to updates) // 2) A thrift handler fork()s (e.g. Python thrift server which // uses concurrent.futures.ProcessPoolExecutor) // 3) A flag is changed that causes an update to sslContextObserver if (folly::get_cached_pid() != originalPid) { LOG(WARNING) << "Ignoring SSLContext update triggered by observer in forked process."; return; } if (sharedSSLContextManager_) { sharedSSLContextManager_->updateSSLConfigAndReloadContexts(*ssl); } else { // "this" needed due to // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=67274 this->forEachWorker([&](wangle::Acceptor* acceptor) { auto evb = acceptor->getEventBase(); if (!evb) { return; } evb->runInEventBaseThread([acceptor, ssl] { for (auto& sslContext : acceptor->getConfig().sslContextConfigs) { sslContext = *ssl; } acceptor->resetSSLContextConfigs(); }); }); } this->updateCertsToWatch(); }); } folly::observer::Observer<bool> ThriftServer::alpnAllowMismatch() { return THRIFT_FLAG_OBSERVE(alpn_allow_mismatch); } void ThriftServer::addIOThreadPoolObserver( ThriftServer::IOObserverFactory factory) { ioObserverFactories.wlock()->push_back(std::move(factory)); } /* static */ std::shared_ptr<folly::IOThreadPoolExecutor> ThriftServer::createIOThreadPool() { return std::make_shared<folly::IOThreadPoolExecutor>( 0, std::make_shared<folly::NamedThreadFactory>("ThriftIO"), folly::EventBaseManager::get(), folly::IOThreadPoolExecutor::Options().setEnableThreadIdCollection( THRIFT_FLAG(enable_io_queue_lag_detection))); } } // namespace thrift } // namespace apache