remote/RpcExecutor.cpp (100 lines of code) (raw):
#include "RpcExecutor.h"
#include <thrift/transport/TSocket.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TTransportUtils.h>
#include "Utils.h"
#include "DebugInfo.h"
#ifdef WIN32
#include "windows/PipeTransport.h"
#else
#include <boost/filesystem.hpp>
#endif
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace thrift_codegen;
namespace {
#ifndef NDEBUG
const bool doMeasureTimes = getBoolEnv("CEF_SERVER_MEASURE_RpcExecutor", true);
#else
const bool doMeasureTimes = getBoolEnv("CEF_SERVER_MEASURE_RpcExecutor", false);
#endif
const bool doTraceAll = getBoolEnv("CEF_SERVER_TRACE_RpcExecutor");
}
class MyBinaryProtocol : public TBinaryProtocolT<TTransport> {
public:
explicit MyBinaryProtocol(const std::shared_ptr<TTransport>& trans) : TBinaryProtocolT(trans) {}
uint32_t writeMessageBegin_virt(const std::string& name,
const TMessageType messageType,
const int32_t seqid) override {
myLastMessageName = name;
if (doTraceAll && Log::isTraceEnabled())
Log::trace("RpcExecutor: exec '%s'", name.c_str());
return TVirtualProtocol::writeMessageBegin_virt(name, messageType, seqid);
}
const std::string& getLastMessageName() const { return myLastMessageName; }
private:
std::string myLastMessageName = "";
};
RpcExecutor::RpcExecutor(int port) {
myTransport = std::make_shared<TBufferedTransport>(std::make_shared<TSocket>("127.0.0.1", port));
myProtocol = std::make_shared<MyBinaryProtocol>(myTransport);
myService = std::make_shared<ClientHandlersClient>(myProtocol);
myTransport->open();
}
RpcExecutor::RpcExecutor(std::string pipeName) {
#ifdef WIN32
myTransport = std::make_shared<PipeTransport>("\\\\.\\pipe\\" + pipeName);
#else
myTransport = std::make_shared<TSocket>(pipeName.c_str());
#endif
myProtocol = std::make_shared<MyBinaryProtocol>(myTransport);
myService = std::make_shared<ClientHandlersClient>(myProtocol);
myTransport->open();
}
std::string RpcExecutor::getProcessingName() const { return myProtocol->getLastMessageName(); }
void RpcExecutor::beforeExec() {
myIsProcessing = true;
myStartExec = std::chrono::steady_clock::now();
}
void RpcExecutor::afterExec() {
myIsProcessing = false;
if (doMeasureTimes)
DebugInfo::addMeasure(
"RpcExecutor." + myProtocol->getLastMessageName(),
std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - myStartExec).count()
);
}
void RpcExecutor::close() {
Lock lock(myMutex);
if (myService != nullptr) {
myService = nullptr;
try {
myTransport->close();
} catch (const TException& e) {
Log::error("Exception during rpc-executor transport closing, err: %s", e.what());
}
myTransport = nullptr;
}
}
void RpcExecutor::exec(std::function<void(JavaService)> rpc) {
Lock lock(myMutex);
if (myService == nullptr) {
if (doTraceAll && Log::isTraceEnabled())
Log::trace("RpcExecutor: null remote service");
return;
}
ExecHolder eh(*this);
try {
rpc(myService);
} catch (apache::thrift::TException& tx) {
onThriftException(tx);
}
}
void RpcExecutor::onThriftException(apache::thrift::TException& tx) {
if (Log::isTraceEnabled()) {
Log::trace("RpcExecutor: thrift exception occurred: %s", tx.what());
Log::trace("RpcExecutor: name of executed rpc: %s", getProcessingName().c_str());
}
close();
}