server/cpp/CppServer.cpp (139 lines of code) (raw):
// Copyright (c) Facebook, Inc. and its affiliates.
#include "cpp/CppServer.h"

#include <unistd.h>

#include <cerrno>
#include <memory>
#include <unordered_set>

#include <folly/CppAttributes.h>
#include <folly/Memory.h>
#include <folly/io/IOBuf.h>
#include <folly/io/async/EventBase.h>
#include <thrift/lib/cpp/TApplicationException.h>
#include <thrift/lib/cpp2/async/AsyncProcessor.h>
#include <thrift/lib/cpp2/server/ThriftServer.h>

#include "cpp/Destructible.h"
#include "cpp/HsStruct.h"
namespace apache {
namespace thrift {
/**
* Listener for events coming out of the server.
* Used to notify when the server has started serving traffic.
*/
class CppEventHandler : public server::TServerEventHandler {
public:
CppEventHandler(
int fd,
int* port,
ThriftServer& server,
std::shared_ptr<TServerEventHandler> prevHandler)
: fd_(fd), port_(port), server_(server), prevEventHandler_(prevHandler) {}
void preServe(const folly::SocketAddress* address) override {
server_.setServerEventHandler(prevEventHandler_);
if (prevEventHandler_) {
prevEventHandler_->preServe(address);
}
*port_ = address->getPort();
notify();
}
void handleServeError(const std::exception& x) override {
server_.setServerEventHandler(prevEventHandler_);
if (prevEventHandler_) {
prevEventHandler_->handleServeError(x);
}
*port_ = 0;
notify();
}
private:
int fd_;
int* port_;
ThriftServer& server_;
std::shared_ptr<TServerEventHandler> prevEventHandler_;
void notify() {
int64_t buf = 1;
write(fd_, &buf, 8);
}
};
/**
 * ThriftServer whose processor is produced by a caller-supplied factory
 * function and which reports its startup outcome through CppEventHandler.
 */
class CppServer : public ThriftServer {
 public:
  /**
   * @param callback    passed through to `factoryFn`
   * @param factoryFn   creates the AsyncProcessorFactory; the returned raw
   *                    pointer is adopted by this server
   * @param desiredPort port handed to setPort()
   * @param oneways     set of names forwarded to `factoryFn`; stored in this
   *                    object and passed to the factory by reference
   */
  explicit CppServer(
      TCallback callback,
      TFactory factoryFn,
      int desiredPort,
      std::unordered_set<std::string>&& oneways)
      : oneways_(std::move(oneways)) {
    setPort(desiredPort);
    setInterface(
        std::unique_ptr<AsyncProcessorFactory>(factoryFn(callback, oneways_)));
  }

  /**
   * Start serving traffic.
   *
   * @param fd      descriptor the CppEventHandler writes to once the server
   *                is serving or has failed
   * @param portPtr out-parameter for the bound port (0 on failure)
   * @param modify  optional hook invoked with this server before serve()
   * @return nullptr when serve() returns normally, otherwise a newly
   *         allocated HsString holding the exception message
   */
  HsString* FOLLY_NULLABLE
  go(int fd, int* portPtr, void (*modify)(ThriftServer&)) {
    try {
      auto prevHandler = getEventHandler();
      eHandler_ =
          std::make_shared<CppEventHandler>(fd, portPtr, *this, prevHandler);
      setServerEventHandler(eHandler_);

      // invoke optional callback to modify the ThriftServer
      if (modify) {
        modify(*this);
      }

      // Thrift main loop. This will run indefinitely, until stop() is called
      serve();
    } catch (const std::exception& e) {
      *portPtr = 0;
      auto exStr = e.what();
      LOG(ERROR) << "Unable to start serving traffic: " << exStr;
      // The installed handler can legitimately be null here: CppEventHandler
      // restores the previous handler (possibly none) once it has fired, and
      // `modify` may have replaced it as well. Hold our own reference for the
      // duration of the call because handleServeError() swaps the handler
      // stored on the server.
      if (auto handler = getEventHandler()) {
        handler->handleServeError(e);
      }
      return new HsString(exStr);
    }
    return nullptr;
  }

