// thrift/lib/cpp2/server/BaseThriftServer.h
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <map>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <vector>
#include <folly/Memory.h>
#include <folly/Portability.h>
#include <folly/SocketAddress.h>
#include <folly/Synchronized.h>
#include <folly/io/SocketOptionMap.h>
#include <folly/io/async/AsyncTransport.h>
#include <folly/io/async/EventBase.h>
#include <thrift/lib/cpp/concurrency/Thread.h>
#include <thrift/lib/cpp/concurrency/ThreadManager.h>
#include <thrift/lib/cpp/server/TServerEventHandler.h>
#include <thrift/lib/cpp/server/TServerObserver.h>
#include <thrift/lib/cpp/transport/THeader.h>
#include <thrift/lib/cpp2/Flags.h>
#include <thrift/lib/cpp2/Thrift.h>
#include <thrift/lib/cpp2/async/AsyncProcessor.h>
#include <thrift/lib/cpp2/server/AdaptiveConcurrency.h>
#include <thrift/lib/cpp2/server/ControlServerInterface.h>
#include <thrift/lib/cpp2/server/MonitoringServerInterface.h>
#include <thrift/lib/cpp2/server/ResourcePool.h>
#include <thrift/lib/cpp2/server/ServerAttribute.h>
#include <thrift/lib/cpp2/server/ServerConfigs.h>
#include <thrift/lib/cpp2/server/ServerFlags.h>
#include <thrift/lib/cpp2/server/StatusServerInterface.h>
THRIFT_FLAG_DECLARE_int64(server_default_socket_queue_timeout_ms);
THRIFT_FLAG_DECLARE_int64(server_default_queue_timeout_ms);
THRIFT_FLAG_DECLARE_int64(server_polled_service_health_liveness_ms);
THRIFT_FLAG_DECLARE_int64(
server_ingress_memory_limit_enforcement_payload_size_min_bytes);
THRIFT_FLAG_DECLARE_bool(server_reject_header_connections);
namespace wangle {
class ConnectionManager;
}
namespace apache {
namespace thrift {
typedef std::function<void(
folly::EventBase*,
wangle::ConnectionManager*,
std::shared_ptr<folly::AsyncTransport>,
std::unique_ptr<folly::IOBuf>)>
getHandlerFunc;
typedef std::function<void(
const apache::thrift::transport::THeader*, const folly::SocketAddress*)>
GetHeaderHandlerFunc;
using IsOverloadedFunc = folly::Function<bool(
const transport::THeader::StringToStringMap*, const std::string*) const>;
using PreprocessFunc =
folly::Function<PreprocessResult(const server::PreprocessParams&) const>;
template <typename T>
class ThriftServerAsyncProcessorFactory : public AsyncProcessorFactory {
public:
explicit ThriftServerAsyncProcessorFactory(std::shared_ptr<T> t) {
svIf_ = t;
}
std::unique_ptr<apache::thrift::AsyncProcessor> getProcessor() override {
return std::unique_ptr<apache::thrift::AsyncProcessor>(
new typename T::ProcessorType(svIf_.get()));
}
std::vector<ServiceHandlerBase*> getServiceHandlers() override {
return {svIf_.get()};
}
std::optional<std::reference_wrapper<ServiceRequestInfoMap const>>
getServiceRequestInfoMap() const override {
return svIf_->getServiceRequestInfoMap();
}
private:
std::shared_ptr<T> svIf_;
};
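// Example (a hypothetical sketch; `MyHandler` and `server` are assumptions,
// not part of this header): wrapping a generated service handler in this
// factory and installing it on a server instance.
//
//   auto handler = std::make_shared<MyHandler>();
//   server.setInterface(
//       std::make_shared<ThriftServerAsyncProcessorFactory<MyHandler>>(
//           handler));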
/**
* Base class for thrift servers using cpp2 style generated code.
*/
class BaseThriftServer : public apache::thrift::concurrency::Runnable,
public apache::thrift::server::ServerConfigs {
public:
using AllocIOBufFn = std::unique_ptr<folly::IOBuf>(size_t);
struct FailureInjection {
FailureInjection()
: errorFraction(0), dropFraction(0), disconnectFraction(0) {}
// Cause a fraction of requests to fail
float errorFraction;
// Cause a fraction of requests to be dropped (and presumably time out
// on the client)
float dropFraction;
// Cause a fraction of requests to cause the channel to be disconnected,
// possibly failing other requests as well.
float disconnectFraction;
bool operator==(const FailureInjection& other) const {
return errorFraction == other.errorFraction &&
dropFraction == other.dropFraction &&
disconnectFraction == other.disconnectFraction;
}
bool operator!=(const FailureInjection& other) const {
return !(*this == other);
}
};
struct Metadata {
std::string configPath;
std::optional<std::string> serviceFramework;
std::optional<std::string> wrapper;
std::optional<std::string> languageFramework;
std::optional<std::set<std::string>> modules;
std::optional<std::string> tlsConfigSource;
void addModule(std::string_view name) {
if (!modules) {
modules.emplace();
}
modules->emplace(name);
}
};
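  // Example (a hypothetical sketch; the values are assumptions): recording
  // deployment metadata on a server instance. addModule() lazily creates
  // the module set on first use.
  //
  //   auto& md = server.metadata();
  //   md.wrapper = "MyServiceWrapper";
  //   md.addModule("observability");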
/**
* Tag type for ServerAttributeStatic setters. Setters marked with this tag
* type should only be called before the server has started processing
* requests.
*/
struct StaticAttributeTag {};
/**
* Tag type for ServerAttributeDynamic setters. Setters marked with this tag
* type can be called even after the server has started processing requests.
* The corresponding value will be dynamically updated.
*/
struct DynamicAttributeTag {};
/**
* The type of thread manager to create for the server.
*/
enum class ThreadManagerType : int {
PRIORITY = 0, //! Use a PriorityThreadManager
SIMPLE = 1 //! Use a SimpleThreadManager
};
private:
//! Default number of worker threads (should be # of processor cores).
static const size_t T_ASYNC_DEFAULT_WORKER_THREADS;
static constexpr uint32_t T_MAX_NUM_PENDING_CONNECTIONS_PER_WORKER = 4096;
static constexpr std::chrono::milliseconds DEFAULT_TIMEOUT =
std::chrono::milliseconds(60000);
static constexpr std::chrono::milliseconds DEFAULT_TASK_EXPIRE_TIME =
std::chrono::milliseconds(5000);
static constexpr std::chrono::milliseconds DEFAULT_STREAM_EXPIRE_TIME =
std::chrono::milliseconds(60000);
static constexpr std::chrono::milliseconds DEFAULT_SOCKET_WRITE_TIMEOUT =
std::chrono::milliseconds(60000);
static constexpr std::chrono::seconds DEFAULT_WORKERS_JOIN_TIMEOUT =
std::chrono::seconds(30);
/// Listen backlog
static constexpr int DEFAULT_LISTEN_BACKLOG = 1024;
//! Prefix for pool thread names
ServerAttributeStatic<std::string> poolThreadName_{""};
// Cpp2 ProcessorFactory.
std::shared_ptr<apache::thrift::AsyncProcessorFactory> cpp2Pfac_;
// Explicitly set monitoring service interface handler
std::shared_ptr<MonitoringServerInterface> monitoringServiceHandler_;
// Explicitly set status service interface handler
std::shared_ptr<StatusServerInterface> statusServiceHandler_;
// Explicitly set control service interface handler
std::shared_ptr<ControlServerInterface> controlServiceHandler_;
  //! Number of IO worker threads (may be set; should be # of CPU cores)
ServerAttributeStatic<size_t> nWorkers_{T_ASYNC_DEFAULT_WORKER_THREADS};
// Timeout for joining worker threads
ServerAttributeStatic<std::chrono::seconds> workersJoinTimeout_{
DEFAULT_WORKERS_JOIN_TIMEOUT};
//! Number of CPU worker threads
ServerAttributeStatic<size_t> nPoolThreads_{T_ASYNC_DEFAULT_WORKER_THREADS};
ServerAttributeDynamic<bool> enableCodel_{false};
//! Milliseconds we'll wait for data to appear (0 = infinity)
ServerAttributeStatic<std::chrono::milliseconds> timeout_{DEFAULT_TIMEOUT};
/**
* The time in milliseconds before an unperformed task expires
* (0 == infinite)
*/
ServerAttributeDynamic<std::chrono::milliseconds> taskExpireTime_{
DEFAULT_TASK_EXPIRE_TIME};
/**
   * The time in milliseconds a stream may go without receiving a request
   * before it is considered starved. (0 == infinite)
*/
ServerAttributeDynamic<std::chrono::milliseconds> streamExpireTime_{
DEFAULT_STREAM_EXPIRE_TIME};
/**
* The time we'll allow a task to wait on the queue and still perform it
* (0 == infinite)
*/
ServerAttributeDynamic<std::chrono::milliseconds> queueTimeout_{
folly::observer::makeValueObserver(
[timeoutMs = THRIFT_FLAG_OBSERVE(server_default_queue_timeout_ms)]() {
// Timeouts should be disabled for debug builds since this can
// generate false negatives in unit tests.
if (folly::kIsDebug) {
return std::chrono::milliseconds(0);
}
return std::chrono::milliseconds(**timeoutMs);
})};
/**
* The time we'll allow a new connection socket to wait on the queue before
* closing the connection. See `folly::AsyncServerSocket::setQueueTimeout`.
*/
ServerAttributeDynamic<std::chrono::nanoseconds> socketQueueTimeout_{
folly::observer::makeObserver(
[timeoutMs =
THRIFT_FLAG_OBSERVE(server_default_socket_queue_timeout_ms)]()
-> std::chrono::nanoseconds {
// Disable timeout for debug builds and unit tests by default - this
// is a production overload protection feature.
if (folly::kIsDebug) {
return std::chrono::milliseconds::zero();
}
return std::chrono::milliseconds(**timeoutMs);
})};
/**
* How long a socket with outbound data will tolerate read inactivity from a
* client. Clients must read data from their end of the connection before this
* period expires or the server will drop the connection. The amount of data
* read is irrelevant. Zero indicates no timeout.
*/
ServerAttributeDynamic<std::chrono::milliseconds> socketWriteTimeout_{
DEFAULT_SOCKET_WRITE_TIMEOUT};
/**
* The number of incoming connections the TCP stack will buffer up while
* waiting for the Thrift server to call accept() on them.
*
* If the Thrift server cannot keep up, and this limit is reached, the
* TCP stack will start sending resets to drop excess connections.
*
* Actual behavior of the socket backlog is dependent on the TCP
* implementation, and it may be further limited or even ignored on some
* systems. See manpage for listen(2) for details.
*/
ServerAttributeStatic<int> listenBacklog_{DEFAULT_LISTEN_BACKLOG};
/**
* The maximum number of pending connections each io worker thread can hold.
*/
ServerAttributeStatic<uint32_t> maxNumPendingConnectionsPerWorker_{
T_MAX_NUM_PENDING_CONNECTIONS_PER_WORKER};
// Max number of active connections
ServerAttributeDynamic<uint32_t> maxConnections_{0};
// Max active requests
ServerAttributeDynamic<uint32_t> maxRequests_{
concurrency::ThreadManager::DEFAULT_MAX_QUEUE_SIZE};
  // If set to true, the server will check and use the client timeout header
ServerAttributeDynamic<bool> useClientTimeout_{true};
// Max response size allowed. This is the size of the serialized and
// transformed response, headers not included. 0 (default) means no limit.
ServerAttributeDynamic<uint64_t> maxResponseSize_{0};
/**
* The maximum memory usage (in bytes) by each request debug payload.
* Payloads larger than this value will be simply dropped by instrumentation.
*/
ServerAttributeStatic<uint64_t> maxDebugPayloadMemoryPerRequest_{
0x1000000}; // 16MB
/**
   * The maximum memory usage by each worker to keep track of debug payloads.
* Each time a request payload is added for tracking, the tracker should check
* whether it's using memory beyond this value and evict payloads based on
* its policies.
*/
ServerAttributeStatic<uint64_t> maxDebugPayloadMemoryPerWorker_{
0x1000000}; // 16MB
/**
   * The maximum number of debug payloads to track after a request has finished.
*/
ServerAttributeStatic<uint16_t> maxFinishedDebugPayloadsPerWorker_{10};
/**
   * Batch all writes within the given time interval.
* (0 == disabled)
*/
ServerAttributeDynamic<std::chrono::milliseconds> writeBatchingInterval_{
std::chrono::milliseconds::zero()};
/**
* Trigger early flush when this number of writes are queued.
* Ignored if write batching interval is not set.
* (0 == disabled)
*/
ServerAttributeDynamic<size_t> writeBatchingSize_{0};
/**
* Trigger early flush when the total number of bytes queued equals or exceeds
* this value. Ignored if write batching interval is not set. (0 == disabled)
*/
ServerAttributeDynamic<size_t> writeBatchingByteSize_{0};
ServerAttributeStatic<folly::sorted_vector_set<std::string>>
methodsBypassMaxRequestsLimit_{{}};
Metadata metadata_;
ServerAttributeDynamic<size_t> ingressMemoryLimit_{0};
ServerAttributeDynamic<size_t> egressMemoryLimit_{0};
ServerAttributeDynamic<size_t> minPayloadSizeToEnforceIngressMemoryLimit_{
folly::observer::makeObserver(
[o = THRIFT_FLAG_OBSERVE(
server_ingress_memory_limit_enforcement_payload_size_min_bytes)]()
-> size_t { return **o < 0 ? 0ul : static_cast<size_t>(**o); })};
/**
* Per-connection threshold for number of allocated bytes allowed in egress
* buffer before applying backpressure by pausing streams.
* (0 == disabled)
*/
ServerAttributeDynamic<size_t> egressBufferBackpressureThreshold_{0};
/**
* Factor of egress buffer backpressure threshold at which to resume streams.
* Should be set well below 1 to avoid rapidly turning backpressure on/off.
* Ignored if backpressure threshold is disabled.
*/
ServerAttributeDynamic<double> egressBufferRecoveryFactor_{0.75};
/**
* The duration of time that a polled ServiceHealth value is considered
* current. i.e. another poll will only be scheduled after this amount of
* time has passed since the last poll completed.
*
* @see apache::thrift::PolledServiceHealth
*/
ServerAttributeDynamic<std::chrono::milliseconds>
polledServiceHealthLiveness_{folly::observer::makeObserver(
[livenessMs = THRIFT_FLAG_OBSERVE(
server_polled_service_health_liveness_ms)]() {
return std::chrono::milliseconds(**livenessMs);
})};
/**
* Enable to reject all header-backed connections
*/
ServerAttributeDynamic<bool> disableHeaderTransport_{
THRIFT_FLAG_OBSERVE(server_reject_header_connections)};
/**
* Socket options that will be applied to every connection to clients.
* If the socket does not support the specific option, it is silently ignored.
* Refer to setsockopt() for more details.
*/
ServerAttributeDynamic<folly::SocketOptionMap> perConnectionSocketOptions_{
folly::emptySocketOptionMap};
std::shared_ptr<server::TServerEventHandler> eventHandler_;
std::vector<std::shared_ptr<server::TServerEventHandler>> eventHandlers_;
AdaptiveConcurrencyController adaptiveConcurrencyController_;
bool usingCustomThreadManager_{false};
protected:
//! The server's listening addresses
std::vector<folly::SocketAddress> addresses_;
//! The server's listening port
uint16_t port_ = 0;
//! The type of thread manager to create.
ThreadManagerType threadManagerType_{ThreadManagerType::PRIORITY};
//! The thread pool sizes for priority thread manager.
std::array<size_t, concurrency::N_PRIORITIES> threadManagerPoolSizes_{
{0, 0, 0, 0, 0}};
  //! The ResourcePoolSet used by this ThriftServer (if ResourcePools
  //! are enabled).
ResourcePoolSet resourcePoolSet_;
/**
* The thread manager used for sync calls.
*/
mutable std::mutex threadManagerMutex_;
std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager_;
// If set, the thread factory that should be used to create worker threads.
std::shared_ptr<concurrency::ThreadFactory> threadFactory_;
// Notification of various server events. Note that once observer_ has been
// set, it cannot be set again and will remain alive for (at least) the
// lifetime of *this.
folly::Synchronized<std::shared_ptr<server::TServerObserver>> observer_;
std::atomic<server::TServerObserver*> observerPtr_{nullptr};
IsOverloadedFunc isOverloaded_;
PreprocessFunc preprocess_;
std::function<int64_t(const std::string&)> getLoad_;
enum class InjectedFailure { NONE, ERROR, DROP, DISCONNECT };
class CumulativeFailureInjection {
public:
CumulativeFailureInjection()
: empty_(true),
errorThreshold_(0),
dropThreshold_(0),
disconnectThreshold_(0) {}
InjectedFailure test() const;
void set(const FailureInjection& fi);
private:
std::atomic<bool> empty_;
mutable std::mutex mutex_;
float errorThreshold_;
float dropThreshold_;
float disconnectThreshold_;
};
// Unlike FailureInjection, this is cumulative and thread-safe
CumulativeFailureInjection failureInjection_;
InjectedFailure maybeInjectFailure() const {
return failureInjection_.test();
}
  // This is meant to be used internally. We separate setThreadManager and
  // configureThreadManager so that we can have proper logging for the
  // former. These APIs will eventually be deprecated once the ResourcePool
  // migration is done.
void setThreadManagerInternal(
std::shared_ptr<apache::thrift::concurrency::ThreadManager>
threadManager) {
CHECK(configMutable());
std::lock_guard<std::mutex> lock(threadManagerMutex_);
threadManager_ = threadManager;
}
getHandlerFunc getHandler_;
GetHeaderHandlerFunc getHeaderHandler_;
ClientIdentityHook clientIdentityHook_;
// Flag indicating whether it is safe to mutate the server config through its
// setters.
std::atomic<bool> configMutable_{true};
folly::Function<AllocIOBufFn> allocIOBufFn_;
template <typename T>
void setStaticAttribute(
ServerAttributeStatic<T>& staticAttribute,
T&& value,
AttributeSource source) {
CHECK(configMutable());
staticAttribute.set(std::move(value), source);
}
BaseThriftServer();
~BaseThriftServer() override {}
public:
folly::Function<AllocIOBufFn>& getAllocIOBufFn() { return allocIOBufFn_; }
void setAllocIOBufFn(folly::Function<AllocIOBufFn>&& fn) {
allocIOBufFn_ = std::move(fn);
}
std::shared_ptr<server::TServerEventHandler> getEventHandler() const {
return eventHandler_;
}
/**
* If a view of the event handlers is needed that does not need to extend
* their lifetime beyond that of the BaseThriftServer, this method allows
* obtaining the raw pointer rather than the more expensive shared_ptr. Since
* unsynchronized setServerEventHandler / addServerEventHandler /
* getEventHandler calls are not permitted, use cases that get the handler,
* inform it of some action, and then discard the handle immediately can use
* getEventHandlersUnsafe.
*/
const std::vector<std::shared_ptr<server::TServerEventHandler>>&
getEventHandlersUnsafe() const {
return eventHandlers_;
}
/**
* DEPRECATED! Please use addServerEventHandler instead.
*/
void setServerEventHandler(
std::shared_ptr<server::TServerEventHandler> eventHandler) {
if (eventHandler_) {
eventHandlers_.erase(std::find(
eventHandlers_.begin(), eventHandlers_.end(), eventHandler_));
}
eventHandler_ = std::move(eventHandler);
if (eventHandler_) {
eventHandlers_.push_back(eventHandler_);
}
}
void addServerEventHandler(
std::shared_ptr<server::TServerEventHandler> eventHandler) {
eventHandlers_.push_back(eventHandler);
}
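  // Example (a hypothetical sketch; `LoggingEventHandler` is an assumption):
  // attaching an event handler that logs the address the server starts
  // serving on.
  //
  //   class LoggingEventHandler : public server::TServerEventHandler {
  //    public:
  //     void preServe(const folly::SocketAddress* addr) override {
  //       LOG(INFO) << "serving on " << addr->describe();
  //     }
  //   };
  //   server.addServerEventHandler(std::make_shared<LoggingEventHandler>());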
/**
* Indicate whether it is safe to modify the server config through setters.
* This roughly corresponds to whether the IO thread pool could be servicing
* requests.
*
* @return true if the configuration can be modified, false otherwise
*/
bool configMutable() { return configMutable_; }
/**
* Set the ThreadFactory that will be used to create worker threads for the
* service. If not set, a default factory will be used. Must be called
* before the thread manager is started.
*/
void setThreadFactory(
std::shared_ptr<concurrency::ThreadFactory> threadFactory) {
CHECK(configMutable());
std::lock_guard<std::mutex> lock(threadManagerMutex_);
CHECK(!threadManager_);
threadFactory_ = std::move(threadFactory);
}
/**
* Set the type of ThreadManager to use for this server.
*/
void setThreadManagerType(ThreadManagerType threadManagerType) {
CHECK(configMutable());
std::lock_guard<std::mutex> lock(threadManagerMutex_);
CHECK(!threadManager_);
threadManagerType_ = threadManagerType;
}
/**
* Set the size of thread pools when using ThreadManagerType::PRIORITY
*/
void setThreadManagerPoolSizes(
const std::array<size_t, concurrency::N_PRIORITIES>& poolSizes) {
CHECK(configMutable());
std::lock_guard<std::mutex> lock(threadManagerMutex_);
CHECK(!threadManager_);
threadManagerPoolSizes_ = poolSizes;
}
/**
* Get the prefix for naming the CPU (pool) threads.
*
* @return current setting.
*/
std::string getCPUWorkerThreadName() const { return poolThreadName_.get(); }
/**
   * Set the prefix for naming the CPU (pool) threads. Not set by default.
   * Must be called before serve() for it to take effect; ignored if
   * setThreadManager() is called.
*
* @param cpuWorkerThreadName thread name prefix
*/
void setCPUWorkerThreadName(
const std::string& cpuWorkerThreadName,
AttributeSource source = AttributeSource::OVERRIDE,
StaticAttributeTag = StaticAttributeTag{}) {
setStaticAttribute(
poolThreadName_, std::string{cpuWorkerThreadName}, source);
}
/**
   * Set the ThreadManager (for queuing mode). If not set, a default
   * ThreadManager sized to the number of worker threads is created.
   * This is meant to be used as an external API.
*
* @param threadManager a shared pointer to the thread manager
*/
void setThreadManager(
std::shared_ptr<apache::thrift::concurrency::ThreadManager>
threadManager) {
setThreadManagerInternal(threadManager);
runtimeDisableResourcePools();
usingCustomThreadManager_ = true;
}
/**
* Set Thread Manager by using the executor (for Python server).
*
* @param executor folly::Executor to be set as the threadManager
*/
void setThreadManagerFromExecutor(
folly::Executor* executor, std::string name = "") {
concurrency::ThreadManagerExecutorAdapter::Options opts(std::move(name));
setThreadManagerInternal(
std::make_shared<concurrency::ThreadManagerExecutorAdapter>(
folly::getKeepAliveToken(executor), std::move(opts)));
runtimeDisableResourcePools();
usingCustomThreadManager_ = true;
}
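  // Example (a hypothetical sketch): backing the server with an existing
  // folly executor, e.g. a CPUThreadPoolExecutor. The executor must outlive
  // the server, since only a raw pointer is passed in.
  //
  //   auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(8);
  //   server.setThreadManagerFromExecutor(executor.get(), "my_cpu_pool");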
/**
* Get Thread Manager (for queuing mode).
*
* @return a shared pointer to the thread manager
*/
std::shared_ptr<concurrency::ThreadManager> getThreadManager() const {
std::lock_guard<std::mutex> lock(threadManagerMutex_);
return threadManager_;
}
/**
* Get the executor for the general pool of async CPUWorkerThreads.
*
* @return a pointer to the executor
*/
folly::Executor* getExecutor() const {
if (useResourcePoolsFlagsSet()) {
return resourcePoolSet()
.resourcePool(ResourcePoolHandle::defaultAsync())
.executor()
.value();
}
std::lock_guard<std::mutex> lock(threadManagerMutex_);
return threadManager_.get();
}
/**
* Get the maximum # of connections allowed before overload.
*
* @return current setting.
*/
uint32_t getMaxConnections() const { return maxConnections_.get(); }
/**
* Set the maximum # of connections allowed before overload.
*
* @param maxConnections new setting for maximum # of connections.
*/
void setMaxConnections(
uint32_t maxConnections,
AttributeSource source = AttributeSource::OVERRIDE,
DynamicAttributeTag = DynamicAttributeTag{}) {
maxConnections_.set(maxConnections, source);
}
/**
   * Sets the timeout for joining worker threads.
   * @param timeout new setting for the workers join timeout.
*/
void setWorkersJoinTimeout(
std::chrono::seconds timeout,
AttributeSource source = AttributeSource::OVERRIDE,
StaticAttributeTag = StaticAttributeTag{}) {
setStaticAttribute(workersJoinTimeout_, std::move(timeout), source);
}
/**
* Get the timeout for joining workers.
   * @return workers join timeout in seconds
*/
std::chrono::seconds getWorkersJoinTimeout() const {
return workersJoinTimeout_.get();
}
/**
* Get the maximum # of requests being processed in handler before overload.
*
* @return current setting.
*/
uint32_t getMaxRequests() const {
return adaptiveConcurrencyController_.enabled()
? static_cast<uint32_t>(adaptiveConcurrencyController_.getMaxRequests())
: maxRequests_.get();
}
/**
* Set the maximum # of requests being processed in handler before overload.
*
* @param maxRequests new setting for maximum # of active requests.
*/
void setMaxRequests(
uint32_t maxRequests,
AttributeSource source = AttributeSource::OVERRIDE,
DynamicAttributeTag = DynamicAttributeTag{}) {
maxRequests_.set(maxRequests, source);
// Eventually we'll remove the simple setMaxRequests but for now ensure it
// updates the concurrency controller for the default async pool.
if (!resourcePoolSet_.empty()) {
if (resourcePoolSet_.hasResourcePool(
ResourcePoolHandle::defaultAsync())) {
resourcePoolSet_.resourcePool(ResourcePoolHandle::defaultAsync())
.concurrencyController()
.value()
->setExecutionLimitRequests(maxRequests);
}
}
}
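  // Example (a hypothetical sketch; the limits are arbitrary): capping
  // concurrent work. Note that when the adaptive concurrency controller is
  // enabled, getMaxRequests() reports its limit rather than the value set
  // here.
  //
  //   server.setMaxRequests(10000);
  //   server.setMaxConnections(20000);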
uint64_t getMaxResponseSize() const final { return maxResponseSize_.get(); }
void setMaxResponseSize(
uint64_t size,
AttributeSource source = AttributeSource::OVERRIDE,
DynamicAttributeTag = DynamicAttributeTag{}) {
maxResponseSize_.set(size, source);
}
bool getUseClientTimeout() const { return useClientTimeout_.get(); }
void setUseClientTimeout(
bool useClientTimeout,
AttributeSource source = AttributeSource::OVERRIDE,
DynamicAttributeTag = DynamicAttributeTag{}) {
useClientTimeout_.set(useClientTimeout, source);
}
// Get load of the server.
int64_t getLoad(
const std::string& counter = "", bool check_custom = true) const final;
virtual std::string getLoadInfo(int64_t load) const;
void setObserver(const std::shared_ptr<server::TServerObserver>& observer) {
auto locked = observer_.wlock();
if (*locked) {
throw std::logic_error("Server already has an observer installed");
}
*locked = observer;
observerPtr_.store(locked->get());
}
server::TServerObserver* getObserver() const final {
return observerPtr_.load(std::memory_order_relaxed);
}
AdaptiveConcurrencyController& getAdaptiveConcurrencyController() final {
return adaptiveConcurrencyController_;
}
const AdaptiveConcurrencyController& getAdaptiveConcurrencyController()
const final {
return adaptiveConcurrencyController_;
}
std::shared_ptr<server::TServerObserver> getObserverShared() const {
return observer_.copy();
}
/**
* Set the address(es) to listen on.
*/
void setAddress(const folly::SocketAddress& address) {
setAddresses({address});
}
void setAddress(folly::SocketAddress&& address) {
setAddresses({std::move(address)});
}
void setAddress(const char* ip, uint16_t port) {
setAddresses({folly::SocketAddress(ip, port)});
}
void setAddress(const std::string& ip, uint16_t port) {
setAddresses({folly::SocketAddress(ip, port)});
}
void setAddresses(std::vector<folly::SocketAddress> addresses) {
CHECK(!addresses.empty());
CHECK(configMutable());
port_ = 0;
addresses_ = std::move(addresses);
}
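  // Example (a hypothetical sketch; the addresses are arbitrary): binding
  // the server to both an IPv6 and an IPv4 loopback address on the same
  // port.
  //
  //   server.setAddresses(
  //       {folly::SocketAddress("::1", 7777),
  //        folly::SocketAddress("127.0.0.1", 7777)});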
/**
* Get the address the server is listening on.
*
* This should generally only be called after setup() has finished.
*
* (The address may be uninitialized until setup() has run. If called from
* another thread besides the main server thread, the caller is responsible
* for providing their own synchronization to ensure that setup() is not
* modifying the address while they are using it.)
*/
const folly::SocketAddress& getAddress() const { return addresses_.at(0); }
const std::vector<folly::SocketAddress>& getAddresses() const {
return addresses_;
}
/**
* Set the port to listen on.
*/
void setPort(uint16_t port) {
CHECK(configMutable());
port_ = port;
addresses_.at(0).reset();
}
/**
* Get the port.
*/
uint16_t getPort() {
if (!getAddress().isInitialized()) {
return port_;
}
return getAddress().getPort();
}
/**
* Get the maximum number of pending connections each io worker thread can
* hold.
*/
uint32_t getMaxNumPendingConnectionsPerWorker() const {
return maxNumPendingConnectionsPerWorker_.get();
}
/**
* Set the maximum number of pending connections each io worker thread can
   * hold. No new connections will be sent to an IO worker thread whose queue
   * already holds that many unprocessed connections. If every IO worker
   * thread's queue is full, the connection will be dropped.
*/
void setMaxNumPendingConnectionsPerWorker(
uint32_t num,
AttributeSource source = AttributeSource::OVERRIDE,
StaticAttributeTag = StaticAttributeTag{}) {
setStaticAttribute(
maxNumPendingConnectionsPerWorker_, std::move(num), source);
}
/**
* Get the number of connections dropped by the AsyncServerSocket
*/
virtual uint64_t getNumDroppedConnections() const = 0;
/** Get maximum number of milliseconds we'll wait for data (0 = infinity).
*
* @return number of milliseconds, or 0 if no timeout set.
*/
std::chrono::milliseconds getIdleTimeout() const { return timeout_.get(); }
/** Set maximum number of milliseconds we'll wait for data (0 = infinity).
* Note: existing connections are unaffected by this call.
*
* @param timeout number of milliseconds, or 0 to disable timeouts.
*/
void setIdleTimeout(
std::chrono::milliseconds timeout,
AttributeSource source = AttributeSource::OVERRIDE,
StaticAttributeTag = StaticAttributeTag{}) {
setStaticAttribute(timeout_, std::move(timeout), source);
}
/**
* Set the number of IO worker threads
*
   * @param numIOWorkerThreads number of IO worker threads
*/
void setNumIOWorkerThreads(
size_t numIOWorkerThreads,
AttributeSource source = AttributeSource::OVERRIDE,
StaticAttributeTag = StaticAttributeTag{}) {
setStaticAttribute(nWorkers_, std::move(numIOWorkerThreads), source);
}
/**
* Get the number of IO worker threads
*
* @return number of IO worker threads
*/
size_t getNumIOWorkerThreads() const final { return nWorkers_.get(); }
/**
* Set the number of CPU (pool) threads.
   * Only valid if you do not also set a ThreadManager. This controls the number
* of normal priority threads; the Thrift thread manager can create additional
* threads for other priorities.
   * If set to 0, the number of normal priority threads will be the same as
   * the number of CPU cores.
   *
   * @param numCPUWorkerThreads number of CPU (pool) threads
*/
void setNumCPUWorkerThreads(
size_t numCPUWorkerThreads,
AttributeSource source = AttributeSource::OVERRIDE,
StaticAttributeTag = StaticAttributeTag{}) {
CHECK(!threadManager_);
setStaticAttribute(nPoolThreads_, std::move(numCPUWorkerThreads), source);
}
/**
* Get the number of CPU (pool) threads
*
* @return number of CPU (pool) threads
*/
size_t getNumCPUWorkerThreads() const {
auto nCPUWorkers = nPoolThreads_.get();
return nCPUWorkers ? nCPUWorkers : T_ASYNC_DEFAULT_WORKER_THREADS;
}
/**
* Codel queuing timeout - limit queueing time before overload
* http://en.wikipedia.org/wiki/CoDel
*/
void setEnableCodel(
bool enableCodel,
AttributeSource source = AttributeSource::OVERRIDE,
DynamicAttributeTag = DynamicAttributeTag{}) {
enableCodel_.set(enableCodel, source);
}
bool getEnableCodel() const { return enableCodel_.get(); }
/**
* Sets the main server interface that exposes user-defined methods.
*/
void setInterface(std::shared_ptr<AsyncProcessorFactory> iface) {
setProcessorFactory(std::move(iface));
}
/**
* DEPRECATED! Use setInterface instead.
*/
virtual void setProcessorFactory(
std::shared_ptr<AsyncProcessorFactory> pFac) {
CHECK(configMutable());
cpp2Pfac_ = pFac;
}
const std::shared_ptr<apache::thrift::AsyncProcessorFactory>&
getProcessorFactory() const {
return cpp2Pfac_;
}
/**
* Sets the interface that will be used for monitoring connections only.
*/
void setMonitoringInterface(
std::shared_ptr<MonitoringServerInterface> iface) {
CHECK(configMutable());
monitoringServiceHandler_ = std::move(iface);
}
const std::shared_ptr<MonitoringServerInterface>& getMonitoringInterface()
const {
return monitoringServiceHandler_;
}
/**
* Sets the interface that will be used for status RPCs only.
*/
void setStatusInterface(std::shared_ptr<StatusServerInterface> iface) {
CHECK(configMutable());
statusServiceHandler_ = std::move(iface);
}
const std::shared_ptr<StatusServerInterface>& getStatusInterface() {
return statusServiceHandler_;
}
/**
* Sets the interface that will be used for control RPCs only.
*/
void setControlInterface(std::shared_ptr<ControlServerInterface> iface) {
CHECK(configMutable());
controlServiceHandler_ = std::move(iface);
}
const std::shared_ptr<ControlServerInterface>& getControlInterface() const {
return controlServiceHandler_;
}
/**
   * Set the task expire time.
   */
void setTaskExpireTime(
std::chrono::milliseconds timeout,
AttributeSource source = AttributeSource::OVERRIDE,
DynamicAttributeTag = DynamicAttributeTag{}) {
taskExpireTime_.set(timeout, source);
}
/**
* Get the task expire time
*
* @return task expire time
*/
std::chrono::milliseconds getTaskExpireTime() const {
return taskExpireTime_.get();
}
/**
   * Set the stream starvation time.
   */
void setStreamExpireTime(
std::chrono::milliseconds timeout,
AttributeSource source = AttributeSource::OVERRIDE,
DynamicAttributeTag = DynamicAttributeTag{}) {
streamExpireTime_.set(timeout, source);
}
/**
   * If there is no request on the stream for the given time period, the
   * stream will fail with a timeout error.
*/
std::chrono::milliseconds getStreamExpireTime() const final {
return streamExpireTime_.get();
}
/**
* Set the time requests are allowed to stay on the queue.
* Note, queuing is an indication that your server cannot keep
* up with load, and realtime systems should not queue. Only
* override this if you do heavily batched requests.
*/
void setQueueTimeout(
std::chrono::milliseconds timeout,
AttributeSource source = AttributeSource::OVERRIDE,
DynamicAttributeTag = DynamicAttributeTag{}) {
queueTimeout_.set(timeout, source);
}
/**
* Get the time requests are allowed to stay on the queue
*
* @return queue timeout
*/
std::chrono::milliseconds getQueueTimeout() const {
return queueTimeout_.get();
}
/**
   * Sets the maximum duration that new connections may wait on a socket's
   * queue before being closed. A value of 0 represents an infinite duration.
* See `folly::AsyncServerSocket::setQueueTimeout`.
*/
void setSocketQueueTimeout(
folly::observer::Observer<std::chrono::nanoseconds> timeout,
AttributeSource source = AttributeSource::OVERRIDE,
DynamicAttributeTag = DynamicAttributeTag{}) {
socketQueueTimeout_.set(timeout, source);
}
void setSocketQueueTimeout(
folly::Optional<std::chrono::nanoseconds> timeout,
AttributeSource source = AttributeSource::OVERRIDE,
DynamicAttributeTag = DynamicAttributeTag{}) {
if (timeout) {
socketQueueTimeout_.set(*timeout, source);
} else {
socketQueueTimeout_.unset(source);
}
}
void setSocketQueueTimeout(
std::chrono::nanoseconds timeout,
AttributeSource source = AttributeSource::OVERRIDE,
DynamicAttributeTag = DynamicAttributeTag{}) {
socketQueueTimeout_.set(timeout, source);
}
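  // Example (a hypothetical sketch): supplying the socket queue timeout as
  // an observer so it can be updated at runtime, e.g. from a configuration
  // source.
  //
  //   folly::observer::SimpleObservable<std::chrono::nanoseconds> timeout{
  //       std::chrono::milliseconds(100)};
  //   server.setSocketQueueTimeout(timeout.getObserver());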
/**
* How long a socket with outbound data will tolerate read inactivity from a
* client. Clients must read data from their end of the connection before this
* period expires or the server will drop the connection. The amount of data
* read by the client is irrelevant. Zero disables the timeout.
*/
void setSocketWriteTimeout(
std::chrono::milliseconds timeout,
AttributeSource source = AttributeSource::OVERRIDE) {
socketWriteTimeout_.set(timeout, source);
}
std::chrono::milliseconds getSocketWriteTimeout() const {
return socketWriteTimeout_.get();
}
/**
* Gets an observer representing the socket queue timeout. If no value is
* set, this falls back to the thrift flag,
* server_default_socket_queue_timeout_ms.
*/
const folly::observer::Observer<std::chrono::nanoseconds>&
getSocketQueueTimeout() const {
return socketQueueTimeout_.getObserver();
}
/**
* Calls the twin function getTaskExpireTimeForRequest with the
* clientQueueTimeoutMs and clientTimeoutMs fields retrieved from the THeader.
*/
bool getTaskExpireTimeForRequest(
const apache::thrift::transport::THeader& header,
std::chrono::milliseconds& queueTimeout,
std::chrono::milliseconds& taskTimeout) const;
/**
* A task has two timeouts:
*
* If the task hasn't started processing the request by the time the soft
* timeout has expired, we should throw the task away.
*
* However, if the task has started processing the request by the time the
* soft timeout has expired, we shouldn't expire the task until the hard
* timeout has expired.
*
* The soft timeout protects the server from starting to process too many
* requests. The hard timeout protects us from sending responses that
* are never read.
*
* @returns whether or not the soft and hard timeouts are different
*/
bool getTaskExpireTimeForRequest(
std::chrono::milliseconds clientQueueTimeoutMs,
std::chrono::milliseconds clientTimeoutMs,
std::chrono::milliseconds& queueTimeout,
std::chrono::milliseconds& taskTimeout) const final;
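  // Example (a hypothetical sketch; the client-supplied values are
  // arbitrary): computing the effective queue and task timeouts for a
  // request that advertised a 100ms queue timeout and a 500ms overall
  // timeout.
  //
  //   std::chrono::milliseconds queueTimeout, taskTimeout;
  //   bool differ = server.getTaskExpireTimeForRequest(
  //       std::chrono::milliseconds(100),
  //       std::chrono::milliseconds(500),
  //       queueTimeout,
  //       taskTimeout);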
/**
* Set the listen backlog. Refer to the comment on listenBacklog_ member for
* details.
*/
void setListenBacklog(
int listenBacklog,
AttributeSource source = AttributeSource::OVERRIDE,
StaticAttributeTag = StaticAttributeTag{}) {
setStaticAttribute(listenBacklog_, std::move(listenBacklog), source);
}
/**
* Get the listen backlog.
*
* @return listen backlog.
*/
int getListenBacklog() const { return listenBacklog_.get(); }
[[deprecated("Use setPreprocess instead")]] void setIsOverloaded(
IsOverloadedFunc isOverloaded) {
isOverloaded_ = std::move(isOverloaded);
}
void setPreprocess(PreprocessFunc preprocess) {
preprocess_ = std::move(preprocess);
}
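  // Example (a hypothetical sketch; assumes an empty PreprocessResult means
  // "continue processing"): installing a preprocess hook that lets every
  // request through.
  //
  //   server.setPreprocess(
  //       [](const server::PreprocessParams&) -> PreprocessResult {
  //         return {};
  //       });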
void setMethodsBypassMaxRequestsLimit(
const std::vector<std::string>& methods,
AttributeSource source = AttributeSource::OVERRIDE,
StaticAttributeTag = StaticAttributeTag{}) {
setStaticAttribute(
methodsBypassMaxRequestsLimit_,
folly::sorted_vector_set<std::string>{methods.begin(), methods.end()},
source);
}
const folly::sorted_vector_set<std::string>&
getMethodsBypassMaxRequestsLimit() const {
return methodsBypassMaxRequestsLimit_.get();
}
void setGetLoad(std::function<int64_t(const std::string&)> getLoad) {
getLoad_ = getLoad;
}
std::function<int64_t(const std::string&)> getGetLoad() const {
return getLoad_;
}
/**
* Set failure injection parameters.
*/
virtual void setFailureInjection(FailureInjection fi) {
failureInjection_.set(fi);
}
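  // Example (a hypothetical sketch; the fractions are arbitrary): failing
  // 1% of requests and dropping 0.5% to exercise client-side retry and
  // timeout handling.
  //
  //   BaseThriftServer::FailureInjection fi;
  //   fi.errorFraction = 0.01;
  //   fi.dropFraction = 0.005;
  //   server.setFailureInjection(fi);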
void setGetHandler(getHandlerFunc func) { getHandler_ = func; }
getHandlerFunc getGetHandler() { return getHandler_; }
void setGetHeaderHandler(GetHeaderHandlerFunc func) {
getHeaderHandler_ = func;
}
GetHeaderHandlerFunc getGetHeaderHandler() { return getHeaderHandler_; }
/**
* Set the client identity hook for the server, which will be called in
* Cpp2ConnContext(). It can be used to cache client identities for each
* connection. They can be retrieved with Cpp2ConnContext::getPeerIdentities.
*/
void setClientIdentityHook(ClientIdentityHook func) {
clientIdentityHook_ = func;
}
ClientIdentityHook getClientIdentityHook() { return clientIdentityHook_; }
virtual void serve() = 0;
virtual void stop() = 0;
  // This API stops the server from listening on its socket and accepting
  // new connections, while still letting established connections be
  // processed by the server.
virtual void stopListening() = 0;
// Allows running the server as a Runnable thread
void run() override { serve(); }
/**
* Return the maximum memory usage by each debug payload.
*/
uint64_t getMaxDebugPayloadMemoryPerRequest() const {
return maxDebugPayloadMemoryPerRequest_.get();
}
/**
* Set the maximum memory usage by each debug payload.
*/
void setMaxDebugPayloadMemoryPerRequest(
uint64_t limit,
AttributeSource source = AttributeSource::OVERRIDE,
StaticAttributeTag = StaticAttributeTag{}) {
setStaticAttribute(
maxDebugPayloadMemoryPerRequest_, std::move(limit), source);
}
/**
* Return the maximum memory usage by each worker to keep track of debug
* payloads.
*/
uint64_t getMaxDebugPayloadMemoryPerWorker() const {
return maxDebugPayloadMemoryPerWorker_.get();
}
/**
* Set the maximum memory usage by each worker to keep track of debug
* payloads.
*/
void setMaxDebugPayloadMemoryPerWorker(
uint64_t limit,
AttributeSource source = AttributeSource::OVERRIDE,
StaticAttributeTag = StaticAttributeTag{}) {
setStaticAttribute(
maxDebugPayloadMemoryPerWorker_, std::move(limit), source);
}
/**
   * Return the maximum number of debug payloads each worker keeps track of
   * after requests have finished.
*/
uint16_t getMaxFinishedDebugPayloadsPerWorker() const {
return maxFinishedDebugPayloadsPerWorker_.get();
}
/**
   * Set the maximum number of debug payloads each worker keeps track of
   * after requests have finished.
*/
void setMaxFinishedDebugPayloadsPerWorker(
uint16_t limit,
AttributeSource source = AttributeSource::OVERRIDE,
StaticAttributeTag = StaticAttributeTag{}) {
setStaticAttribute(
maxFinishedDebugPayloadsPerWorker_, std::move(limit), source);
}
/**
* Set write batching interval
*/
void setWriteBatchingInterval(
std::chrono::milliseconds interval,
AttributeSource source = AttributeSource::OVERRIDE) {
writeBatchingInterval_.set(interval, source);
}
/**
* Get write batching interval
*/
std::chrono::milliseconds getWriteBatchingInterval() const {
return writeBatchingInterval_.get();
}
/**
* Set write batching size. Ignored if write batching interval is not set.
*/
void setWriteBatchingSize(
size_t batchingSize, AttributeSource source = AttributeSource::OVERRIDE) {
writeBatchingSize_.set(batchingSize, source);
}
/**
* Get write batching size
*/
size_t getWriteBatchingSize() const { return writeBatchingSize_.get(); }
/**
* Set write batching byte size. Ignored if write batching interval is not
* set.
*/
void setWriteBatchingByteSize(
size_t batchingByteSize,
AttributeSource source = AttributeSource::OVERRIDE) {
writeBatchingByteSize_.set(batchingByteSize, source);
}
/**
* Get write batching byte size
*/
size_t getWriteBatchingByteSize() const {
return writeBatchingByteSize_.get();
}
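  // Example (a hypothetical sketch; the thresholds are arbitrary): flush
  // queued writes every 5ms, or earlier once 64 responses or 128KiB are
  // buffered. The size thresholds are ignored unless the interval is set.
  //
  //   server.setWriteBatchingInterval(std::chrono::milliseconds(5));
  //   server.setWriteBatchingSize(64);
  //   server.setWriteBatchingByteSize(128 * 1024);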
const Metadata& metadata() const { return metadata_; }
Metadata& metadata() { return metadata_; }
/**
* Ingress memory is the total memory used for receiving inflight requests.
* If the memory limit is hit, the connection along with the violating request
* will be closed
*/
void setIngressMemoryLimit(
size_t ingressMemoryLimit,
AttributeSource source = AttributeSource::OVERRIDE,
DynamicAttributeTag = DynamicAttributeTag{}) {
ingressMemoryLimit_.set(ingressMemoryLimit, source);
}
size_t getIngressMemoryLimit() const { return ingressMemoryLimit_.get(); }
folly::observer::Observer<size_t> getIngressMemoryLimitObserver() const {
return ingressMemoryLimit_.getObserver();
}
/**
* Limit the amount of memory available for inflight responses, meaning
* responses that are queued on the server pending delivery to clients. This
* limit, divided by the number of IO threads, determines the effective egress
* limit of a connection. Once the per-connection limit is reached, a
* connection is dropped immediately and all outstanding responses are
* discarded.
*/
void setEgressMemoryLimit(
size_t max, AttributeSource source = AttributeSource::OVERRIDE) {
egressMemoryLimit_.set(max, source);
}
size_t getEgressMemoryLimit() const { return egressMemoryLimit_.get(); }
folly::observer::Observer<size_t> getEgressMemoryLimitObserver() const {
return egressMemoryLimit_.getObserver();
}
/**
   * Connection close will only be enforced and triggered for requests whose
   * size is greater than or equal to this value.
*/
void setMinPayloadSizeToEnforceIngressMemoryLimit(
size_t minPayloadSizeToEnforceIngressMemoryLimit,
AttributeSource source = AttributeSource::OVERRIDE,
DynamicAttributeTag = DynamicAttributeTag{}) {
minPayloadSizeToEnforceIngressMemoryLimit_.set(
minPayloadSizeToEnforceIngressMemoryLimit, source);
}
size_t getMinPayloadSizeToEnforceIngressMemoryLimit() const {
return minPayloadSizeToEnforceIngressMemoryLimit_.get();
}
folly::observer::Observer<size_t>
getMinPayloadSizeToEnforceIngressMemoryLimitObserver() const {
return minPayloadSizeToEnforceIngressMemoryLimit_.getObserver();
}
size_t getEgressBufferBackpressureThreshold() const {
return egressBufferBackpressureThreshold_.get();
}
/**
* Apply backpressure to all stream generators of a connection when combined
* allocation size of inflight writes for that connection exceeds the
* threshold.
*/
void setEgressBufferBackpressureThreshold(
size_t thresholdInBytes,
AttributeSource source = AttributeSource::OVERRIDE,
DynamicAttributeTag = DynamicAttributeTag{}) {
egressBufferBackpressureThreshold_.set(thresholdInBytes, source);
}
double getEgressBufferRecoveryFactor() const {
return egressBufferRecoveryFactor_.get();
}
/**
* When egress buffer backpressure is enabled, resume normal operation once
* egress buffer size falls below this factor of the threshold.
*/
void setEgressBufferRecoveryFactor(
double recoveryFactor,
AttributeSource source = AttributeSource::OVERRIDE,
DynamicAttributeTag = DynamicAttributeTag{}) {
recoveryFactor = std::max(0.0, std::min(1.0, recoveryFactor));
egressBufferRecoveryFactor_.set(recoveryFactor, source);
}
folly::observer::Observer<std::chrono::milliseconds>
getPolledServiceHealthLivenessObserver() const {
return polledServiceHealthLiveness_.getObserver();
}
void setPolledServiceHealthLiveness(
std::chrono::milliseconds liveness,
AttributeSource source = AttributeSource::OVERRIDE,
DynamicAttributeTag = DynamicAttributeTag{}) {
polledServiceHealthLiveness_.set(liveness, source);
}
const auto& adaptiveConcurrencyController() const {
return adaptiveConcurrencyController_;
}
// Rejects all header-backed connections to this server
void disableLegacyTransports(
bool value = true,
AttributeSource source = AttributeSource::OVERRIDE,
DynamicAttributeTag = DynamicAttributeTag{}) {
disableHeaderTransport_.set(value, source);
}
bool isHeaderDisabled() const { return disableHeaderTransport_.get(); }
const folly::SocketOptionMap& getPerConnectionSocketOptions() const {
return perConnectionSocketOptions_.get();
}
void setPerConnectionSocketOptions(
folly::SocketOptionMap options,
AttributeSource source = AttributeSource::OVERRIDE,
DynamicAttributeTag = DynamicAttributeTag{}) {
perConnectionSocketOptions_.set(std::move(options), source);
}
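  // Example (a hypothetical sketch): disabling Nagle's algorithm on every
  // accepted connection via a per-connection socket option.
  //
  //   folly::SocketOptionMap opts;
  //   opts[{IPPROTO_TCP, TCP_NODELAY}] = 1;
  //   server.setPerConnectionSocketOptions(std::move(opts));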
/**
* Get the ResourcePoolSet used by this ThriftServer. There is always one, but
* it may be empty if ResourcePools are not in use.
*/
const ResourcePoolSet& resourcePoolSet() const override {
return resourcePoolSet_;
}
/**
* Get the ResourcePoolSet used by this ThriftServer. There is always one, but
* it may be empty if ResourcePools are not in use.
*/
ResourcePoolSet& resourcePoolSet() override { return resourcePoolSet_; }
bool getUsingCustomThreadManager() const { return usingCustomThreadManager_; }
};
} // namespace thrift
} // namespace apache