bool runServerDual()

in mcrouter/Server-inl.h [284:482]


bool runServerDual(
    const McrouterOptions& mcrouterOpts,
    const McrouterStandaloneOptions& standaloneOpts,
    StandalonePreRunCb preRunCb) {
  using RequestHandlerType = RequestHandler<ServerOnRequest<RouterInfo>>;
  std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool;
  CarbonRouterInstance<RouterInfo>* router;
  std::shared_ptr<AsyncMcServer> asyncMcServer;
  std::shared_ptr<apache::thrift::ThriftServer> thriftServer;
  try {
    // Create thread pool for both AsyncMcServer and ThriftServer
    ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(
        mcrouterOpts.num_proxies, mcrouterOpts.num_proxies);

    // Run observer and extract event bases
    auto executorObserver = std::make_shared<ExecutorObserver>();
    ioThreadPool->addObserver(executorObserver);
    auto evbs = executorObserver->extractEvbs();
    CHECK_EQ(evbs.size(), mcrouterOpts.num_proxies);
    ioThreadPool->removeObserver(executorObserver);

    // Create AsyncMcServer instance
    asyncMcServer =
        std::make_shared<AsyncMcServer>(detail::createAsyncMcServerOptions(
            mcrouterOpts, standaloneOpts, &evbs));

    // Create CarbonRouterInstance
    if (standaloneOpts.remote_thread) {
      router =
          CarbonRouterInstance<RouterInfo>::init("standalone", mcrouterOpts);
    } else {
      router = CarbonRouterInstance<RouterInfo>::init(
          "standalone", mcrouterOpts, ioThreadPool);
    }
    if (router == nullptr) {
      LOG(ERROR) << "CRITICAL: Failed to initialize mcrouter!";
      return false;
    }

    setupRouter<RouterInfo>(mcrouterOpts, standaloneOpts, router, preRunCb);

    // Create CarbonRouterClients for each worker thread
    std::vector<typename CarbonRouterClient<RouterInfo>::Pointer>
        carbonRouterClients;
    std::unordered_map<
        folly::EventBase*,
        std::shared_ptr<ServerOnRequest<RouterInfo>>>
        serverOnRequestMap;
    for (auto evb : evbs) {
      // Create CarbonRouterClients
      auto routerClient = standaloneOpts.remote_thread
          ? router->createClient(0 /* maximum_outstanding_requests */)
          : router->createSameThreadClient(
                0 /* maximum_outstanding_requests */);

      serverOnRequestMap.emplace(
          evb,
          std::make_shared<ServerOnRequest<RouterInfo>>(
              *routerClient,
              *evb,
              standaloneOpts.retain_source_ip,
              standaloneOpts.enable_pass_through_mode,
              standaloneOpts.remote_thread,
              router->externalStatsHandler(),
              standaloneOpts.prefix_acl_checker_enable));
      carbonRouterClients.push_back(std::move(routerClient));
    }
    CHECK_EQ(carbonRouterClients.size(), mcrouterOpts.num_proxies);
    CHECK_EQ(serverOnRequestMap.size(), mcrouterOpts.num_proxies);

    // Get local evb
    folly::EventBase* evb = ioThreadPool->getEventBaseManager()->getEventBase();

    // Thrift server setup
    apache::thrift::server::observerFactory_.reset();
    thriftServer = std::make_shared<apache::thrift::ThriftServer>();
    thriftServer->setIOThreadPool(ioThreadPool);
    thriftServer->setNumCPUWorkerThreads(1);

    // Shutdown state
    auto shutdownStarted = std::make_shared<std::atomic<bool>>(false);
    // Register signal handler which will handle ordered shutdown process of the
    // two servers
    ShutdownSignalHandler<RouterInfo> shutdownHandler(
        evb, thriftServer, asyncMcServer, shutdownStarted);
    shutdownHandler.registerSignalHandler(SIGTERM);
    shutdownHandler.registerSignalHandler(SIGINT);

    // Create thrift handler
    thriftServer->setInterface(
        std::make_shared<ServerOnRequestThrift<RouterInfo>>(
            serverOnRequestMap));

    // Add Identity Hook
    thriftServer->setClientIdentityHook(McSSLUtil::getClientIdentityHook());

    // ACL Checker for ThriftServer
    auto aclCheckerThrift =
        detail::getThriftAclChecker(mcrouterOpts, standaloneOpts);

    uint64_t qos = 0;
    if (standaloneOpts.enable_qos) {
      if (!getQoS(
              standaloneOpts.default_qos_class,
              standaloneOpts.default_qos_path,
              qos)) {
        LOG(ERROR)
            << "Incorrect qos class / qos path. Accepted connections will not"
            << "be marked.";
      }
    }

    thriftServer->setAcceptorFactory(std::make_shared<ThriftAcceptorFactory>(
        *thriftServer, std::move(aclCheckerThrift), qos));

    // Set listening port for cleartext and SSL connections
    if (standaloneOpts.thrift_port > 0) {
      thriftServer->setPort(standaloneOpts.thrift_port);
    } else {
      LOG(ERROR) << "Must specify thrift port";
      router->shutdown();
      freeAllRouters();
      return false;
    }
    thriftServer->disableActiveRequestsTracking();
    thriftServer->setSocketMaxReadsPerEvent(1);
    // Set observer for connection stats
    thriftServer->setObserver(
        std::make_shared<ThriftObserver<RouterInfo>>(*router, shutdownStarted));
    // Don't enforce default timeouts, unless Client forces them.
    thriftServer->setQueueTimeout(std::chrono::milliseconds(0));
    thriftServer->setTaskExpireTime(std::chrono::milliseconds(0));
    // Set idle and ssl handshake timeouts to 0 to be consistent with
    // AsyncMcServer
    thriftServer->setIdleServerTimeout(std::chrono::milliseconds(0));
    thriftServer->setSSLHandshakeTimeout(std::chrono::milliseconds(0));

    initStandaloneSSLDualServer(standaloneOpts, thriftServer);
    thriftServer->watchTicketPathForChanges(
        standaloneOpts.tls_ticket_key_seed_path);
    thriftServer->setStopWorkersOnStopListening(false);

    // Get acl checker for AsyncMcServer
    auto aclChecker = detail::getAclChecker(mcrouterOpts, standaloneOpts);
    // Start AsyncMcServer
    LOG(INFO) << "Starting AsyncMcServer in dual mode";
    asyncMcServer->startOnVirtualEB(
        [&carbonRouterClients,
         &router,
         &standaloneOpts,
         aclChecker = aclChecker](
            size_t threadId,
            folly::VirtualEventBase& vevb,
            AsyncMcServerWorker& worker) mutable {
          // Setup compression on each worker.
          if (standaloneOpts.enable_server_compression) {
            auto codecManager = router->getCodecManager();
            if (codecManager) {
              worker.setCompressionCodecMap(codecManager->getCodecMap());
            } else {
              LOG(WARNING)
                  << "Compression is enabled but couldn't find CodecManager. "
                  << "Compression will be disabled.";
            }
          }
          detail::serverInit<RouterInfo, RequestHandler>(
              *router,
              threadId,
              vevb.getEventBase(),
              worker,
              standaloneOpts,
              aclChecker,
              carbonRouterClients[threadId].get());
        },
        // Shutdown must be scheduled back to event base of main to ensure
        // that there we dont attempt to destruct a VirtualEventBase
        [evb = evb, &asyncMcServer, &thriftServer, &shutdownStarted] {
          evb->runInEventBaseThread([&]() {
            detail::startServerShutdown<RouterInfo>(
                thriftServer, asyncMcServer, shutdownStarted);
          });
        });

    LOG(INFO) << "Thrift Server and AsyncMcServer running.";
    // Run the ThriftServer; this blocks until the server is shut down.
    thriftServer->serve();
    thriftServer.reset();
    LOG(INFO) << "Started shutdown of CarbonRouterInstance";
    router->shutdown();
    freeAllRouters();
    // Now free iothread pool
    ioThreadPool.reset();
    LOG(INFO) << "Completed shutdown";
  } catch (const std::exception& e) {
    LOG(ERROR) << "Error creating dual mode AsyncMcServer: " << e.what();
    exit(EXIT_FAILURE);
  }
  return true;
}