in thrift/lib/cpp2/server/ThriftServer.cpp [382:609]
void ThriftServer::setup() {
ensureDecoratedProcessorFactoryInitialized();
auto nWorkers = getNumIOWorkerThreads();
DCHECK_GT(nWorkers, 0u);
addRoutingHandler(
std::make_unique<apache::thrift::RocketRoutingHandler>(*this));
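// Routing handlers sniff the first bytes of each new connection, letting a
// single listening port serve multiple transports; Rocket is registered
// here as the default.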
// Get (lazily initializing) the event base for this thread
auto serveEventBase = eventBaseManager_->getEventBase();
serveEventBase_ = serveEventBase;
stopController_.set(std::make_unique<StopController>(
folly::badge<ThriftServer>{}, *serveEventBase));
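// If an idle-server timeout is configured, arm a timer on the serve event
// base so the server can be stopped after a period with no activity.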
if (idleServerTimeout_.count() > 0) {
idleServer_.emplace(*this, serveEventBase->timer(), idleServerTimeout_);
}
// Log the libevent version and backend method in use
VLOG(1) << "libevent " << folly::EventBase::getLibeventVersion() << " method "
<< serveEventBase->getLibeventMethod();
try {
#ifndef _WIN32
// OpenSSL might try to write to a closed socket if the peer disconnects
// abruptly, raising a SIGPIPE signal. By default this terminates the
// process, which we don't want. Hence we need to handle SIGPIPE specially.
//
// We don't use SIG_IGN here because child processes would inherit that
// handler. Instead, we install a no-op handler that swallows the signal,
// so SIGPIPE still behaves normally in children.
// Furthermore, using sigaction with sa_flags set to 0 leaves SA_RESTART
// unset, so syscalls interrupted by the handler are not restarted. This
// matters for code that uses SIGPIPE to interrupt syscalls in other
// threads.
struct sigaction sa = {};
sa.sa_handler = [](int) {};
sa.sa_flags = 0;
sigemptyset(&sa.sa_mask);
sigaction(SIGPIPE, &sa, nullptr);
#endif
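// Fall back to the process-wide observer factory if no observer was set
// explicitly on this server.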
if (!getObserver() && server::observerFactory_) {
setObserver(server::observerFactory_->getObserver());
}
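// Request execution uses one of two concurrency models: the legacy
// ThreadManager path below, or resource pools when the corresponding
// flags are set.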
if (!useResourcePoolsFlagsSet()) {
// We always need a ThreadManager for cpp2.
LOG(INFO) << "Using thread manager (resource pools not enabled)";
setupThreadManager();
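// Tasks that sit in the queue past their timeout are handed back here so
// the underlying EventTask can fail the request instead of running it.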
threadManager_->setExpireCallback([&](std::shared_ptr<Runnable> r) {
EventTask* task = dynamic_cast<EventTask*>(r.get());
if (task) {
task->expired();
}
});
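// The Codel callback fires when queueing delay crosses the Codel target.
// With enforcement disabled we only record a shadow timeout, which lets
// operators gauge the impact before turning Codel on.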
threadManager_->setCodelCallback([&](std::shared_ptr<Runnable>) {
auto observer = getObserver();
if (observer) {
if (getEnableCodel()) {
observer->queueTimeout();
} else {
observer->shadowQueueTimeout();
}
}
});
} else {
LOG(INFO) << "Using resource pools";
DCHECK(!threadManager_);
ensureResourcePools();
// Keep concurrency controller in sync with max requests for now.
resourcePoolSet()
.resourcePool(ResourcePoolHandle::defaultAsync())
.concurrencyController()
.value()
->setExecutionLimitRequests(getMaxRequests());
}
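// serverChannel_ is only set in duplex mode; in the normal case, configure
// the wangle::ServerBootstrap accept/IO pipeline and bind the listening
// sockets.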
if (!serverChannel_) {
ServerBootstrap::socketConfig.acceptBacklog = getListenBacklog();
ServerBootstrap::socketConfig.maxNumPendingConnectionsPerWorker =
getMaxNumPendingConnectionsPerWorker();
if (reusePort_.value_or(false)) {
ServerBootstrap::setReusePort(true);
}
if (enableTFO_) {
ServerBootstrap::socketConfig.enableTCPFastOpen = *enableTFO_;
ServerBootstrap::socketConfig.fastOpenQueueSize = fastOpenQueueSize_;
}
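// Attach a deadlock detector plus any registered IO observer factories so
// stalled IO threads can be detected and reported.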
ioThreadPool_->addObserver(
folly::IOThreadPoolDeadlockDetectorObserver::create(
ioThreadPool_->getName()));
ioObserverFactories.withRLock([this](auto& factories) {
for (auto& f : factories) {
ioThreadPool_->addObserver(f(
ioThreadPool_->getName(), ioThreadPool_->getThreadIdCollector()));
}
});
// Resize the IO pool
ioThreadPool_->setNumThreads(nWorkers);
if (!acceptPool_) {
acceptPool_ = std::make_shared<folly::IOThreadPoolExecutor>(
nAcceptors_,
std::make_shared<folly::NamedThreadFactory>("Acceptor Thread"));
}
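// Use the caller-supplied acceptor factory if present. When the factory
// shares one SSL context across acceptors, keep a handle to its manager so
// TLS credentials can later be updated in one place.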
auto acceptorFactory = acceptorFactory_
? acceptorFactory_
: std::make_shared<DefaultThriftAcceptorFactory>(this);
if (auto factory = dynamic_cast<wangle::AcceptorFactorySharedSSLContext*>(
acceptorFactory.get())) {
sharedSSLContextManager_ = factory->initSharedSSLContextManager();
}
ServerBootstrap::childHandler(std::move(acceptorFactory));
{
std::lock_guard<std::mutex> lock(ioGroupMutex_);
ServerBootstrap::group(acceptPool_, ioThreadPool_);
}
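// Three ways to obtain listening sockets: a pre-bound socket handed to the
// server, a bare port_ when no address was configured, or one or more
// explicit addresses.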
if (socket_) {
ServerBootstrap::bind(std::move(socket_));
} else if (!getAddress().isInitialized()) {
ServerBootstrap::bind(port_);
} else {
for (auto& address : addresses_) {
ServerBootstrap::bind(address);
}
}
// Update addresses_ with the address that we are actually bound to.
// (This is needed if we were supplied a pre-bound socket, or if the
// port was set to 0, in which case the kernel chooses an ephemeral
// port.)
ServerBootstrap::getSockets()[0]->getAddress(&addresses_.at(0));
// Enable zero-copy for the server sockets only if a zeroCopyEnableFunc_
// has been provided.
bool useZeroCopy = !!zeroCopyEnableFunc_;
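// A minimal sketch of what a caller might have installed; the setter name
// and threshold are illustrative assumptions, not taken from this file:
//
//   server.setZeroCopyEnableFunc(
//       [](const std::unique_ptr<folly::IOBuf>& buf) {
//         // Only pay zero-copy bookkeeping costs for large writes.
//         return buf->computeChainDataLength() >= 16 * 1024;
//       });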
for (auto& socket : getSockets()) {
auto* evb = socket->getEventBase();
evb->runImmediatelyOrRunInEventBaseThreadAndWait([&] {
socket->setShutdownSocketSet(wShutdownSocketSet_);
socket->setAcceptRateAdjustSpeed(acceptRateAdjustSpeed_);
socket->setZeroCopy(useZeroCopy);
socket->setQueueTimeout(getSocketQueueTimeout());
try {
socket->setTosReflect(tosReflect_);
socket->setListenerTos(listenerTos_);
} catch (std::exception const& ex) {
LOG(ERROR) << "Got exception setting up TOS settings: "
<< folly::exceptionStr(ex);
}
});
}
} else {
startDuplex();
}
#if FOLLY_HAS_COROUTINES
asyncScope_ = std::make_unique<folly::coro::CancellableAsyncScope>();
#endif
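// Hand every service handler a reference to this server before any
// lifecycle hooks (onStartServing, preServe, ...) run.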
for (auto handler : collectServiceHandlers()) {
handler->attachServer(*this);
}
DCHECK(
internalStatus_.load(std::memory_order_relaxed) ==
ServerStatus::NOT_RUNNING);
// The server is not yet ready for the user's service methods but fine to
// handle internal methods. See ServerConfigs::getInternalMethods().
internalStatus_.store(
ServerStatus::PRE_STARTING, std::memory_order_release);
// Notify handlers of the preStart event
for (const auto& eventHandler : getEventHandlersUnsafe()) {
eventHandler->preStart(&addresses_.at(0));
}
internalStatus_.store(ServerStatus::STARTING, std::memory_order_release);
// Run the service handlers' onStartServing hooks now that setup is complete
callOnStartServing();
// After the onStartServing hooks have finished, we are ready to handle
// requests, at least from the server's perspective.
internalStatus_.store(ServerStatus::RUNNING, std::memory_order_release);
#if FOLLY_HAS_COROUTINES
// Set up polling for PolledServiceHealth handlers if necessary
{
DCHECK(!getServiceHealth().has_value());
auto handlers = collectServiceHandlers<PolledServiceHealth>();
if (!handlers.empty()) {
auto poll = ServiceHealthPoller::poll(
std::move(handlers), getPolledServiceHealthLivenessObserver());
auto loop = folly::coro::co_invoke(
[this,
poll = std::move(poll)]() mutable -> folly::coro::Task<void> {
while (auto value = co_await poll.next()) {
co_await folly::coro::co_safe_point;
cachedServiceHealth_.store(*value, std::memory_order_relaxed);
}
});
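// Scheduled on the server's executor and tied to asyncScope_, so the loop
// is cancelled during shutdown; the co_await of co_safe_point above is
// where that cancellation is observed.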
asyncScope_->add(std::move(loop).scheduleOn(getExecutor()));
}
}
#endif
// Notify handlers of the preServe event
for (const auto& eventHandler : getEventHandlersUnsafe()) {
eventHandler->preServe(&addresses_.at(0));
}
// Do not allow setters to be called past this point until the IO worker
// threads have been joined in stopWorkers().
configMutable_ = false;
} catch (const std::exception& ex) {
// This block allows us to investigate the exception using gdb
LOG(ERROR) << "Got an exception while setting up the server: " << ex.what();
handleSetupFailure();
throw;
} catch (...) {
handleSetupFailure();
throw;
}
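// Record a structured "serve" event for logging now that setup has
// succeeded (failures rethrow above and never reach this line).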
THRIFT_SERVER_EVENT(serve).log(*this);
}