remote/ServerHandlerContext.cpp (153 lines of code) (raw):
#include "ServerHandlerContext.h"

#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

#include "router/MessageRoutersManager.h"
#include "RpcExecutor.h"
#include "browser/RemoteClient.h"
#include "handlers/RemoteClientHandler.h"
namespace {
// Small thread-safe FIFO used to hand work items to a consumer thread.
//
// T must be implicitly constructible from nullptr (e.g. std::function or a smart pointer):
// wait_pop() returns a null T when it is released by notify_one() while the queue is empty.
template <typename T>
class BlockingQueue {
 public:
  // Appends a copy of |value| and wakes one waiting consumer.
  void push(const T& value) {
    std::lock_guard<std::mutex> lock(mutex);
    queue.push(value);
    condition.notify_one();
  }

  // Appends |value| by moving it into the queue and wakes one waiting consumer.
  void push(T&& value) {
    std::lock_guard<std::mutex> lock(mutex);
    queue.push(std::move(value));
    condition.notify_one();
  }

  // Blocks until an item is available or notify_one() has been called.
  // Returns the oldest queued item, or nullptr when released by notify_one() with nothing queued.
  T wait_pop() {
    std::unique_lock<std::mutex> lock(mutex);
    // The wake flag is part of the predicate: waiting on "!queue.empty()" alone would send the
    // consumer straight back to sleep after notify_one(), so it could never be released.
    condition.wait(lock, [this] { return wakeRequested || !queue.empty(); });
    wakeRequested = false;  // one notify_one() releases one wait_pop() call
    if (queue.empty())
      return nullptr;
    T value = std::move(queue.front());  // non-const so the return below can move
    queue.pop();
    return value;
  }

  // Returns true when no items are queued at the moment of the call.
  bool empty() {
    std::lock_guard<std::mutex> lock(mutex);
    return queue.empty();
  }

  // Releases one (current or upcoming) wait_pop() call even if the queue is empty.
  // The request is remembered, so it is not lost when no consumer is waiting yet.
  void notify_one() {
    std::lock_guard<std::mutex> lock(mutex);
    wakeRequested = true;
    condition.notify_one();
  }

 private:
  std::mutex mutex;  // guards queue and wakeRequested
  std::queue<T> queue;
  std::condition_variable condition;
  bool wakeRequested = false;  // set by notify_one(), consumed by wait_pop()
};
// Function handed to std::thread by BackgroundExecutor::setService(); declared here so the class
// below can refer to it, and defined after the class.
void bgExecutorLoop(std::shared_ptr<BackgroundExecutor> exec);
} // namespace
// Executes RPCs posted via post() one at a time on a dedicated, detached worker thread.
class BackgroundExecutor {
 public:
  BackgroundExecutor() = default;

  // Stores |service| as the executor used for posted RPCs and starts the worker thread.
  // The thread runs exec->run() and holds its copy of |exec| for as long as it runs; callers in
  // this file pass the shared_ptr that owns this object.
  void setService(std::shared_ptr<RpcExecutor> service, std::shared_ptr<BackgroundExecutor> exec) {
    myService = std::move(service);
    myBgThread = std::make_shared<std::thread>(std::bind(&bgExecutorLoop, exec));
    myBgThread->detach();
  }

  // Schedules |rpc| for execution on the worker thread.
  void post(JavaVoidRpc rpc) {
    myQueue.push(std::move(rpc));
  }

  // Worker loop: pops RPCs and executes them through myService until stop() has been requested.
  // A null RPC returned by the queue is skipped.
  void run() {
    Log::setThreadName("BackgroundExecutor");
    while (!myStop) {
      JavaVoidRpc rpc = myQueue.wait_pop();
      if (rpc != nullptr)
        myService->exec(rpc);
    }
  }

  // Requests the worker loop to finish. The flag is checked between RPCs, so an RPC that is
  // already executing is not interrupted. The queue notification is intended to wake a worker
  // that is blocked waiting for work.
  void stop() {
    myStop = true;
    myQueue.notify_one();
  }

