in mcrouter/Server-inl.h [284:482]
bool runServerDual(
const McrouterOptions& mcrouterOpts,
const McrouterStandaloneOptions& standaloneOpts,
StandalonePreRunCb preRunCb) {
using RequestHandlerType = RequestHandler<ServerOnRequest<RouterInfo>>;
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool;
CarbonRouterInstance<RouterInfo>* router;
std::shared_ptr<AsyncMcServer> asyncMcServer;
std::shared_ptr<apache::thrift::ThriftServer> thriftServer;
try {
// Create thread pool for both AsyncMcServer and ThriftServer
ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(
mcrouterOpts.num_proxies, mcrouterOpts.num_proxies);
// Run observer and extract event bases
auto executorObserver = std::make_shared<ExecutorObserver>();
ioThreadPool->addObserver(executorObserver);
auto evbs = executorObserver->extractEvbs();
CHECK_EQ(evbs.size(), mcrouterOpts.num_proxies);
ioThreadPool->removeObserver(executorObserver);
// Create AsyncMcServer instance
asyncMcServer =
std::make_shared<AsyncMcServer>(detail::createAsyncMcServerOptions(
mcrouterOpts, standaloneOpts, &evbs));
// Create CarbonRouterInstance
if (standaloneOpts.remote_thread) {
router =
CarbonRouterInstance<RouterInfo>::init("standalone", mcrouterOpts);
} else {
router = CarbonRouterInstance<RouterInfo>::init(
"standalone", mcrouterOpts, ioThreadPool);
}
if (router == nullptr) {
LOG(ERROR) << "CRITICAL: Failed to initialize mcrouter!";
return false;
}
setupRouter<RouterInfo>(mcrouterOpts, standaloneOpts, router, preRunCb);
// Create CarbonRouterClients for each worker thread
std::vector<typename CarbonRouterClient<RouterInfo>::Pointer>
carbonRouterClients;
std::unordered_map<
folly::EventBase*,
std::shared_ptr<ServerOnRequest<RouterInfo>>>
serverOnRequestMap;
for (auto evb : evbs) {
// Create CarbonRouterClients
auto routerClient = standaloneOpts.remote_thread
? router->createClient(0 /* maximum_outstanding_requests */)
: router->createSameThreadClient(
0 /* maximum_outstanding_requests */);
serverOnRequestMap.emplace(
evb,
std::make_shared<ServerOnRequest<RouterInfo>>(
*routerClient,
*evb,
standaloneOpts.retain_source_ip,
standaloneOpts.enable_pass_through_mode,
standaloneOpts.remote_thread,
router->externalStatsHandler(),
standaloneOpts.prefix_acl_checker_enable));
carbonRouterClients.push_back(std::move(routerClient));
}
CHECK_EQ(carbonRouterClients.size(), mcrouterOpts.num_proxies);
CHECK_EQ(serverOnRequestMap.size(), mcrouterOpts.num_proxies);
// Get local evb
folly::EventBase* evb = ioThreadPool->getEventBaseManager()->getEventBase();
// Thrift server setup
apache::thrift::server::observerFactory_.reset();
thriftServer = std::make_shared<apache::thrift::ThriftServer>();
thriftServer->setIOThreadPool(ioThreadPool);
thriftServer->setNumCPUWorkerThreads(1);
// Shutdown state
auto shutdownStarted = std::make_shared<std::atomic<bool>>(false);
// Register signal handler which will handle ordered shutdown process of the
// two servers
ShutdownSignalHandler<RouterInfo> shutdownHandler(
evb, thriftServer, asyncMcServer, shutdownStarted);
shutdownHandler.registerSignalHandler(SIGTERM);
shutdownHandler.registerSignalHandler(SIGINT);
// Create thrift handler
thriftServer->setInterface(
std::make_shared<ServerOnRequestThrift<RouterInfo>>(
serverOnRequestMap));
// Add Identity Hook
thriftServer->setClientIdentityHook(McSSLUtil::getClientIdentityHook());
// ACL Checker for ThriftServer
auto aclCheckerThrift =
detail::getThriftAclChecker(mcrouterOpts, standaloneOpts);
uint64_t qos = 0;
if (standaloneOpts.enable_qos) {
if (!getQoS(
standaloneOpts.default_qos_class,
standaloneOpts.default_qos_path,
qos)) {
LOG(ERROR)
<< "Incorrect qos class / qos path. Accepted connections will not"
<< "be marked.";
}
}
thriftServer->setAcceptorFactory(std::make_shared<ThriftAcceptorFactory>(
*thriftServer, std::move(aclCheckerThrift), qos));
// Set listening port for cleartext and SSL connections
if (standaloneOpts.thrift_port > 0) {
thriftServer->setPort(standaloneOpts.thrift_port);
} else {
LOG(ERROR) << "Must specify thrift port";
router->shutdown();
freeAllRouters();
return false;
}
thriftServer->disableActiveRequestsTracking();
thriftServer->setSocketMaxReadsPerEvent(1);
// Set observer for connection stats
thriftServer->setObserver(
std::make_shared<ThriftObserver<RouterInfo>>(*router, shutdownStarted));
// Don't enforce default timeouts, unless Client forces them.
thriftServer->setQueueTimeout(std::chrono::milliseconds(0));
thriftServer->setTaskExpireTime(std::chrono::milliseconds(0));
// Set idle and ssl handshake timeouts to 0 to be consistent with
// AsyncMcServer
thriftServer->setIdleServerTimeout(std::chrono::milliseconds(0));
thriftServer->setSSLHandshakeTimeout(std::chrono::milliseconds(0));
initStandaloneSSLDualServer(standaloneOpts, thriftServer);
thriftServer->watchTicketPathForChanges(
standaloneOpts.tls_ticket_key_seed_path);
thriftServer->setStopWorkersOnStopListening(false);
// Get acl checker for AsyncMcServer
auto aclChecker = detail::getAclChecker(mcrouterOpts, standaloneOpts);
// Start AsyncMcServer
LOG(INFO) << "Starting AsyncMcServer in dual mode";
asyncMcServer->startOnVirtualEB(
[&carbonRouterClients,
&router,
&standaloneOpts,
aclChecker = aclChecker](
size_t threadId,
folly::VirtualEventBase& vevb,
AsyncMcServerWorker& worker) mutable {
// Setup compression on each worker.
if (standaloneOpts.enable_server_compression) {
auto codecManager = router->getCodecManager();
if (codecManager) {
worker.setCompressionCodecMap(codecManager->getCodecMap());
} else {
LOG(WARNING)
<< "Compression is enabled but couldn't find CodecManager. "
<< "Compression will be disabled.";
}
}
detail::serverInit<RouterInfo, RequestHandler>(
*router,
threadId,
vevb.getEventBase(),
worker,
standaloneOpts,
aclChecker,
carbonRouterClients[threadId].get());
},
// Shutdown must be scheduled back to event base of main to ensure
// that there we dont attempt to destruct a VirtualEventBase
[evb = evb, &asyncMcServer, &thriftServer, &shutdownStarted] {
evb->runInEventBaseThread([&]() {
detail::startServerShutdown<RouterInfo>(
thriftServer, asyncMcServer, shutdownStarted);
});
});
LOG(INFO) << "Thrift Server and AsyncMcServer running.";
// Run the ThriftServer; this blocks until the server is shut down.
thriftServer->serve();
thriftServer.reset();
LOG(INFO) << "Started shutdown of CarbonRouterInstance";
router->shutdown();
freeAllRouters();
// Now free iothread pool
ioThreadPool.reset();
LOG(INFO) << "Completed shutdown";
} catch (const std::exception& e) {
LOG(ERROR) << "Error creating dual mode AsyncMcServer: " << e.what();
exit(EXIT_FAILURE);
}
return true;
}