remote/ServerApplication.cpp (355 lines of code) (raw):

#include "ServerApplication.h" #include <thread> #include <mutex> #include <condition_variable> #include "include/base/cef_callback.h" #include "include/wrapper/cef_closure_task.h" #include "include/cef_app.h" #include "include/cef_base.h" #include "log/Log.h" #include "Utils.h" #include "ServerHandler.h" #include "ServerHandlerContext.h" #include "RpcExecutor.h" #include "DebugInfo.h" #include <sstream> #include "browser/RemoteBrowser.h" #include "handlers/app/RemoteAppHandler.h" #if defined(OS_LINUX) #include <X11/Xlib.h> #endif #include <regex> using namespace apache::thrift; using namespace thrift_codegen; namespace { bool TRACE_HANDLERS_LIFESPAN = false; std::regex * TRACE_THRIFT_MESSAGES_REGEXP = nullptr; } class MyServerProcessor : public ServerProcessor { public: MyServerProcessor(::std::shared_ptr<ServerHandler> iface) : ServerProcessor(iface), myHandler(iface) {} bool process(std::shared_ptr<protocol::TProtocol> in, std::shared_ptr<protocol::TProtocol> out, void* connectionContext) override { std::string fname; protocol::TMessageType mtype; int32_t seqid; in->readMessageBegin(fname, mtype, seqid); if (mtype != protocol::T_CALL && mtype != protocol::T_ONEWAY) { Log::error("received invalid message type %d from client", mtype); return false; } myIsProcessing = true; myStartDispatch = std::chrono::steady_clock::now(); myFuncName = fname; if (TRACE_THRIFT_MESSAGES_REGEXP != nullptr) { std::smatch m; if(std::regex_match(fname, m, *TRACE_THRIFT_MESSAGES_REGEXP)) Log::trace("\t process %s", fname.c_str()); } const bool dispatchResult = dispatchCall(in.get(), out.get(), fname, seqid, connectionContext); myIsProcessing = false; return dispatchResult; } const std::shared_ptr<ServerHandler> getServerHandler() const { return myHandler; } bool isProcessing() const { return myIsProcessing; } std::chrono::steady_clock::time_point getStartDispatch() const { return myStartDispatch; } std::string getFuncName() const { return myFuncName; } private: const std::shared_ptr<ServerHandler> myHandler; volatile bool myIsProcessing = false; std::chrono::steady_clock::time_point myStartDispatch; std::string myFuncName; }; class MyServerProcessorFactory : public ::apache::thrift::TProcessorFactory { public: MyServerProcessorFactory() noexcept {} ::std::shared_ptr< ::apache::thrift::TProcessor > getProcessor(const ::apache::thrift::TConnectionInfo& connInfo) override { ServerHandler * ptrServerHandler = new ServerHandler; if (TRACE_HANDLERS_LIFESPAN) Log::trace("Created ServerHandler: %p", ptrServerHandler); const int cooldownMs = ServerApplication::instance().getCmdArgs().getOpenTransportCooldownMs(); if (cooldownMs > 0) { // system 'cooldown' (otherwise pipe transport may not open on Windows) std::this_thread::sleep_for(std::chrono::milliseconds(cooldownMs)); } std::shared_ptr<ServerHandler> handler(ptrServerHandler); std::shared_ptr<MyServerProcessor> processor(new MyServerProcessor(handler), [&](MyServerProcessor* p){ if (TRACE_HANDLERS_LIFESPAN) Log::trace("Release ServerHandler: %p", p->getServerHandler().get()); const bool isMaster = p->getServerHandler()->isMaster(); bool needStartShuttingDown = false; { Lock lock(myMutex); myProcessors.erase(p); if (isMaster) { needStartShuttingDown = true; for (auto processor: myProcessors) if (processor->getServerHandler()->isMaster()) { needStartShuttingDown = false; break; } } } delete p; if (needStartShuttingDown) ServerApplication::instance().startShuttingDown(); }); Lock lock(myMutex); myProcessors.insert(processor.get()); return processor; } void forEach(std::function<void(const MyServerProcessor*)> visitor); bool hasMaster(); std::shared_ptr<ServerHandlerContext> findCtx(int connectionId); protected: std::recursive_mutex myMutex; std::set<const MyServerProcessor*> myProcessors; }; std::shared_ptr<ServerHandlerContext> MyServerProcessorFactory::findCtx(int connectionId) { Lock lock(myMutex); for (auto p: myProcessors) if (p->getServerHandler()->getCid() == connectionId) return p->getServerHandler()->getCtx(); return nullptr; } void MyServerProcessorFactory::forEach(std::function<void(const MyServerProcessor*)> visitor) { Lock lock(myMutex); for (const auto& h: myProcessors) visitor(h); } struct cancelled_error {}; class CancellationPoint { public: CancellationPoint() : myStop(false) {} void cancel() { std::unique_lock<std::mutex> lock(myMutex); myStop = true; myCond.notify_all(); } template <typename P> void wait(const P& period) { std::unique_lock<std::mutex> lock(myMutex); if (myStop || myCond.wait_for(lock, period) == std::cv_status::no_timeout) throw cancelled_error(); } private: bool myStop; std::mutex myMutex; std::condition_variable myCond; }; ServerApplication ServerApplication::ourInstance; ServerApplication::ServerApplication() {} ServerApplication::~ServerApplication() { if (myAppHandler != nullptr) myAppHandler->Release(); } #if defined(OS_MAC) namespace CefUtils { bool loadCefFramework(); } #endif void ServerApplication::onBeforeExit() { myStopWatcher->cancel(); } bool ServerApplication::init(int argc, char* argv[]) { myTimeStart = std::chrono::steady_clock::now(); if (!myCmdArgs.init(argc, argv)) { Log::debug("Show help and exit."); return false; } Log::info("Init ServerApplication with transport: %s.", myCmdArgs.getTransportDesc().c_str()); myFactory = std::make_shared<MyServerProcessorFactory>(); #if defined(OS_MAC) const std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); if (!CefUtils::loadCefFramework()) { Log::error("Can't load CEF framework library."); return false; } const std::chrono::steady_clock::time_point t1 = std::chrono::steady_clock::now(); if (Log::isDebugEnabled()) { auto d1 = std::chrono::duration_cast<std::chrono::milliseconds>(t1 - t0); Log::debug("Loaded CEF framework library, spent %d ms", (int)d1.count()); } #elif defined(OS_LINUX) XInitThreads(); #endif CefSettings settings; myCmdArgs.prepareCefSettings(&settings); myAppHandler = new RemoteAppHandler(myCmdArgs.myChromiumSwitches, settings, myCmdArgs.myCustomSchemes); myAppHandler->AddRef(); // Read constants from env TRACE_HANDLERS_LIFESPAN = getBoolEnv("CEF_SERVER_TRACE_HANDLERS_LIFESPAN"); const char* envTraceMessagesRegexp = getenv("CEF_SERVER_TRACE_THRIFT_MESSAGES_REGEXP"); if (envTraceMessagesRegexp != nullptr) TRACE_THRIFT_MESSAGES_REGEXP = new std::regex(envTraceMessagesRegexp); // Init watcher thread myStopWatcher = std::make_shared<CancellationPoint>(); myThreadWatcher = std::thread([&]() { Log::setThreadName("Watcher"); const std::chrono::milliseconds timeoutWatchMs(getLongEnv("CEF_SERVER_timeoutWatchMs", 5000)); std::chrono::milliseconds timeoutDebugLogMs(getLongEnv("CEF_SERVER_TimeoutStacktraceLogMs", 30 * 1000)); std::chrono::milliseconds timeoutExecutionMs(getLongEnv("CEF_SERVER_ourTimeoutExecutionMs", 5 * 1000)); std::chrono::milliseconds timeoutShuttingDownMs(getLongEnv("CEF_SERVER_ourTimeoutShuttingDownMs", 25 * 1000)); std::chrono::steady_clock::time_point lastDebugLog = std::chrono::steady_clock::now() - timeoutDebugLogMs; while (true) { try { myStopWatcher->wait(std::chrono::milliseconds(timeoutWatchMs)); } catch (const cancelled_error&) { Log::debug("Watcher thread was stopped."); return 0; } std::this_thread::sleep_for(timeoutWatchMs); const std::chrono::time_point now(std::chrono::steady_clock::now()); myFactory->forEach([&](const MyServerProcessor* p){ if (!p || !p->getServerHandler()) { Log::error("Can't be: !p || !p->getServerHandler()"); return; } if (!p->getServerHandler()->getCtx()) return; enum { ServerHandler, JavaService, JavaServiceIO }; using namespace std::chrono_literals; std::chrono::duration<float, std::micro> execTimes[] = {0us, 0us, 0us}; const std::chrono::time_point now(std::chrono::steady_clock::now()); // 1. Check ServerHandler timings if (p->isProcessing()) execTimes[ServerHandler] = now - p->getStartDispatch(); // 2. Check JavaService timings std::shared_ptr<RpcExecutor> rpcExecutor = p->getServerHandler()->getCtx()->javaService(); if (rpcExecutor && rpcExecutor->isProcessing()) execTimes[JavaService] = now - rpcExecutor->getProcessingStart(); //printDebugIfNecessary(,); std::shared_ptr<RpcExecutor> rpcExecutorIO = p->getServerHandler()->getCtx()->javaServiceIO(); if (rpcExecutorIO && rpcExecutorIO->isProcessing()) execTimes[JavaServiceIO] = now - rpcExecutorIO->getProcessingStart(); //printDebugIfNecessary(rpcExecutorIO->getProcessingName(), const bool isTimeoutServerHandler = execTimes[ServerHandler] > timeoutExecutionMs; const bool isTimeoutJavaService = execTimes[JavaService] > timeoutExecutionMs; const bool isTimeoutJavaServiceIO = execTimes[JavaServiceIO] > timeoutExecutionMs; const bool needPrintDebugLog = (now - lastDebugLog) >= timeoutDebugLogMs; if (isTimeoutServerHandler || isTimeoutJavaService || isTimeoutJavaServiceIO) { if (needPrintDebugLog) { Log::warn("Execution time exceeds timeout. Probably deadlock occurred. Events:"); if (isTimeoutServerHandler) Log::info("Java2Native function '%s' executes more than %d seconds", p->getFuncName().c_str(), (int)(std::chrono::duration_cast<std::chrono::seconds>(execTimes[ServerHandler])).count()); else if (p->isProcessing()) Log::info("Java2Native function '%s' was started.", p->getFuncName().c_str()); else Log::info("Java2Native function '%s' was finished.", p->getFuncName().c_str()); if (isTimeoutJavaService) Log::info("Native2Java function '%s' executes more than %d seconds", rpcExecutor->getProcessingName().c_str(), (int)(std::chrono::duration_cast<std::chrono::seconds>(execTimes[JavaService])).count()); else if (rpcExecutor->isProcessing()) Log::info("Native2Java function '%s' was started.", rpcExecutor->getProcessingName().c_str()); else Log::info("Native2Java function '%s' was finished.", rpcExecutor->getProcessingName().c_str()); if (isTimeoutJavaServiceIO) Log::info("Native2Java IO function '%s' executes more than %d seconds", rpcExecutorIO->getProcessingName().c_str(), (int)(std::chrono::duration_cast<std::chrono::seconds>(execTimes[JavaServiceIO])).count()); else if (rpcExecutorIO->isProcessing()) Log::info("Native2Java IO function '%s' was started.", rpcExecutorIO->getProcessingName().c_str()); else Log::info("Native2Java IO function '%s' was finished.", rpcExecutorIO->getProcessingName().c_str()); } } }); // 3. Check application timings bool needShutdownHard = false; { Lock lock(myMutexState); if (myState >= SS_SHUTTING_DOWN) { std::chrono::duration<float, std::micro> elapsed = now - myTimeStartShuttingDown; if (elapsed > timeoutShuttingDownMs) { Log::warn("Start hard shutdown (elapsed %d ms)", std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count()); myState = SS_SHUTDOWN; needShutdownHard = true; } } } if (needShutdownHard) { CefPostTask(TID_UI, base::BindOnce(CefQuitMessageLoop)); Log::debug("CefQuitMessageLoop is posted (to be executed on UI thread), wait a little before exit..."); std::this_thread::sleep_for(std::chrono::milliseconds(2000)); Log::debug("Buy [%s]!", myCmdArgs.getTransportDesc().c_str()); std::exit(0); } } }); myThreadWatcher.detach(); return true; } std::shared_ptr<apache::thrift::TProcessorFactory> ServerApplication::getProcessorFactory() const { return myFactory; } void ServerApplication::startShuttingDown() { { Lock lock(myMutexState); if (myState >= SS_SHUTTING_DOWN) return; myState = SS_SHUTTING_DOWN; myTimeStartShuttingDown = std::chrono::steady_clock::now(); } myThreadShutdown = std::thread([&]() { Log::setThreadName("Shutdown"); std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now(); const std::chrono::milliseconds timeout(getLongEnv("CEF_SERVER_timeoutShutdownMs", 15000)); while (std::chrono::steady_clock::now() - start < timeout) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); const std::chrono::time_point now(std::chrono::steady_clock::now()); if (RemoteBrowser::getAllBrowsersCount() == 0) { { Lock lock(myMutexState); myState = SS_SHUTDOWN; } CefPostTask(TID_UI, base::BindOnce(CefQuitMessageLoop)); Log::debug("CefQuitMessageLoop will be invoked now (on TID_UI)."); return; } if (Log::isTraceEnabled()) { std::vector<int> remainingBids = RemoteBrowser::enumAllBrowsers(); std::stringstream ss; for (int bid : remainingBids) ss << bid << ", "; Log::trace("Server is waiting for closing remaining browsers with bids: %s", ss.str().c_str()); } } Log::trace("Exit shutdown-loop because of timeout."); CefPostTask(TID_UI, base::BindOnce(CefQuitMessageLoop)); // not sure that it is necessary here.. }); myThreadShutdown.detach(); } std::string ServerApplication::getRootPath() const { return myAppHandler->getRootPath(); } bool ServerApplication::isDefaultRoot() const { return myAppHandler->isDefaultRoot(); } const std::chrono::steady_clock::time_point& ServerApplication::getStartTime() const { return myTimeStart; } std::string ServerApplication::getState() { Lock lock(myMutexState); switch (myState) { case SS_NEW: return "SS_NEW"; case SS_SHUTDOWN: return "SHUTDOWN"; case SS_SHUTTING_DOWN: return "SHUTTING_DOWN"; default: return "UNKNOWN_STATE"; } } std::string ServerApplication::getStateWithDetails() { std::stringstream ss; ss << "Server state: " << getState() << std::endl; ss << "Server handlers:" << std::endl; myFactory->forEach([&](const MyServerProcessor* p) { if (!p || !p->getServerHandler()) return; // can't be ss << p->getServerHandler()->getDebugInfo(1); }); ss << std::endl; ss << "RemoteObject factories:" << std::endl; ss << DebugInfo::getInfo(1); ss << "Measures:" << std::endl; ss << DebugInfo::getMeasures(1); return ss.str(); } std::shared_ptr<ServerHandlerContext> ServerApplication::getCtx(int connectionId) { return myFactory->findCtx(connectionId); }