 private:
  std::shared_ptr<std::thread> myBgThread;
  BlockingQueue<JavaVoidRpc> myQueue;
  // Written by stop() and read by the worker thread in run(), hence atomic.
  std::atomic<bool> myStop{false};
  std::shared_ptr<RpcExecutor> myService;  // represents background java thread for 'background' calls execution
};
namespace {
void bgExecutorLoop(std::shared_ptr<BackgroundExecutor> exec) {
exec->run();
}
}
ServerHandlerContext::ServerHandlerContext() :
myBgExecutor(std::make_shared<BackgroundExecutor>()) {}
// Creates the three RpcExecutor instances (general, IO, background) for the pipe |pipeName| and
// hands the background one to the background executor, which starts its worker thread.
void ServerHandlerContext::initJavaServicePipe(const std::string & pipeName) {
  const auto connect = [&pipeName] { return std::make_shared<RpcExecutor>(pipeName); };

  myJavaService = connect();
  myJavaServiceIO = connect();
  myJavaServiceBg = connect();

  myBgExecutor->setService(myJavaServiceBg, myBgExecutor);
}
// Creates the three RpcExecutor instances (general, IO, background) for |port| and hands the
// background one to the background executor, which starts its worker thread.
void ServerHandlerContext::initJavaServicePort(int port) {
  const auto connect = [&port] { return std::make_shared<RpcExecutor>(port); };

  myJavaService = connect();
  myJavaServiceIO = connect();
  myJavaServiceBg = connect();

  myBgExecutor->setService(myJavaServiceBg, myBgExecutor);
}
std::shared_ptr<RemoteClient> ServerHandlerContext::createRemoteClient(int handlersMask, std::shared_ptr<ServerHandlerContext> ctx) {
const int cid = RemoteClient::genNewCid();
CefRefPtr<RemoteClientHandler> handler = new RemoteClientHandler(ctx, cid, handlersMask);
std::shared_ptr<RemoteClient> result = std::make_shared<RemoteClient>(cid, handler);
std::unique_lock lock(myMutex);
myClients[cid] = result;
return result;
}
std::shared_ptr<RemoteClient> ServerHandlerContext::findRemoteClient(int cid) {
std::unique_lock lock(myMutex);
const auto & i = myClients.find(cid);
return i == myClients.end() ? nullptr : i->second;
}
void ServerHandlerContext::disposeRemoteClient(int cid) {
std::unique_lock lock(myMutex);
const auto & i = myClients.find(cid);
if (i == myClients.end())
return;
i->second->close();
myClients.erase(cid);
}
// Schedules |rpc| for execution by the background executor.
void ServerHandlerContext::invokeLater(JavaVoidRpc rpc) {
  // |rpc| is already our own copy; hand it over instead of copying it again.
  myBgExecutor->post(std::move(rpc));
}
void ServerHandlerContext::close() {
if (myIsClosed)
return;
myIsClosed = true;
std::vector<std::shared_ptr<RemoteClient>> clients;
{
std::unique_lock lock(myMutex);
for (auto kv : myClients)
if (kv.second)
clients.push_back(kv.second);
myClients.clear();
}
for (auto c : clients)
c->close();
try {
if (myJavaService && !myJavaService->isClosed())
myJavaService->close();
if (myJavaServiceIO && !myJavaServiceIO->isClosed())
myJavaServiceIO->close();
if (myJavaServiceBg && !myJavaServiceBg->isClosed())
myJavaServiceBg->close();
myBgExecutor->stop();
} catch (apache::thrift::TException& e) {
Log::error("Thrift exception in ServerHandlerContext::close: %s", e.what());
}
}
// Builds a human-readable description of this context, indented by |tabs| tab characters.
// A closed context yields a single "Closed ctx" line. An active one yields an "Active ctx" line
// and, when |detailed| is set, the client count followed by each client's own debug info
// (requested with tabs + 1).
std::string ServerHandlerContext::getDebugInfo(int tabs, bool detailed) {
  // A non-positive |tabs| produces no indentation.
  const std::string indent(tabs > 0 ? static_cast<std::size_t>(tabs) : 0u, '\t');

  std::stringstream out;
  out << indent;
  if (myIsClosed) {
    out << "Closed ctx " << this << std::endl;
    return out.str();
  }

  out << "Active ctx " << this << std::endl;
  if (!detailed)
    return out.str();

  out << indent;
  std::unique_lock lock(myMutex);
  out << "RemoteClients [count=" << myClients.size() << "]:" << std::endl;
  for (const auto& entry : myClients)
    out << entry.second->getDebugInfo(tabs + 1);
  return out.str();
}