thrift/lib/cpp2/server/ThriftServer.h (670 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef THRIFT_SERVER_H_
#define THRIFT_SERVER_H_ 1
#include <array>
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <map>
#include <mutex>
#include <optional>
#include <unordered_map>
#include <utility>
#include <vector>
#include <folly/Memory.h>
#include <folly/Singleton.h>
#include <folly/SocketAddress.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/experimental/PrimaryPtr.h>
#include <folly/experimental/coro/AsyncScope.h>
#include <folly/experimental/observer/Observer.h>
#include <folly/io/ShutdownSocketSet.h>
#include <folly/io/async/AsyncServerSocket.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventBaseManager.h>
#include <folly/lang/Badge.h>
#include <folly/synchronization/CallOnce.h>
#include <thrift/lib/cpp/concurrency/PosixThreadFactory.h>
#include <thrift/lib/cpp/concurrency/ThreadManager.h>
#include <thrift/lib/cpp/server/TServerObserver.h>
#include <thrift/lib/cpp/transport/THeader.h>
#include <thrift/lib/cpp2/PluggableFunction.h>
#include <thrift/lib/cpp2/Thrift.h>
#include <thrift/lib/cpp2/async/AsyncProcessor.h>
#include <thrift/lib/cpp2/async/HeaderServerChannel.h>
#include <thrift/lib/cpp2/server/BaseThriftServer.h>
#include <thrift/lib/cpp2/server/PolledServiceHealth.h>
#include <thrift/lib/cpp2/server/PreprocessParams.h>
#include <thrift/lib/cpp2/server/RequestDebugLog.h>
#include <thrift/lib/cpp2/server/RequestsRegistry.h>
#include <thrift/lib/cpp2/server/ServerInstrumentation.h>
#include <thrift/lib/cpp2/server/ServiceHealthPoller.h>
#include <thrift/lib/cpp2/server/TransportRoutingHandler.h>
#include <thrift/lib/cpp2/transport/rocket/PayloadUtils.h>
#include <thrift/lib/cpp2/transport/rocket/Types.h>
#include <thrift/lib/thrift/gen-cpp2/RpcMetadata_constants.h>
#include <wangle/acceptor/ServerSocketConfig.h>
#include <wangle/acceptor/SharedSSLContextManager.h>
#include <wangle/bootstrap/ServerBootstrap.h>
#include <wangle/ssl/SSLContextConfig.h>
#include <wangle/ssl/TLSCredProcessor.h>
DECLARE_bool(thrift_abort_if_exceeds_shutdown_deadline);
DECLARE_string(service_identity);
THRIFT_FLAG_DECLARE_bool(dump_snapshot_on_long_shutdown);
THRIFT_FLAG_DECLARE_bool(alpn_allow_mismatch);
THRIFT_FLAG_DECLARE_bool(server_check_unimplemented_extra_interfaces);
THRIFT_FLAG_DECLARE_bool(enable_io_queue_lag_detection);
namespace apache {
namespace thrift {
// Forward declaration of classes
class Cpp2Connection;
class Cpp2Worker;
class ThriftServer;
class ThriftProcessor;
namespace rocket {
class ThriftRocketServerHandler;
}
enum class SSLPolicy { DISABLED, PERMITTED, REQUIRED };
typedef wangle::Pipeline<folly::IOBufQueue&, std::unique_ptr<folly::IOBuf>>
Pipeline;
class ThriftTlsConfig : public wangle::CustomConfig {
public:
bool enableThriftParamsNegotiation{true};
bool enableStopTLS{false};
};
class TLSCredentialWatcher {
public:
explicit TLSCredentialWatcher(ThriftServer* server);
void setCertPathsToWatch(std::set<std::string> paths) {
credProcessor_.setCertPathsToWatch(std::move(paths));
}
void setTicketPathToWatch(const std::string& path) {
credProcessor_.setTicketPathToWatch(path);
}
private:
wangle::TLSCredProcessor credProcessor_;
};
/**
* State pertaining to stopping a running Thrift server. It is safe to call
* stop() on this even after the relevant ThriftServer has been destroyed as
* long as the server's event base is not re-used for something else. This is
* useful to prevent racing between requests to stop (such as from signal
* handlers) and ThriftServer's destructor.
*
* This class cannot be directly constructed by user code. Instead every
* ThriftServer owns a stop controller (using folly::PrimaryPtr) and hands out
* non-owning references to use (folly::PrimaryPtrRef). ThriftServer's
* destructor will block until all locked references have been released.
*
* The user-facing API that makes use of this is ServiceHandler::shutdownServer.
*/
class ThriftServerStopController final {
public:
explicit ThriftServerStopController(
folly::badge<ThriftServer>, folly::EventBase& eventBase)
: serveEventBase_(eventBase) {}
void stop();
private:
folly::EventBase& serveEventBase_;
folly::once_flag stopped_;
};
/**
* This is yet another thrift server.
* Uses cpp2 style generated code.
*/
class ThriftServer : public apache::thrift::BaseThriftServer,
public wangle::ServerBootstrap<Pipeline> {
private:
//! SSL context
std::optional<folly::observer::Observer<wangle::SSLContextConfig>>
sslContextObserver_;
std::optional<wangle::TLSTicketKeySeeds> ticketSeeds_;
folly::observer::CallbackHandle getSSLCallbackHandle();
std::optional<bool> reusePort_;
std::optional<bool> enableTFO_;
uint32_t fastOpenQueueSize_{10000};
std::optional<wangle::SSLCacheOptions> sslCacheOptions_;
wangle::FizzConfig fizzConfig_;
ThriftTlsConfig thriftConfig_;
// Security negotiation settings
std::optional<SSLPolicy> sslPolicy_;
bool strictSSL_ = false;
// whether we allow plaintext connections from loopback in REQUIRED mode
bool allowPlaintextOnLoopback_ = false;
// If true, then falls back to the corresponding THRIFT_FLAG.
// If false, then the check is bypassed even if the THRIFT_FLAG is set.
// This allows a hard-coded opt-out of the check for services where it would
// not be useful, e.g. non-C++ languages.
bool allowCheckUnimplementedExtraInterfaces_ = true;
std::weak_ptr<folly::ShutdownSocketSet> wShutdownSocketSet_;
//! Listen socket
folly::AsyncServerSocket::UniquePtr socket_;
struct IdleServerAction : public folly::HHWheelTimer::Callback {
IdleServerAction(
ThriftServer& server,
folly::HHWheelTimer& timer,
std::chrono::milliseconds timeout);
void timeoutExpired() noexcept override;
ThriftServer& server_;
folly::HHWheelTimer& timer_;
std::chrono::milliseconds timeout_;
};
//! The folly::EventBase currently driving serve(). NULL when not serving.
std::atomic<folly::EventBase*> serveEventBase_{nullptr};
std::optional<IdleServerAction> idleServer_;
std::chrono::milliseconds idleServerTimeout_ = std::chrono::milliseconds(0);
std::optional<std::chrono::milliseconds> sslHandshakeTimeout_;
std::atomic<std::chrono::steady_clock::duration::rep> lastRequestTime_;
// Includes non-request events in Rocket. Only bumped if idleTimeout set.
std::chrono::steady_clock::time_point lastRequestTime() const noexcept;
void touchRequestTimestamp() noexcept;
//! Manager of per-thread EventBase objects.
folly::EventBaseManager* eventBaseManager_ = folly::EventBaseManager::get();
// Creates the default ThriftIO IOThreadPoolExecutor
static std::shared_ptr<folly::IOThreadPoolExecutor> createIOThreadPool();
//! IO thread pool. Drives Cpp2Workers.
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool_ =
createIOThreadPool();
/**
* The speed for adjusting connection accept rate.
* 0 for disabling auto adjusting connection accept rate.
*/
double acceptRateAdjustSpeed_ = 0.0;
/**
* Acceptors accept and process incoming connections. The acceptor factory
* helps create acceptors.
*/
std::shared_ptr<wangle::AcceptorFactory> acceptorFactory_;
std::shared_ptr<wangle::SharedSSLContextManager> sharedSSLContextManager_;
void handleSetupFailure(void);
void updateCertsToWatch();
bool stopWorkersOnStopListening_ = true;
bool joinRequestsWhenServerStops_{true};
folly::AsyncWriter::ZeroCopyEnableFunc zeroCopyEnableFunc_;
std::shared_ptr<folly::IOThreadPoolExecutor> acceptPool_;
int nAcceptors_ = 1;
uint16_t socketMaxReadsPerEvent_{16};
// HeaderServerChannel and Cpp2Worker to use for a duplex server
// (used by client). Both are nullptr for a regular server.
std::shared_ptr<HeaderServerChannel> serverChannel_;
std::shared_ptr<Cpp2Worker> duplexWorker_;
bool isDuplex_ = false; // is server in duplex mode? (used by server)
mutable std::mutex ioGroupMutex_;
std::shared_ptr<folly::IOThreadPoolExecutor> getIOGroupSafe() const {
std::lock_guard<std::mutex> lock(ioGroupMutex_);
return getIOGroup();
}
void stopWorkers();
void stopCPUWorkers();
void stopAcceptingAndJoinOutstandingRequests();
void callOnStartServing();
void callOnStopRequested();
void ensureDecoratedProcessorFactoryInitialized();
#if FOLLY_HAS_COROUTINES
std::unique_ptr<folly::coro::CancellableAsyncScope> asyncScope_;
#endif
folly::Synchronized<std::optional<TLSCredentialWatcher>> tlsCredWatcher_{};
std::unique_ptr<ThriftProcessor> thriftProcessor_;
std::vector<std::unique_ptr<TransportRoutingHandler>> routingHandlers_;
friend class Cpp2Connection;
friend class Cpp2Worker;
friend class rocket::ThriftRocketServerHandler;
bool tosReflect_{false};
uint32_t listenerTos_{0};
std::optional<instrumentation::ServerTracker> tracker_;
bool quickExitOnShutdownTimeout_ = false;
public:
/**
* The goal of this enum is to capture every state the server goes through in
* its lifecycle. Notice how the lifecycle is actually a cycle - after the
* server stops, it returns to its initial state of NOT_RUNNING.
*
* NOTE: For the restrictions regarding only allowing internal methods - these
* do not apply if getRejectRequestsUntilStarted() is false.
*/
enum class ServerStatus {
/**
* The server is not running. Either:
* 1. The server was never started. Or,
* 2. The server was stopped and there are outstanding requests
* were drained.
*/
NOT_RUNNING = 0,
/**
* The server is about to start and is executing
* TServerEventHandler::preStart hooks. If getRejectRequestsUntilStarted()
* is true, the server only responds to internal methods. See
* ServerConfigs::getInternalMethods.
*/
PRE_STARTING,
/**
* The preStart hooks are done executing and
* ServiceHandler::semifuture_onStartServing hooks are executing. If
* getRejectRequestsUntilStarted() is true, the server only responds to
* internal methods.
*/
STARTING,
/**
* The service is healthy and ready to handle traffic.
*/
RUNNING,
/**
* The server is preparing to stop. No new connections are accepted.
* Existing connections are unaffected.
*/
PRE_STOPPING,
/**
* The server is about to stop and
* ServiceHandler::semifuture_onStopRequested hooks are still executing.
*/
STOPPING,
/**
* ServiceHandler::semifuture_onStopRequested hooks have finished executing.
* Outstanding requests are being joined. New requests are rejected.
*/
DRAINING_UNTIL_STOPPED,
};
ServerStatus getServerStatus() const {
auto status = internalStatus_.load(std::memory_order_acquire);
if (status == ServerStatus::RUNNING && !getEnabled()) {
// Even if the server is capable of serving, the user might have
// explicitly disabled the service at startup, in which case the server
// only responds to internal methods.
return ServerStatus::STARTING;
}
return status;
}
#if FOLLY_HAS_COROUTINES
using ServiceHealth = PolledServiceHealth::ServiceHealth;
std::optional<ServiceHealth> getServiceHealth() const {
auto health = cachedServiceHealth_.load(std::memory_order_relaxed);
return health == ServiceHealth{} ? std::nullopt
: std::make_optional(health);
}
#endif
RequestHandlingCapability shouldHandleRequests() const override {
auto status = getServerStatus();
switch (status) {
case ServerStatus::RUNNING:
return RequestHandlingCapability::ALL;
case ServerStatus::NOT_RUNNING:
// The server can be in the NOT_RUNNING state and still have open
// connections, for example, if useExistingSocket is called with a
// socket that is already listening.
[[fallthrough]];
case ServerStatus::PRE_STARTING:
case ServerStatus::STARTING:
return getRejectRequestsUntilStarted()
? RequestHandlingCapability::INTERNAL_METHODS_ONLY
: RequestHandlingCapability::ALL;
case ServerStatus::PRE_STOPPING:
case ServerStatus::STOPPING:
// When the server is stopping, we close the sockets for new
// connections. However, existing connections should be unaffected.
return RequestHandlingCapability::ALL;
case ServerStatus::DRAINING_UNTIL_STOPPED:
default:
return RequestHandlingCapability::NONE;
}
}
private:
/**
* Thrift server's view of the currently running service. This status
* represents the source of truth for the status reported by the server.
*/
std::atomic<ServerStatus> internalStatus_{ServerStatus::NOT_RUNNING};
#if FOLLY_HAS_COROUTINES
/**
* Thrift server's latest view of the running service's reported health.
*/
std::atomic<ServiceHealth> cachedServiceHealth_{};
#endif
std::unique_ptr<AsyncProcessorFactory> decoratedProcessorFactory_;
/**
* Collects service handlers of the current service of a specific type.
*/
template <
typename TServiceHandler = ServiceHandlerBase,
typename = std::enable_if_t<
std::is_base_of_v<ServiceHandlerBase, TServiceHandler>>>
std::vector<TServiceHandler*> collectServiceHandlers() const {
if constexpr (std::is_same_v<TServiceHandler, ServiceHandlerBase>) {
return getDecoratedProcessorFactory().getServiceHandlers();
}
std::vector<TServiceHandler*> matchedServiceHandlers;
for (auto* serviceHandler :
getDecoratedProcessorFactory().getServiceHandlers()) {
if (auto matched = dynamic_cast<TServiceHandler*>(serviceHandler)) {
matchedServiceHandlers.push_back(matched);
}
}
return matchedServiceHandlers;
}
public:
ThriftServer();
// NOTE: Don't use this constructor to create a regular Thrift server. This
// constructor is used by the client to create a duplex server on an existing
// connection.
// Don't create a listening server. Instead use the channel to run incoming
// requests.
explicit ThriftServer(
const std::shared_ptr<HeaderServerChannel>& serverChannel);
~ThriftServer() override;
/**
* Set the thread pool used to drive the server's IO threads. Note that the
* pool's thread factory will be overridden - if you'd like to use your own,
* set it afterwards via ThriftServer::setIOThreadFactory(). If the given
* thread pool has one or more allocated threads, the number of workers will
* be set to this number. Use ThreadServer::setNumIOWorkerThreads() to set
* it afterwards if you want to change the number of works.
*
* @param the new thread pool
*/
void setIOThreadPool(
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool) {
CHECK(configMutable());
ioThreadPool_ = ioThreadPool;
if (ioThreadPool_->numThreads() > 0) {
setNumIOWorkerThreads(ioThreadPool_->numThreads());
}
}
/**
* Doing any blocking work on this executor will cause the server to
* stall servicing requests. Be careful about using this executor for anything
* other than its main purpose.
*/
std::shared_ptr<folly::IOThreadPoolExecutor> getIOThreadPool() {
return ioThreadPool_;
}
/**
* Set the thread factory that will be used to create the server's IO
* threads.
*
* @param the new thread factory
*/
void setIOThreadFactory(
std::shared_ptr<folly::NamedThreadFactory> threadFactory) {
CHECK(configMutable());
ioThreadPool_->setThreadFactory(threadFactory);
}
/**
* Add a IOObserverFactory which will be used to construct a
* ThreadPoolExecutor::Observer and attached to the ioThreadPool_ in setup().
* The same factory will be used to create an observer instance for each
* Thrift Server (if there is more than one)
*
* NOTE: Must be called before setup() in order to take effect
*/
using IOObserverFactory =
folly::Function<std::shared_ptr<folly::ThreadPoolExecutor::Observer>(
std::string, folly::WorkerProvider*) const>;
static void addIOThreadPoolObserver(IOObserverFactory factory);
/**
* Set the prefix for naming the worker threads. "Cpp2Worker" by default.
* must be called before serve() for it to take effect
*
* @param cpp2WorkerThreadName net thread name prefix
*/
void setCpp2WorkerThreadName(const std::string& cpp2WorkerThreadName) {
CHECK(configMutable());
auto factory = ioThreadPool_->getThreadFactory();
CHECK(factory);
auto namedFactory =
std::dynamic_pointer_cast<folly::NamedThreadFactory>(factory);
CHECK(namedFactory);
namedFactory->setNamePrefix(cpp2WorkerThreadName);
}
// if overloaded, returns applicable overloaded exception code.
folly::Optional<std::string> checkOverload(
const transport::THeader::StringToStringMap* readHeaders = nullptr,
const std::string* = nullptr) const final;
// returns descriptive error if application is unable to process request
PreprocessResult preprocess(
const server::PreprocessParams& params) const final;
std::string getLoadInfo(int64_t load) const override;
/*
* Use a ZeroCopyEnableFunc to decide when to use zerocopy mode
* Ex: use zerocopy when the IOBuf chain exceeds a certain thresold
* setZeroCopyEnableFunc([threshold](const std::unique_ptr<folly::IOBuf>& buf)
* { return (buf->computeChainDataLength() > threshold);});
*/
void setZeroCopyEnableFunc(folly::AsyncWriter::ZeroCopyEnableFunc func) {
zeroCopyEnableFunc_ = std::move(func);
}
const folly::AsyncWriter::ZeroCopyEnableFunc& getZeroCopyEnableFunc() const {
return zeroCopyEnableFunc_;
}
void setAcceptExecutor(std::shared_ptr<folly::IOThreadPoolExecutor> pool) {
acceptPool_ = pool;
}
/**
* Generally the acceptor should not do any work other than
* accepting connections, so use this with care.
*/
std::shared_ptr<folly::IOThreadPoolExecutor> getAcceptExecutor() {
return acceptPool_;
}
void setNumAcceptThreads(int numAcceptThreads) {
CHECK(!acceptPool_);
nAcceptors_ = numAcceptThreads;
}
/**
* Set the SSLContextConfig on the thrift server.
*/
void setSSLConfig(std::shared_ptr<wangle::SSLContextConfig> context) {
CHECK(configMutable());
if (context) {
setSSLConfig(folly::observer::makeObserver(
[context = std::move(context)]() { return *context; }));
}
updateCertsToWatch();
}
/**
* Set the SSLContextConfig on the thrift server. Note that the thrift server
* keeps an observer on the SSLContextConfig. Whenever the SSLContextConfig
* has an update, the observer callback would reset SSLContextConfig on all
* acceptors.
*/
void setSSLConfig(
folly::observer::Observer<wangle::SSLContextConfig> contextObserver) {
sslContextObserver_ = folly::observer::makeObserver(
[observer = std::move(contextObserver),
alpnObserver = ThriftServer::alpnAllowMismatch()]() {
auto context = **observer;
context.isDefault = true;
context.alpnAllowMismatch = **alpnObserver;
return context;
});
}
void setFizzConfig(wangle::FizzConfig config) { fizzConfig_ = config; }
void setThriftConfig(ThriftTlsConfig thriftConfig) {
thriftConfig_ = thriftConfig;
}
void setSSLCacheOptions(wangle::SSLCacheOptions options) {
sslCacheOptions_ = std::move(options);
}
void setTicketSeeds(wangle::TLSTicketKeySeeds seeds) { ticketSeeds_ = seeds; }
/**
* Set the ssl handshake timeout.
*/
void setSSLHandshakeTimeout(
std::optional<std::chrono::milliseconds> timeout) {
sslHandshakeTimeout_ = timeout;
}
const std::optional<std::chrono::milliseconds>& getSSLHandshakeTimeout()
const {
return sslHandshakeTimeout_;
}
/**
* Stops the Thrift server if it's idle for the given time.
*/
void setIdleServerTimeout(std::chrono::milliseconds timeout) {
idleServerTimeout_ = timeout;
}
/**
* Configures maxReadsPerEvent for accepted connections, see
* `folly::AsyncSocket::setMaxReadsPerEvent` for more details.
*/
void setSocketMaxReadsPerEvent(uint16_t socketMaxReadsPerEvent) {
socketMaxReadsPerEvent_ = socketMaxReadsPerEvent;
}
void updateTicketSeeds(wangle::TLSTicketKeySeeds seeds);
void updateTLSCert();
/**
* Tells the thrift server to update ticket seeds with the contents of the
* file ticketPath when modified and initialized the seeds with the contents
* of the file ticketPath. The seed file previously being watched will no
* longer be watched. This is not thread safe.
*/
void watchTicketPathForChanges(const std::string& ticketPath);
void setFastOpenOptions(bool enableTFO, uint32_t fastOpenQueueSize) {
enableTFO_ = enableTFO;
fastOpenQueueSize_ = fastOpenQueueSize;
}
std::optional<bool> getTFOEnabled() { return enableTFO_; }
void setReusePort(bool reusePort) { reusePort_ = reusePort; }
std::optional<bool> getReusePort() { return reusePort_; }
const std::optional<folly::observer::Observer<wangle::SSLContextConfig>>&
getSSLConfig() const {
return sslContextObserver_;
}
const std::optional<wangle::TLSTicketKeySeeds>& getTicketSeeds() const {
return ticketSeeds_;
}
const std::optional<wangle::SSLCacheOptions>& getSSLCacheOptions() const {
return sslCacheOptions_;
}
wangle::ServerSocketConfig getServerSocketConfig() {
wangle::ServerSocketConfig config;
if (sslContextObserver_.has_value()) {
config.sslContextConfigs.push_back(*sslContextObserver_->getSnapshot());
}
if (sslCacheOptions_) {
config.sslCacheOptions = *sslCacheOptions_;
}
config.connectionIdleTimeout = getIdleTimeout();
config.acceptBacklog = getListenBacklog();
if (ticketSeeds_) {
config.initialTicketSeeds = *ticketSeeds_;
}
if (enableTFO_) {
config.enableTCPFastOpen = *enableTFO_;
config.fastOpenQueueSize = fastOpenQueueSize_;
}
if (sslHandshakeTimeout_) {
config.sslHandshakeTimeout = *sslHandshakeTimeout_;
} else if (getIdleTimeout() == std::chrono::milliseconds::zero()) {
// make sure a handshake that takes too long doesn't kill the connection
config.sslHandshakeTimeout = std::chrono::milliseconds::zero();
}
// By default, we set strictSSL to false. This means the server will start
// even if cert/key is missing as it may become available later
config.strictSSL = getStrictSSL() || getSSLPolicy() == SSLPolicy::REQUIRED;
config.fizzConfig = fizzConfig_;
config.customConfigMap["thrift_tls_config"] =
std::make_shared<ThriftTlsConfig>(thriftConfig_);
config.socketMaxReadsPerEvent = socketMaxReadsPerEvent_;
config.useZeroCopy = !!zeroCopyEnableFunc_;
return config;
}
/**
* Use the provided socket rather than binding to address_. The caller must
* call ::bind on this socket, but should not call ::listen.
*
* NOTE: ThriftServer takes ownership of this 'socket' so if binding fails
* we destroy this socket, while cleaning itself up. So, 'accept' better
* work the first time :)
*/
void useExistingSocket(int socket);
void useExistingSockets(const std::vector<int>& sockets);
void useExistingSocket(folly::AsyncServerSocket::UniquePtr socket);
/**
* Return the file descriptor(s) associated with the listening socket
*/
int getListenSocket() const;
std::vector<int> getListenSockets() const;
/**
* Get the ThriftServer's main event base.
*
* @return a pointer to the EventBase.
*/
folly::EventBase* getServeEventBase() const { return serveEventBase_; }
/**
* Get the EventBaseManager used by this server. This can be used to find
* or create the EventBase associated with any given thread, including any
* new threads created by clients. This may be called from any thread.
*
* @return a pointer to the EventBaseManager.
*/
folly::EventBaseManager* getEventBaseManager();
const folly::EventBaseManager* getEventBaseManager() const {
return const_cast<ThriftServer*>(this)->getEventBaseManager();
}
SSLPolicy getSSLPolicy() const;
// Convenience method to check if SSLPolicy is explicitly set
bool isSSLPolicySet() const { return sslPolicy_.has_value(); }
void setSSLPolicy(SSLPolicy policy) { sslPolicy_ = policy; }
void setStrictSSL(bool strictSSL) { strictSSL_ = strictSSL; }
bool getStrictSSL() { return strictSSL_; }
void setAllowPlaintextOnLoopback(bool allow) {
allowPlaintextOnLoopback_ = allow;
}
bool isPlaintextAllowedOnLoopback() const {
return allowPlaintextOnLoopback_;
}
void setAllowCheckUnimplementedExtraInterfaces(bool allow) {
allowCheckUnimplementedExtraInterfaces_ = allow;
}
bool isCheckUnimplementedExtraInterfacesAllowed() const {
return allowCheckUnimplementedExtraInterfaces_;
}
static folly::observer::Observer<bool> enableStopTLS();
#if FOLLY_HAS_COROUTINES
/**
* Get CancellableAsyncScope that will be maintained by the Thrift Server.
* Cancellation is requested when the server is stopping.
* Returns nullptr, before server setup and after server stops.
*/
folly::coro::CancellableAsyncScope* getAsyncScope() {
return asyncScope_.get();
}
/**
* Get the global CancellableAsyncScope, it is usally the AsyncScope
* associated with the global server Cancellation is requested when the
* global server is stopping.
*/
static folly::coro::CancellableAsyncScope& getGlobalAsyncScope();
#endif
static void setGlobalServer(ThriftServer* server);
void setAcceptorFactory(
const std::shared_ptr<wangle::AcceptorFactory>& acceptorFactory) {
acceptorFactory_ = acceptorFactory;
}
/**
* Get the speed of adjusting connection accept rate.
*/
double getAcceptRateAdjustSpeed() const { return acceptRateAdjustSpeed_; }
/**
* Set the speed of adjusting connection accept rate.
*/
void setAcceptRateAdjustSpeed(double speed) {
CHECK(configMutable());
acceptRateAdjustSpeed_ = speed;
}
/**
* Enable/Disable TOS reflection on the server socket
*/
void setTosReflect(bool enable) { tosReflect_ = enable; }
/**
* Get TOS reflection setting for the server socket
*/
bool getTosReflect() const override { return tosReflect_; }
/**
* Set default TOS for listener/accepted connections
*/
void setListenerTos(uint32_t tos) { listenerTos_ = tos; }
/**
* Get default TOS for listener socket
*/
uint32_t getListenerTos() const override { return listenerTos_; }
/**
* Get the number of connections dropped by the AsyncServerSocket
*/
uint64_t getNumDroppedConnections() const override;
/**
* Clear all the workers.
*/
void clearWorkers() { ioThreadPool_->join(); }
/**
* Set whether to stop io workers when stopListening() is called (we do stop
* them by default).
*/
void setStopWorkersOnStopListening(bool stopWorkers) {
CHECK(configMutable());
stopWorkersOnStopListening_ = stopWorkers;
}
/**
* Get whether to stop io workers when stopListening() is called.
*/
bool getStopWorkersOnStopListening() const {
return stopWorkersOnStopListening_;
}
/**
* If stopWorkersOnStopListening is disabled, then enabling
* leakOutstandingRequestsWhenServerStops permits thriftServer->serve() to
* return before all outstanding requests are joined.
*/
void leakOutstandingRequestsWhenServerStops(bool leak) {
CHECK(configMutable());
joinRequestsWhenServerStops_ = !leak;
}
/**
* Call this to complete initialization
*/
void setup();
/**
* Create and start the default thread manager unless it already exists.
*/
void setupThreadManager();
/**
* Ensure that this Thrift Server has ResourcePools set up. If there is
* already a non-empty ResourcePoolSet, nothing will be done. Otherwise, the
* default setup of ResourcePools will be created.
*/
void ensureResourcePools();
/**
* Kill the workers and wait for listeners to quit
*/
void cleanUp();
/**
* Preferably use this method in order to start ThriftServer created for
* DuplexChannel instead of the serve() method.
*/
void startDuplex();
/**
* This method should be used to cleanly stop a ThriftServer created for
* DuplexChannel before disposing the ThriftServer. The caller should pass in
* a shared_ptr to this ThriftServer since the ThriftServer does not have a
* way of getting that (does not inherit from enable_shared_from_this)
*/
void stopDuplex(std::shared_ptr<ThriftServer> thisServer);
/**
* One stop solution:
*
* starts worker threads, enters accept loop; when
* the accept loop exits, shuts down and joins workers.
*/
void serve() override;
/**
* Call this to stop the server, if started by serve()
*
* This (asynchronously) causes the main serve() function to stop listening
* for new connections, close existing connections, shut down the worker
* threads, and then return.
*
* NOTE: that this function may not be safe to call multiple times (such as
* from a signal handler) because a previous call may initiate a sequence of
* events leading to the destruction of this object.
* Instead you should use StopController (see getStopController()) which lets
* you guard against destruction (by way of folly::PrimaryPtr).
*/
void stop() override;
using StopController = ThriftServerStopController;
private:
folly::PrimaryPtr<StopController> stopController_{
std::unique_ptr<StopController>{}};
public:
folly::PrimaryPtrRef<StopController> getStopController() {
return stopController_.ref();
}
/**
* Call this to stop listening on the server port.
*
* This causes the main serve() function to stop listening for new
* connections while still allows the worker threads to process
* existing connections. stop() still needs to be called to clear
* up the worker threads.
*/
void stopListening() override;
// client side duplex
std::shared_ptr<HeaderServerChannel> getDuplexServerChannel() {
return serverChannel_;
}
// server side duplex
bool isDuplex() { return isDuplex_; }
std::vector<std::unique_ptr<TransportRoutingHandler>> const*
getRoutingHandlers() const {
return &routingHandlers_;
}
void addRoutingHandler(
std::unique_ptr<TransportRoutingHandler> routingHandler) {
routingHandlers_.push_back(std::move(routingHandler));
}
void clearRoutingHandlers() { routingHandlers_.clear(); }
void setDuplex(bool duplex) {
// setDuplex may only be called on the server side.
// serverChannel_ must be nullptr in this case
CHECK(serverChannel_ == nullptr);
CHECK(configMutable());
isDuplex_ = duplex;
}
/**
* Returns a reference to the processor that is used by custom transports
*/
apache::thrift::ThriftProcessor* getThriftProcessor() {
return thriftProcessor_.get();
}
const std::vector<std::shared_ptr<folly::AsyncServerSocket>> getSockets()
const {
std::vector<std::shared_ptr<folly::AsyncServerSocket>> serverSockets;
for (auto& socket : ServerBootstrap::getSockets()) {
serverSockets.push_back(
std::dynamic_pointer_cast<folly::AsyncServerSocket>(socket));
}
return serverSockets;
}
/**
* Sets an explicit AsyncProcessorFactory and sets the ThriftProcessor
* to use for custom transports
*/
virtual void setProcessorFactory(
std::shared_ptr<AsyncProcessorFactory> pFac) override;
/**
* Returns an AsyncProcessorFactory that wraps the user-provided service and
* additionally handles Thrift-internal methods as well (such as the
* monitoring interface).
*
* This is the factory that all transports should use to handle requests.
*
* Logically, this is an apache::thrift::MultiplexAsyncProcessorFactory with
* the following composition:
*
* ┌────────────────────────┐
* │ User Service │
* │ (setProcessorFactory) │ │
* └────────────────────────┘ │
* │
* ┌────────────────────────┐ │
* │ Status Interface │ │
* │ (setStatusInterface) │ │
* └────────────────────────┘ │
* │ Method
* ┌────────────────────────┐ │ precedence
* │ Monitoring Interface │ │
* │(setMonitoringInterface)│ │
* └────────────────────────┘ │
* │
* ┌────────────────────────┐ │
* │ Control Interface │ ▼
* │ (setControlInterface) │
* └────────────────────────┘
*/
AsyncProcessorFactory& getDecoratedProcessorFactory() const {
CHECK(decoratedProcessorFactory_)
<< "Server must be set up before calling this method";
return *decoratedProcessorFactory_;
}
/**
* Returns an AsyncProcessor from getDecoratedProcessorFactory() without any
* application-specific event handlers installed on the underlying processors.
* This is useful, for example, in InterfaceKind::MONITORING where
* application-specific checks (such as ACL checks) should be bypassed.
*/
std::unique_ptr<AsyncProcessor> getDecoratedProcessorWithoutEventHandlers()
const;
/**
* A struct containing all "extra" internal interfaces that the service
* multiplexes behind the main user-defined interface.
*
* See ThriftServer::getDecoratedProcessorFactory.
*/
struct ExtraInterfaces {
// See ThriftServer::setMonitoringInterface.
std::shared_ptr<MonitoringServerInterface> monitoring;
// See ThriftServer::setStatusInterface.
std::shared_ptr<StatusServerInterface> status;
// See ThriftServer::setControlInterface
std::shared_ptr<ControlServerInterface> control;
};
// ThriftServer by defaults uses a global ShutdownSocketSet, so all socket's
// FDs are registered there. But in some tests you might want to simulate 2
// ThriftServer running in different processes, so their ShutdownSocketSet are
// different. In that case server should have their own SSS each so shutting
// down FD from one doesn't interfere with shutting down sockets for the
// other.
void replaceShutdownSocketSet(
const std::shared_ptr<folly::ShutdownSocketSet>& newSSS);
static folly::observer::Observer<std::list<std::string>>
defaultNextProtocols();
bool getQuickExitOnShutdownTimeout() const {
return quickExitOnShutdownTimeout_;
}
void setQuickExitOnShutdownTimeout(bool quickExitOnShutdownTimeout) {
quickExitOnShutdownTimeout_ = quickExitOnShutdownTimeout;
}
static folly::observer::Observer<bool> alpnAllowMismatch();
/**
* For each request debug stub, a snapshot information can be constructed to
* persist some transitent states about the corresponding request.
*/
class RequestSnapshot {
public:
explicit RequestSnapshot(const RequestsRegistry::DebugStub& stub)
: methodName_(stub.getMethodName()),
creationTimestamp_(stub.getTimestamp()),
finishedTimestamp_(stub.getFinished()),
protoId_(stub.getProtoId()),
peerAddress_(*stub.getPeerAddress()),
localAddress_(*stub.getLocalAddress()),
rootRequestContextId_(stub.getRootRequestContextId()),
reqId_(RequestsRegistry::getRequestId(rootRequestContextId_)),
reqDebugLog_(collectRequestDebugLog(stub)) {
auto requestPayload = rocket::unpack<RequestPayload>(stub.clonePayload());
payload_ = std::move(*requestPayload->payload);
auto& metadata = requestPayload->metadata;
if (metadata.otherMetadata()) {
headers_ = std::move(*requestPayload->metadata.otherMetadata());
}
clientId_ = metadata.clientId().to_optional();
serviceTraceMeta_ = metadata.serviceTraceMeta().to_optional();
auto req = stub.getRequest();
DCHECK(
req != nullptr || finishedTimestamp_.time_since_epoch().count() != 0);
startedProcessing_ = req == nullptr ? true : stub.getStartedProcessing();
}
const std::string& getMethodName() const { return methodName_; }
std::chrono::steady_clock::time_point getCreationTimestamp() const {
return creationTimestamp_;
}
std::chrono::steady_clock::time_point getFinishedTimestamp() const {
return finishedTimestamp_;
}
intptr_t getRootRequestContextId() const { return rootRequestContextId_; }
const std::string& getRequestId() const { return reqId_; }
bool getStartedProcessing() const { return startedProcessing_; }
/**
* Returns empty IOBuff if payload is not present.
*/
const folly::IOBuf& getPayload() const { return payload_; }
const transport::THeader::StringToStringMap& getHeaders() const {
return headers_;
}
protocol::PROTOCOL_TYPES getProtoId() const { return protoId_; }
const folly::SocketAddress& getLocalAddress() const {
return localAddress_;
}
const folly::SocketAddress& getPeerAddress() const { return peerAddress_; }
const std::vector<std::string>& getDebugLog() const { return reqDebugLog_; }
const auto& clientId() const { return clientId_; }
auto& clientId() { return clientId_; }
const auto& serviceTraceMeta() const { return serviceTraceMeta_; }
auto& serviceTraceMeta() { return serviceTraceMeta_; }
private:
const std::string methodName_;
const std::chrono::steady_clock::time_point creationTimestamp_;
const std::chrono::steady_clock::time_point finishedTimestamp_;
const protocol::PROTOCOL_TYPES protoId_;
folly::IOBuf payload_;
transport::THeader::StringToStringMap headers_;
std::optional<std::string> clientId_;
std::optional<std::string> serviceTraceMeta_;
folly::SocketAddress peerAddress_;
folly::SocketAddress localAddress_;
intptr_t rootRequestContextId_;
const std::string reqId_;
const std::vector<std::string> reqDebugLog_;
bool startedProcessing_;
};
struct ServerIOMemory {
size_t ingress;
size_t egress;
};
/**
* Returns structure highlighting the ingress and egress memory usage in
* thrift server
*/
folly::SemiFuture<ServerIOMemory> getUsedIOMemory();
struct ConnectionSnapshot {
size_t numActiveRequests{0};
size_t numPendingWrites{0};
std::chrono::steady_clock::time_point creationTime;
};
using RequestSnapshots = std::vector<RequestSnapshot>;
using ConnectionSnapshots =
std::unordered_map<folly::SocketAddress, ConnectionSnapshot>;
struct ServerSnapshot {
RecentRequestCounter::Values recentCounters;
RequestSnapshots requests;
ConnectionSnapshots connections;
ServerIOMemory memory;
};
struct SnapshotOptions {
std::chrono::microseconds connectionsAgeMax;
};
folly::SemiFuture<ServerSnapshot> getServerSnapshot() {
return getServerSnapshot(SnapshotOptions{});
}
folly::SemiFuture<ServerSnapshot> getServerSnapshot(
const SnapshotOptions& options);
/**
* If shutdown does not complete within the configured worker join timeout,
* then we schedule a task to dump the server's state to disk for
* investigation.
*
* The implementor of the dumping logic should provide the the task as well
* as an appropriate timeout -- we do not want to indefinitely block shutdown
* in case the task deadlocks.
*/
struct DumpSnapshotOnLongShutdownResult {
folly::SemiFuture<folly::Unit> task;
std::chrono::milliseconds timeout;
};
enum class UnimplementedExtraInterfacesResult {
/**
* The method is completely unrecognized by the service.
*/
UNRECOGNIZED,
/**
* Extra interfaces are implemented directly by the service.
*/
IMPLEMENTED,
/**
* Extra interfaces are left unimplemented but recognized by the service.
*/
UNIMPLEMENTED,
};
};
template <typename AcceptorClass, typename SharedSSLContextManagerClass>
class ThriftAcceptorFactory : public wangle::AcceptorFactorySharedSSLContext {
public:
ThriftAcceptorFactory(ThriftServer* server) : server_(server) {}
std::shared_ptr<wangle::SharedSSLContextManager>
initSharedSSLContextManager() {
if constexpr (!std::is_same<SharedSSLContextManagerClass, void>::value) {
sharedSSLContextManager_ = std::make_shared<SharedSSLContextManagerClass>(
server_->getServerSocketConfig());
}
return sharedSSLContextManager_;
}
std::shared_ptr<wangle::Acceptor> newAcceptor(folly::EventBase* eventBase) {
if (!sharedSSLContextManager_) {
return AcceptorClass::create(server_, nullptr, eventBase);
}
auto acceptor = AcceptorClass::create(
server_,
nullptr,
eventBase,
sharedSSLContextManager_->getCertManager(),
sharedSSLContextManager_->getContextManager(),
sharedSSLContextManager_->getFizzContext());
sharedSSLContextManager_->addAcceptor(acceptor);
return acceptor;
}
protected:
ThriftServer* server_;
};
using DefaultThriftAcceptorFactory = ThriftAcceptorFactory<Cpp2Worker, void>;
using DefaultThriftAcceptorFactorySharedSSLContext = ThriftAcceptorFactory<
Cpp2Worker,
wangle::SharedSSLContextManagerImpl<wangle::FizzConfigUtil>>;
namespace detail {
THRIFT_PLUGGABLE_FUNC_DECLARE(
apache::thrift::ThriftServer::DumpSnapshotOnLongShutdownResult,
dumpSnapshotOnLongShutdown);
THRIFT_PLUGGABLE_FUNC_DECLARE(
apache::thrift::ThriftServer::ExtraInterfaces,
createDefaultExtraInterfaces);
THRIFT_PLUGGABLE_FUNC_DECLARE(
ThriftServer::UnimplementedExtraInterfacesResult,
serviceHasUnimplementedExtraInterfaces,
AsyncProcessorFactory& service);
} // namespace detail
} // namespace thrift
} // namespace apache
#endif // #ifndef THRIFT_SERVER_H_