void ThriftServer::setup()

in thrift/lib/cpp2/server/ThriftServer.cpp [382:609]


void ThriftServer::setup() {
  ensureDecoratedProcessorFactoryInitialized();

  auto nWorkers = getNumIOWorkerThreads();
  DCHECK_GT(nWorkers, 0u);

  addRoutingHandler(
      std::make_unique<apache::thrift::RocketRoutingHandler>(*this));

  // Initialize event base for this thread
  auto serveEventBase = eventBaseManager_->getEventBase();
  serveEventBase_ = serveEventBase;
  stopController_.set(std::make_unique<StopController>(
      folly::badge<ThriftServer>{}, *serveEventBase));
  if (idleServerTimeout_.count() > 0) {
    idleServer_.emplace(*this, serveEventBase->timer(), idleServerTimeout_);
  }
  // Print some libevent stats
  VLOG(1) << "libevent " << folly::EventBase::getLibeventVersion() << " method "
          << serveEventBase->getLibeventMethod();

  try {
#ifndef _WIN32
    // OpenSSL might try to write to a closed socket if the peer disconnects
    // abruptly, raising a SIGPIPE signal. By default this will terminate the
    // process, which we don't want. Hence we need to handle SIGPIPE specially.
    //
    // We don't use SIG_IGN here as child processes will inherit that handler.
    // Instead, we swallow the signal to enable SIGPIPE in children to behave
    // normally.
    // Furthermore, setting flags to 0 and using sigaction prevents SA_RESTART
    // from restarting syscalls after the handler completed. This is important
    // for code using SIGPIPE to interrupt syscalls in other threads.
    struct sigaction sa = {};
    sa.sa_handler = [](int) {};
    sa.sa_flags = 0;
    sigemptyset(&sa.sa_mask);
    sigaction(SIGPIPE, &sa, nullptr);
#endif

    if (!getObserver() && server::observerFactory_) {
      setObserver(server::observerFactory_->getObserver());
    }

    if (!useResourcePoolsFlagsSet()) {
      // We always need a threadmanager for cpp2.
      LOG(INFO) << "Using thread manager (resource pools not enabled)";
      setupThreadManager();
      threadManager_->setExpireCallback([&](std::shared_ptr<Runnable> r) {
        EventTask* task = dynamic_cast<EventTask*>(r.get());
        if (task) {
          task->expired();
        }
      });
      threadManager_->setCodelCallback([&](std::shared_ptr<Runnable>) {
        auto observer = getObserver();
        if (observer) {
          if (getEnableCodel()) {
            observer->queueTimeout();
          } else {
            observer->shadowQueueTimeout();
          }
        }
      });
    } else {
      LOG(INFO) << "Using resource pools";
      DCHECK(!threadManager_);
      ensureResourcePools();
      // Keep concurrency controller in sync with max requests for now.
      resourcePoolSet()
          .resourcePool(ResourcePoolHandle::defaultAsync())
          .concurrencyController()
          .value()
          ->setExecutionLimitRequests(getMaxRequests());
    }

    if (!serverChannel_) {
      ServerBootstrap::socketConfig.acceptBacklog = getListenBacklog();
      ServerBootstrap::socketConfig.maxNumPendingConnectionsPerWorker =
          getMaxNumPendingConnectionsPerWorker();
      if (reusePort_.value_or(false)) {
        ServerBootstrap::setReusePort(true);
      }
      if (enableTFO_) {
        ServerBootstrap::socketConfig.enableTCPFastOpen = *enableTFO_;
        ServerBootstrap::socketConfig.fastOpenQueueSize = fastOpenQueueSize_;
      }

      ioThreadPool_->addObserver(
          folly::IOThreadPoolDeadlockDetectorObserver::create(
              ioThreadPool_->getName()));
      ioObserverFactories.withRLock([this](auto& factories) {
        for (auto& f : factories) {
          ioThreadPool_->addObserver(f(
              ioThreadPool_->getName(), ioThreadPool_->getThreadIdCollector()));
        }
      });

      // Resize the IO pool
      ioThreadPool_->setNumThreads(nWorkers);
      if (!acceptPool_) {
        acceptPool_ = std::make_shared<folly::IOThreadPoolExecutor>(
            nAcceptors_,
            std::make_shared<folly::NamedThreadFactory>("Acceptor Thread"));
      }

      auto acceptorFactory = acceptorFactory_
          ? acceptorFactory_
          : std::make_shared<DefaultThriftAcceptorFactory>(this);
      if (auto factory = dynamic_cast<wangle::AcceptorFactorySharedSSLContext*>(
              acceptorFactory.get())) {
        sharedSSLContextManager_ = factory->initSharedSSLContextManager();
      }
      ServerBootstrap::childHandler(std::move(acceptorFactory));

      {
        std::lock_guard<std::mutex> lock(ioGroupMutex_);
        ServerBootstrap::group(acceptPool_, ioThreadPool_);
      }
      if (socket_) {
        ServerBootstrap::bind(std::move(socket_));
      } else if (!getAddress().isInitialized()) {
        ServerBootstrap::bind(port_);
      } else {
        for (auto& address : addresses_) {
          ServerBootstrap::bind(address);
        }
      }
      // Update address_ with the address that we are actually bound to.
      // (This is needed if we were supplied a pre-bound socket, or if
      // address_'s port was set to 0, so an ephemeral port was chosen by
      // the kernel.)
      ServerBootstrap::getSockets()[0]->getAddress(&addresses_.at(0));

      // we enable zerocopy for the server socket if the
      // zeroCopyEnableFunc_ is valid
      bool useZeroCopy = !!zeroCopyEnableFunc_;
      for (auto& socket : getSockets()) {
        auto* evb = socket->getEventBase();
        evb->runImmediatelyOrRunInEventBaseThreadAndWait([&] {
          socket->setShutdownSocketSet(wShutdownSocketSet_);
          socket->setAcceptRateAdjustSpeed(acceptRateAdjustSpeed_);
          socket->setZeroCopy(useZeroCopy);
          socket->setQueueTimeout(getSocketQueueTimeout());

          try {
            socket->setTosReflect(tosReflect_);
            socket->setListenerTos(listenerTos_);
          } catch (std::exception const& ex) {
            LOG(ERROR) << "Got exception setting up TOS settings: "
                       << folly::exceptionStr(ex);
          }
        });
      }
    } else {
      startDuplex();
    }

#if FOLLY_HAS_COROUTINES
    asyncScope_ = std::make_unique<folly::coro::CancellableAsyncScope>();
#endif
    for (auto handler : collectServiceHandlers()) {
      handler->attachServer(*this);
    }

    DCHECK(
        internalStatus_.load(std::memory_order_relaxed) ==
        ServerStatus::NOT_RUNNING);
    // The server is not yet ready for the user's service methods but fine to
    // handle internal methods. See ServerConfigs::getInternalMethods().
    internalStatus_.store(
        ServerStatus::PRE_STARTING, std::memory_order_release);

    // Notify handler of the preStart event
    for (const auto& eventHandler : getEventHandlersUnsafe()) {
      eventHandler->preStart(&addresses_.at(0));
    }

    internalStatus_.store(ServerStatus::STARTING, std::memory_order_release);

    // Called after setup
    callOnStartServing();

    // After the onStartServing hooks have finished, we are ready to handle
    // requests, at least from the server's perspective.
    internalStatus_.store(ServerStatus::RUNNING, std::memory_order_release);

#if FOLLY_HAS_COROUTINES
    // Set up polling for PolledServiceHealth handlers if necessary
    {
      DCHECK(!getServiceHealth().has_value());
      auto handlers = collectServiceHandlers<PolledServiceHealth>();
      if (!handlers.empty()) {
        auto poll = ServiceHealthPoller::poll(
            std::move(handlers), getPolledServiceHealthLivenessObserver());
        auto loop = folly::coro::co_invoke(
            [this,
             poll = std::move(poll)]() mutable -> folly::coro::Task<void> {
              while (auto value = co_await poll.next()) {
                co_await folly::coro::co_safe_point;
                cachedServiceHealth_.store(*value, std::memory_order_relaxed);
              }
            });
        asyncScope_->add(std::move(loop).scheduleOn(getExecutor()));
      }
    }
#endif

    // Notify handler of the preServe event
    for (const auto& eventHandler : getEventHandlersUnsafe()) {
      eventHandler->preServe(&addresses_.at(0));
    }

    // Do not allow setters to be called past this point until the IO worker
    // threads have been joined in stopWorkers().
    configMutable_ = false;
  } catch (std::exception& ex) {
    // This block allows us to investigate the exception using gdb
    LOG(ERROR) << "Got an exception while setting up the server: " << ex.what();
    handleSetupFailure();
    throw;
  } catch (...) {
    handleSetupFailure();
    throw;
  }

  THRIFT_SERVER_EVENT(serve).log(*this);
}