Wdt.cpp (201 lines of code) (raw):
#include <wdt/Wdt.h>
#include <wdt/util/WdtFlags.h>
using std::string;
namespace facebook {
namespace wdt {
// Entry point that has to run first, and exactly once, before getWdt() is
// used for the same appName. The process-wide options are loaded from the
// flags only on the very first invocation, whatever appName is passed.
Wdt &Wdt::initializeWdt(const std::string &appName) {
  static bool globalFlagsPending = true;
  if (globalFlagsPending) {
    WdtFlags::initializeFromFlags(WdtOptions::getMutable());
    globalFlagsPending = false;
  }
  // Look up the instance registered under appName, creating it on demand.
  Wdt &instance = getWdtInternal(appName, []() { return new Wdt(); });
  // A repeated initialization is reported in the log by the callee; the
  // instance is handed back to the caller either way.
  instance.initializeWdtInternal(appName);
  // At fb we do this for services - that's floody for cmd line though
  // instance.printWdtOptions(WLOG(INFO));
  return instance;
}
// Per-instance part of the initialization: records the application name,
// marks the instance as initialized and pushes the throttler options from
// options_ to the throttler owned by the resource controller.
// Returns OK on the first call, ERROR (after logging) on any later call.
ErrorCode Wdt::initializeWdtInternal(const std::string &appName) {
  WLOG(INFO) << "One time initialization of WDT for " << appName;
  if (!initDone_) {
    appName_ = appName;
    initDone_ = true;
    auto &&throttler = resourceController_->getWdtThrottler();
    throttler->setThrottlerRates(options_.getThrottlerOptions());
    return OK;
  }
  WLOG(ERROR) << "Called initializeWdt() more than once";
  return ERROR;
}
// Accessor for an existing instance; may be called any number of times once
// initializeWdt() has been called for appName.
Wdt &Wdt::getWdt(const std::string &appName) {
  // No factory is supplied, so an unknown appName trips the WDT_CHECK inside
  // getWdtInternal() instead of creating a fresh instance.
  Wdt &instance = getWdtInternal(appName, nullptr);
  if (instance.initDone_) {
    return instance;
  }
  WLOG(ERROR) << "Called getWdt() before/without initializeWdt()";
  WDT_CHECK(false) << "Must call initializeWdt() once before getWdt()";
  return instance;
}
// Writes the options of this instance to `out` through WdtFlags.
// Always returns OK.
ErrorCode Wdt::printWdtOptions(std::ostream &out) {
  const ErrorCode status = OK;
  WdtFlags::printOptions(out, this->options_);
  return status;
}
// Default constructor: options_ is filled from the flags first because the
// resource controller created afterwards is constructed from it.
Wdt::Wdt() {
  WdtFlags::initializeFromFlags(options_);
  auto controller = std::make_unique<WdtResourceController>(options_);
  resourceController_ = std::move(controller);
}
// Same as the default constructor, except that the caller-supplied throttler
// is passed on to the resource controller.
Wdt::Wdt(std::shared_ptr<Throttler> throttler) {
  WdtFlags::initializeFromFlags(options_);
  auto controller =
      std::make_unique<WdtResourceController>(options_, throttler);
  resourceController_ = std::move(controller);
}
// Key under which the sender for `req` is tracked: the request's
// destIdentifier, falling back to its hostName when destIdentifier is empty.
std::string Wdt::getSenderIdentifier(const WdtTransferRequest &req) {
  return req.destIdentifier.empty() ? req.hostName : req.destIdentifier;
}
// Creates a sender for `req` through the resource controller and returns it
// via `senderPtr`.
//
// req                  - transfer request; a request already carrying an
//                        error is rejected with that error code.
// abortChecker         - installed on the new sender when non-null.
// terminateExistingOne - when a sender already exists for the same
//                        namespace/identifier, abort and release it, then
//                        create a new one - unless the existing sender has
//                        the same transferId, in which case ALREADY_EXISTS
//                        is returned and nothing is aborted.
// Returns OK on success, otherwise the error reported for the request or by
// the resource controller.
ErrorCode Wdt::createWdtSender(const WdtTransferRequest &req,
                               std::shared_ptr<IAbortChecker> abortChecker,
                               bool terminateExistingOne,
                               SenderPtr &senderPtr) {
  if (req.errorCode != OK) {
    WLOG(ERROR) << "Transfer request error " << errorCodeToStr(req.errorCode);
    return req.errorCode;
  }
  // Protocol issues will/should be flagged as error by createSender below.
  const std::string &ns = req.wdtNamespace;
  const std::string senderKey = getSenderIdentifier(req);
  ErrorCode status =
      resourceController_->createSender(ns, senderKey, req, senderPtr);
  const bool replaceExisting =
      terminateExistingOne && (status == ALREADY_EXISTS);
  if (replaceExisting) {
    WLOG(WARNING) << "Found pre-existing sender for " << ns << " " << senderKey
                  << " aborting it and making a new one";
    // NOTE(review): senderPtr is dereferenced here, so createSender is
    // assumed to hand back the existing sender along with ALREADY_EXISTS -
    // confirm against the resource controller.
    const bool sameTransfer =
        senderPtr->getTransferRequest().transferId == req.transferId;
    if (sameTransfer) {
      WLOG(WARNING) << "No need to recreate same sender with key: " << senderKey
                    << " TransferRequest: " << req;
      return ALREADY_EXISTS;
    }
    senderPtr->abort(ABORTED_BY_APPLICATION);
    // The release may log an error of its own.
    resourceController_->releaseSender(ns, senderKey);
    // Second attempt now that the old sender is gone.
    status = resourceController_->createSender(ns, senderKey, req, senderPtr);
  }
  if (status != OK) {
    WLOG(ERROR) << "Failed to create sender " << errorCodeToStr(status) << " "
                << ns << " " << senderKey;
    return status;
  }
  wdtSetAbortSocketCreatorAndReporter(
      senderPtr.get(), senderPtr->getTransferRequest(), abortChecker);
  return OK;
}
// Releases the sender tracked for `wdtRequest` (keyed by its namespace and
// sender identifier) and returns the resource controller's result.
ErrorCode Wdt::releaseWdtSender(const WdtTransferRequest &wdtRequest) {
  const std::string senderKey = getSenderIdentifier(wdtRequest);
  return resourceController_->releaseSender(wdtRequest.wdtNamespace,
                                            senderKey);
}
// Runs a complete send for `req`: creates the sender, initializes it, calls
// transfer() and releases the sender again.
//
// req                  - the transfer request to send.
// abortChecker         - optional abort checker installed on the sender.
// terminateExistingOne - forwarded to createWdtSender(): whether an already
//                        existing sender for the same key may be replaced.
// Returns the first error met while creating or initializing the sender,
// otherwise the error code from the transfer report's summary.
ErrorCode Wdt::wdtSend(const WdtTransferRequest &req,
                       std::shared_ptr<IAbortChecker> abortChecker,
                       bool terminateExistingOne) {
  SenderPtr sender;
  ErrorCode errCode =
      createWdtSender(req, abortChecker, terminateExistingOne, sender);
  if (errCode != OK) {
    // Nothing was registered on our behalf, so there is nothing to release.
    return errCode;
  }
  const std::string &wdtNamespace = req.wdtNamespace;
  auto validatedReq = sender->init();
  if (validatedReq.errorCode != OK) {
    WLOG(ERROR) << "Couldn't init sender with request for " << wdtNamespace;
    // The sender was created above and is still registered in the resource
    // controller: release it so a failed init does not leave it behind.
    releaseWdtSender(req);
    return validatedReq.errorCode;
  }
  auto transferReport = sender->transfer();
  errCode = transferReport->getSummary().getErrorCode();
  releaseWdtSender(req);
  WLOG(INFO) << "wdtSend for " << wdtNamespace << " " << req.hostName
             << " ended with " << errorCodeToStr(errCode);
  return errCode;
}
// Creates a receiver for (wdtNamespace, identifier), initializes it and
// starts it with transferAsync().
//
// wdtNamespace - namespace the receiver is registered under.
// req          - in/out: replaced by the request returned from the
//                receiver's init(); its errorCode is also updated with the
//                result of receiver creation and of transferAsync().
// identifier   - key of the receiver inside the namespace.
// abortChecker - optional abort checker installed on the receiver.
// Returns the error code of the first failing step, or the result of
// transferAsync().
// NOTE(review): the receiver is not released here when init() or
// transferAsync() fails - presumably the caller is expected to call
// wdtReceiveFinish(); confirm.
ErrorCode Wdt::wdtReceiveStart(const std::string &wdtNamespace,
                               WdtTransferRequest &req,
                               const std::string &identifier,
                               std::shared_ptr<IAbortChecker> abortChecker) {
  if (req.errorCode != OK) {
    WLOG(ERROR) << "Transfer request namespace:" << wdtNamespace
                << " identifier:" << identifier
                << " error:" << errorCodeToStr(req.errorCode);
    return req.errorCode;
  }
  ReceiverPtr newReceiver;
  const ErrorCode createStatus = resourceController_->createReceiver(
      wdtNamespace, identifier, req, newReceiver);
  if (createStatus != OK) {
    WLOG(ERROR) << "Failed to create receiver " << errorCodeToStr(createStatus)
                << " " << wdtNamespace << " " << identifier;
    req.errorCode = createStatus;
    return createStatus;
  }
  wdtSetAbortSocketCreatorAndReporter(newReceiver.get(), req, abortChecker);
  // init() hands back the request the caller should use from now on.
  req = newReceiver->init();
  if (req.errorCode != OK) {
    WLOG(ERROR) << "Couldn't init receiver with request for " << wdtNamespace
                << " " << identifier;
    return req.errorCode;
  }
  const ErrorCode startStatus = newReceiver->transferAsync();
  WLOG(INFO) << "wdtReceiveStart for " << wdtNamespace << " " << identifier
             << " : " << errorCodeToStr(startStatus);
  req.errorCode = startStatus;
  return startStatus;
}
// Finishes the receiver registered under (wdtNamespace, identifier) and
// releases it from the resource controller.
// Returns NOT_FOUND when no such receiver exists, otherwise the error code
// from the summary of the report returned by finish().
ErrorCode Wdt::wdtReceiveFinish(const std::string &wdtNamespace,
                                const std::string &identifier) {
  ReceiverPtr activeReceiver =
      resourceController_->getReceiver(wdtNamespace, identifier);
  if (!activeReceiver) {
    WLOG(ERROR) << "Failed to get receiver " << errorCodeToStr(NOT_FOUND) << " "
                << wdtNamespace << " " << identifier;
    return NOT_FOUND;
  }
  auto finalReport = activeReceiver->finish();
  const ErrorCode outcome = finalReport->getSummary().getErrorCode();
  WLOG(INFO) << "wdtReceiveFinish for " << wdtNamespace << " " << identifier
             << " ended with " << errorCodeToStr(outcome);
  resourceController_->releaseReceiver(wdtNamespace, identifier);
  return outcome;
}
// Installs `abortChecker` on `target` when one is supplied; the transfer
// request parameter is unused in this implementation. Always returns OK.
ErrorCode Wdt::wdtSetAbortSocketCreatorAndReporter(
    WdtBase *target, const WdtTransferRequest &,
    std::shared_ptr<IAbortChecker> abortChecker) {
  const bool hasChecker = (abortChecker != nullptr);
  if (hasChecker) {
    target->setAbortChecker(abortChecker);
  }
  return OK;
}
// Gives mutable access to the options held by this instance.
WdtOptions &Wdt::getWdtOptions() {
  return this->options_;
}
namespace {
// Registry of Wdt instances, keyed by application name.
std::unordered_map<std::string, std::unique_ptr<Wdt>> s_wdtMap;
// Held by getWdtInternal() and releaseWdt() while they touch s_wdtMap.
std::mutex s_mutex;
}  // namespace
// Private lookup shared by initializeWdt() and getWdt(): returns the
// instance registered under appName. When none exists yet, `factory` is
// checked with WDT_CHECK and then used to build the instance, which is stored
// in the registry. The registry mutex is held for the whole call.
Wdt &Wdt::getWdtInternal(const std::string &appName,
                         std::function<Wdt *()> factory) {
  std::lock_guard<std::mutex> lock(s_mutex);
  const auto found = s_wdtMap.find(appName);
  if (found == s_wdtMap.end()) {
    WDT_CHECK(factory) << "Must call initializeWdt() once before getWdt() "
                       << appName;
    std::unique_ptr<Wdt> created(factory());
    // Remember the address first: the registry takes over ownership below.
    Wdt *const rawInstance = created.get();
    s_wdtMap.emplace(appName, std::move(created));
    return *rawInstance;
  }
  return *(found->second);
}
void Wdt::releaseWdt(const std::string &appName) {
LOG(INFO) << "Releasing WDT for " << appName;
std::lock_guard<std::mutex> lock(s_mutex);
auto it = s_wdtMap.find(appName);
if (it == s_wdtMap.end()) {
LOG(ERROR) << appName << " not found in releaseWdt";
return;
}
s_wdtMap.erase(it);
}
} // namespace wdt
} // namespace facebook