 private:
  std::shared_ptr<CppEventHandler> eHandler_;
  std::unordered_set<std::string> oneways_;
};
} // namespace thrift
} // namespace apache
// ****************************************************
// Exported functions
extern "C" {
/**
 * Allocates a HaskellAsyncProcessorFactory for the given callback and set of
 * oneway names and returns it as a raw pointer; the caller is responsible
 * for taking ownership of the result.
 */
apache::thrift::AsyncProcessorFactory* c_haskell_factory(
    apache::thrift::TCallback callback,
    const std::unordered_set<std::string>& oneways) noexcept {
  using apache::thrift::HaskellAsyncProcessorFactory;
  auto* factory = new HaskellAsyncProcessorFactory(callback, oneways);
  return factory;
}
// Result of c_create_cpp_server: the Left alternative carries the newly
// created server, the Right alternative carries an error message.
using CreateCppServerResult = HsEither<apache::thrift::CppServer*, HsString>;
// NOTE(review): presumably emits the destructor entry point used to free a
// CreateCppServerResult from the foreign side - confirm in cpp/Destructible.h.
HS_DEFINE_DESTRUCTIBLE(
CreateCppServerResult,
HsEither<apache::thrift::CppServer*, HsString>);
/**
 * Creates a CppServer and wraps it in a heap-allocated CreateCppServerResult.
 *
 * @param callback     forwarded to the CppServer constructor
 * @param factoryFn    forwarded to the CppServer constructor
 * @param desiredPort  port the server is configured with
 * @param workers      number of CPU worker threads; values <= 0 leave the
 *                     server's setting untouched
 * @param onewayNames  array of `onewayLength` character buffers
 * @param onewaySizes  array of `onewayLength` lengths, one per name
 * @param onewayLength number of entries in both arrays
 * @return Left(server) on success, Right(message) if a std::exception was
 *         thrown while building the server
 */
CreateCppServerResult* c_create_cpp_server(
    apache::thrift::TCallback callback,
    apache::thrift::TFactory factoryFn,
    int desiredPort,
    int workers,
    const char** onewayNames,
    size_t* onewaySizes,
    size_t onewayLength) noexcept {
  try {
    std::unordered_set<std::string> oneways;
    oneways.reserve(onewayLength);
    for (size_t i = 0; i < onewayLength; i++) {
      // Names are (pointer, length) pairs, not NUL-terminated strings.
      oneways.emplace(onewayNames[i], onewaySizes[i]);
    }

    // Own the server through a unique_ptr until the result object exists, so
    // that an exception from the configuration call or from allocating the
    // result cannot leak it.
    auto cppServer = std::make_unique<apache::thrift::CppServer>(
        callback, factoryFn, desiredPort, std::move(oneways));
    if (workers > 0) {
      cppServer->setNumCPUWorkerThreads(workers);
    }

    auto* result = new CreateCppServerResult(HsLeft, cppServer.get());
    // Ownership now lives in the result handed back to the caller.
    (void)cppServer.release();
    return result;
  } catch (const std::exception& e) {
    auto exStr = e.what();
    LOG(ERROR) << "Failed to start server: " << exStr;
    return new CreateCppServerResult(HsRight, exStr);
  }
}
/**
 * Destroys a server previously handed out to the caller.
 * Passing a null pointer is a no-op.
 */
void c_destroy_cpp_server(apache::thrift::CppServer* s) noexcept {
  // Adopt the pointer; the server is deleted when `owned` leaves scope.
  const std::unique_ptr<apache::thrift::CppServer> owned(s);
}
/**
 * Runs the server by delegating to CppServer::go.
 *
 * @return whatever CppServer::go returns: nullptr, or an HsString carrying
 *         an error message
 */
HsString* FOLLY_NULLABLE c_serve_cpp_server(
    apache::thrift::CppServer* s,
    int fd,
    int* portPtr,
    void (*modify)(apache::thrift::ThriftServer&)) noexcept {
  apache::thrift::CppServer& server = *s;
  HsString* const failure = server.go(fd, portPtr, modify);
  return failure;
}
/**
 * Asks the server to stop. A std::exception raised by stop() is logged and
 * swallowed so that it never crosses the C boundary.
 */
void c_stop_cpp_server(apache::thrift::CppServer* s) noexcept {
  try {
    s->stop();
    return;
  } catch (const std::exception& stopError) {
    const char* reason = stopError.what();
    LOG(ERROR) << "Error stopping server: " << reason;
  }
}
}