WdtResourceController.cpp (638 lines of code) (raw):

/**
 * Copyright (c) 2014-present, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
#include <wdt/WdtResourceController.h>

using namespace std;

/// Pause (in milliseconds) between two checks of the reference count while
/// deRegisterWdtNamespace() waits for other holders of a namespace
/// controller to let go of it.
const int64_t kDelTimeToSleepMillis = 100;

namespace facebook {
namespace wdt {

const char *const WdtResourceController::kGlobalNamespace("Global");

// ---------------------------------------------------------------------------
// WdtControllerBase
// ---------------------------------------------------------------------------

/// Sets the maximum number of receivers for this controller (under the
/// controller mutex) and logs the new value.
/// The quota checks in this file treat a limit <= 0 as "no limit".
void WdtControllerBase::updateMaxReceiversLimit(int64_t maxNumReceivers) {
  GuardLock lock(controllerMutex_);
  maxNumReceivers_ = maxNumReceivers;
  WLOG(INFO) << "Updated max number of receivers for " << controllerName_
             << " to " << maxNumReceivers_;
}

/// Sets the maximum number of senders for this controller (under the
/// controller mutex) and logs the new value.
/// The quota checks in this file treat a limit <= 0 as "no limit".
void WdtControllerBase::updateMaxSendersLimit(int64_t maxNumSenders) {
  GuardLock lock(controllerMutex_);
  maxNumSenders_ = maxNumSenders;
  WLOG(INFO) << "Updated max number of senders for " << controllerName_
             << " to " << maxNumSenders_;
}

/// @param controllerName  name used in the log messages of this controller
WdtControllerBase::WdtControllerBase(const string &controllerName) {
  controllerName_ = controllerName;
}

// ---------------------------------------------------------------------------
// WdtNamespaceController
// ---------------------------------------------------------------------------

/// Creates the controller for one namespace. The initial sender/receiver
/// limits are taken from the parent's options (namespace_sender_limit /
/// namespace_receiver_limit).
/// @param wdtNamespace  namespace name, also used as the controller name
/// @param parent        owning resource controller; dereferenced here, so it
///                      must not be null
WdtNamespaceController::WdtNamespaceController(
    const string &wdtNamespace, const WdtResourceController *const parent)
    : WdtControllerBase(wdtNamespace), parent_(parent) {
  auto &options = parent_->getOptions();
  updateMaxSendersLimit(options.namespace_sender_limit);
  updateMaxReceiversLimit(options.namespace_receiver_limit);
}

/// @return false (and logs a warning) when a positive receiver limit is set
///         and already reached; true otherwise
bool WdtNamespaceController::hasReceiverQuota() const {
  GuardLock lock(controllerMutex_);
  if (numReceivers_ >= maxNumReceivers_ && maxNumReceivers_ > 0) {
    WLOG(WARNING) << "Exceeded number of receivers for " << controllerName_
                  << " Max number of receivers " << maxNumReceivers_;
    return false;
  }
  return true;
}

/// Creates and registers a receiver under the given identifier.
/// @param request     transfer request the receiver is constructed from
/// @param identifier  key of the receiver inside this namespace
/// @param receiver    [out] the new receiver on OK, the already registered
///                    one on ALREADY_EXISTS, nullptr on QUOTA_EXCEEDED
/// @return OK, ALREADY_EXISTS or QUOTA_EXCEEDED
ErrorCode WdtNamespaceController::createReceiver(
    const WdtTransferRequest &request, const string &identifier,
    ReceiverPtr &receiver) {
  receiver = nullptr;
  {
    GuardLock lock(controllerMutex_);
    // Check for already existing
    auto it = receiversMap_.find(identifier);
    if (it != receiversMap_.end()) {
      WLOG(ERROR) << "Receiver already created for transfer " << identifier;
      // Return it so the old one can potentially be aborted
      receiver = it->second;
      return ALREADY_EXISTS;
    }
    // Check for quotas
    // NOTE(review): hasReceiverQuota() acquires controllerMutex_ again while
    // it is held here, so GuardLock presumably wraps a re-entrant mutex;
    // confirm in the header.
    if (!hasReceiverQuota()) {
      return QUOTA_EXCEEDED;
    }
    receiver = make_shared<Receiver>(request);
    receiver->setThrottler(parent_->getWdtThrottler());
    receiver->setWdtOptions(parent_->getOptions());
    receiversMap_[identifier] = receiver;
    ++numReceivers_;
  }
  return OK;
}

/// @return false (and logs a warning) when a positive sender limit is set
///         and already reached; true otherwise
bool WdtNamespaceController::hasSenderQuota() const {
  GuardLock lock(controllerMutex_);
  if (numSenders_ >= maxNumSenders_ && maxNumSenders_ > 0) {
    WLOG(WARNING) << "Exceeded number of senders for " << controllerName_
                  << " Max number of senders " << maxNumSenders_;
    return false;
  }
  return true;
}

/// Creates and registers a sender under the given identifier.
/// @param request     transfer request the sender is constructed from
/// @param identifier  key of the sender inside this namespace
/// @param sender      [out] the new sender on OK, the already registered one
///                    on ALREADY_EXISTS, nullptr on QUOTA_EXCEEDED
/// @return OK, ALREADY_EXISTS or QUOTA_EXCEEDED
ErrorCode WdtNamespaceController::createSender(
    const WdtTransferRequest &request, const string &identifier,
    SenderPtr &sender) {
  sender = nullptr;
  {
    GuardLock lock(controllerMutex_);
    // Check for already existing
    auto it = sendersMap_.find(identifier);
    if (it != sendersMap_.end()) {
      WLOG(ERROR) << "Sender already created for transfer " << identifier;
      // Return it so the old one can potentially be aborted
      sender = it->second;
      return ALREADY_EXISTS;
    }
    // Check for quotas
    // NOTE(review): hasSenderQuota() acquires controllerMutex_ again while
    // it is held here, so GuardLock presumably wraps a re-entrant mutex;
    // confirm in the header.
    if (!hasSenderQuota()) {
      return QUOTA_EXCEEDED;
    }
    sender = make_shared<Sender>(request);
    sender->setThrottler(parent_->getWdtThrottler());
    sender->setWdtOptions(parent_->getOptions());
    sendersMap_[identifier] = sender;
    ++numSenders_;
  }
  return OK;
}

/// Removes the receiver registered under identifier from this namespace.
/// @return OK, or NOT_FOUND when no such receiver is registered
ErrorCode WdtNamespaceController::releaseReceiver(
    const std::string &identifier) {
  ReceiverPtr receiver = nullptr;
  {
    GuardLock lock(controllerMutex_);
    auto it = receiversMap_.find(identifier);
    if (it == receiversMap_.end()) {
      WLOG(ERROR) << "Couldn't find receiver to release with id " << identifier
                  << " for " << controllerName_;
      return NOT_FOUND;
    }
    receiver = std::move(it->second);
    receiversMap_.erase(it);
    --numReceivers_;
  }
  // receiver will be deleted and logs printed by the destructor
  // if no other thread has the shared pointer, that is...
  WLOG(INFO) << "Released the receiver with id " << receiver->getTransferId();
  return OK;
}

/// Removes the sender registered under identifier from this namespace.
/// @return OK, or NOT_FOUND when no such sender is registered
ErrorCode WdtNamespaceController::releaseSender(const std::string &identifier) {
  SenderPtr sender = nullptr;
  {
    GuardLock lock(controllerMutex_);
    auto it = sendersMap_.find(identifier);
    if (it == sendersMap_.end()) {
      WLOG(ERROR) << "Couldn't find sender to release with id " << identifier
                  << " for " << controllerName_;
      return NOT_FOUND;
    }
    sender = std::move(it->second);
    sendersMap_.erase(it);
    --numSenders_;
  }
  // The pointer is dropped after the lock has been released.
  WLOG(INFO) << "Released the sender with id " << sender->getTransferId();
  return OK;
}

/// Unregisters every sender of this namespace.
/// @return number of senders that were registered
int64_t WdtNamespaceController::releaseAllSenders() {
  // The pointers are moved into a local vector so that the last references
  // held by this controller are dropped after the lock has been released.
  vector<SenderPtr> senders;
  {
    GuardLock lock(controllerMutex_);
    senders.reserve(sendersMap_.size());
    for (auto &senderPair : sendersMap_) {
      senders.push_back(std::move(senderPair.second));
    }
    sendersMap_.clear();
    numSenders_ = 0;
  }
  const int64_t numSenders = static_cast<int64_t>(senders.size());
  WVLOG(1) << "Number of senders released " << numSenders;
  return numSenders;
}

/// Unregisters every sender for which isStale() returns true.
/// @return identifiers of the senders that were removed
vector<string> WdtNamespaceController::releaseStaleSenders() {
  vector<SenderPtr> senders;
  vector<string> erasedIds;
  {
    GuardLock lock(controllerMutex_);
    for (auto it = sendersMap_.begin(); it != sendersMap_.end();) {
      if (!it->second->isStale()) {
        ++it;
        continue;
      }
      erasedIds.push_back(it->first);
      senders.push_back(std::move(it->second));
      // erase() hands back the iterator following the removed element
      it = sendersMap_.erase(it);
      --numSenders_;
    }
  }
  WLOG(INFO) << "Cleared " << senders.size() << " stale senders";
  return erasedIds;
}

/// Unregisters every receiver of this namespace.
/// @return number of receivers that were registered
int64_t WdtNamespaceController::releaseAllReceivers() {
  // The pointers are moved into a local vector so that the last references
  // held by this controller are dropped after the lock has been released.
  vector<ReceiverPtr> receivers;
  {
    GuardLock lock(controllerMutex_);
    receivers.reserve(receiversMap_.size());
    for (auto &receiverPair : receiversMap_) {
      receivers.push_back(std::move(receiverPair.second));
    }
    receiversMap_.clear();
    numReceivers_ = 0;
  }
  const int64_t numReceivers = static_cast<int64_t>(receivers.size());
  WVLOG(1) << "Number of receivers released " << numReceivers;
  return numReceivers;
}

/// Unregisters every receiver for which isStale() returns true.
/// @return identifiers of the receivers that were removed
vector<string> WdtNamespaceController::releaseStaleReceivers() {
  vector<ReceiverPtr> receivers;
  vector<string> erasedIds;
  {
    GuardLock lock(controllerMutex_);
    for (auto it = receiversMap_.begin(); it != receiversMap_.end();) {
      if (!it->second->isStale()) {
        ++it;
        continue;
      }
      erasedIds.push_back(it->first);
      receivers.push_back(std::move(it->second));
      // erase() hands back the iterator following the removed element
      it = receiversMap_.erase(it);
      --numReceivers_;
    }
  }
  WLOG(INFO) << "Cleared " << receivers.size() << " stale receivers";
  return erasedIds;
}

/// @return the sender registered under identifier, or nullptr (after logging
///         an error) when there is none
SenderPtr WdtNamespaceController::getSender(const string &identifier) const {
  GuardLock lock(controllerMutex_);
  auto it = sendersMap_.find(identifier);
  if (it == sendersMap_.end()) {
    WLOG(ERROR) << "Couldn't find sender transfer-id " << identifier << " for "
                << controllerName_;
    return nullptr;
  }
  return it->second;
}

/// @return the receiver registered under identifier, or nullptr (after
///         logging an error) when there is none
ReceiverPtr WdtNamespaceController::getReceiver(
    const string &identifier) const {
  GuardLock lock(controllerMutex_);
  auto it = receiversMap_.find(identifier);
  if (it == receiversMap_.end()) {
    WLOG(ERROR) << "Couldn't find receiver transfer-id " << identifier
                << " for " << controllerName_;
    return nullptr;
  }
  return it->second;
}

/// @return a snapshot of all senders currently registered in this namespace
vector<SenderPtr> WdtNamespaceController::getSenders() const {
  vector<SenderPtr> senders;
  GuardLock lock(controllerMutex_);
  for (const auto &senderPair : sendersMap_) {
    senders.push_back(senderPair.second);
  }
  return senders;
}

/// @return a snapshot of all receivers currently registered in this namespace
vector<ReceiverPtr> WdtNamespaceController::getReceivers() const {
  vector<ReceiverPtr> receivers;
  GuardLock lock(controllerMutex_);
  for (const auto &receiverPair : receiversMap_) {
    receivers.push_back(receiverPair.second);
  }
  return receivers;
}

/// @return the identifiers of all senders currently registered in this
///         namespace
vector<string> WdtNamespaceController::getSendersIds() const {
  vector<string> senderIds;
  GuardLock lock(controllerMutex_);
  for (const auto &senderPair : sendersMap_) {
    senderIds.push_back(senderPair.first);
  }
  return senderIds;
}

WdtNamespaceController::~WdtNamespaceController() {
  // release is done by parent shutdown
}

// ---------------------------------------------------------------------------
// WdtResourceController
// ---------------------------------------------------------------------------

/// Creates a resource controller using the given options and throttler. The
/// global limits come from options.global_sender_limit /
/// options.global_receiver_limit.
WdtResourceController::WdtResourceController(
    const WdtOptions &options, std::shared_ptr<Throttler> throttler)
    : WdtControllerBase("_root controller_"), options_(options) {
  updateMaxSendersLimit(options.global_sender_limit);
  updateMaxReceiversLimit(options.global_receiver_limit);
  throttler_ = throttler;
}

/// Same as above with a throttler freshly built from the options' throttler
/// settings.
WdtResourceController::WdtResourceController(const WdtOptions &options)
    : WdtResourceController(
          options,
          std::make_shared<Throttler>(options.getThrottlerOptions())) {
}

/// Same as above using WdtOptions::get().
WdtResourceController::WdtResourceController()
    : WdtResourceController(WdtOptions::get()) {
}

/// @return pointer to a function-local static instance that is constructed
///         from WdtOptions::get() on first use
WdtResourceController *WdtResourceController::get() {
  static WdtResourceController wdtController(WdtOptions::get());
  return &wdtController;
}

/// Releases every sender and receiver of every namespace, drops all
/// namespace controllers and checks that the global counters are back to 0.
void WdtResourceController::shutdown() {
  // Take the lock before logging: the counters are guarded by
  // controllerMutex_ everywhere else and must not be read unguarded.
  GuardLock lock(controllerMutex_);
  WLOG(INFO) << "Shutting down the controller (" << numSenders_ << " senders "
             << numReceivers_ << " receivers)";
  for (auto &namespaceController : namespaceMap_) {
    NamespaceControllerPtr controller = namespaceController.second;
    numSenders_ -= controller->releaseAllSenders();
    numReceivers_ -= controller->releaseAllReceivers();
    WVLOG(1) << "Cleared out controller for " << namespaceController.first;
  }
  namespaceMap_.clear();
  WDT_CHECK_EQ(numReceivers_, 0);
  WDT_CHECK_EQ(numSenders_, 0);
  WVLOG(1) << "Shutdown the wdt resource controller";
}

WdtResourceController::~WdtResourceController() {
  shutdown();
}

/// Reports the current number of namespaces, senders and receivers.
/// @return always OK
ErrorCode WdtResourceController::getCounts(int32_t &numNamespaces,
                                           int32_t &numSenders,
                                           int32_t &numReceivers) {
  GuardLock lock(controllerMutex_);
  numSenders = numSenders_;
  numReceivers = numReceivers_;
  numNamespaces = namespaceMap_.size();
  return OK;
}

/// @return true when neither the global sender limit nor the limit of the
///         given namespace (if that namespace is registered) is reached
bool WdtResourceController::hasSenderQuota(
    const std::string &wdtNamespace) const {
  const auto &controller = getNamespaceController(wdtNamespace);
  return hasSenderQuotaInternal(controller);
}

/// Checks the global sender limit first and then, when controller is not
/// null, the limit of that namespace controller.
bool WdtResourceController::hasSenderQuotaInternal(
    const std::shared_ptr<WdtNamespaceController> &controller) const {
  GuardLock lock(controllerMutex_);
  if ((numSenders_ >= maxNumSenders_) && (maxNumSenders_ > 0)) {
    WLOG(WARNING) << "Exceeded quota on max senders. "
                  << "Max num senders " << maxNumSenders_ << " and we have "
                  << numSenders_ << " existing senders";
    return false;
  }
  if (controller && !controller->hasSenderQuota()) {
    return false;
  }
  return true;
}

/// Creates a sender in the given namespace.
/// When the namespace is unknown it is created on the fly, unless strict
/// registration is enabled, in which case NOT_FOUND is returned.
/// @param wdtNamespace         namespace the sender belongs to
/// @param identifier           key of the sender inside the namespace
/// @param wdtOperationRequest  request the sender is constructed from
/// @param sender               [out] see WdtNamespaceController::createSender
/// @return OK, NOT_FOUND, QUOTA_EXCEEDED or ALREADY_EXISTS
ErrorCode WdtResourceController::createSender(
    const std::string &wdtNamespace, const std::string &identifier,
    const WdtTransferRequest &wdtOperationRequest, SenderPtr &sender) {
  NamespaceControllerPtr controller = nullptr;
  sender = nullptr;
  {
    // NOTE(review): getNamespaceController() and hasSenderQuotaInternal()
    // acquire controllerMutex_ again while it is held here, so GuardLock
    // presumably wraps a re-entrant mutex; confirm in the header.
    GuardLock lock(controllerMutex_);
    controller = getNamespaceController(wdtNamespace);
    if (!controller) {
      if (strictRegistration_) {
        WLOG(WARNING) << "Couldn't find controller for " << wdtNamespace;
        return NOT_FOUND;
      } else {
        WLOG(INFO) << "First time "
                   << (wdtNamespace.empty() ? "(default)" : wdtNamespace)
                   << " is seen, creating.";
        controller = createNamespaceController(wdtNamespace);
      }
    }
    if (!hasSenderQuotaInternal(controller)) {
      WLOG(ERROR) << "No quota for more sender.";
      return QUOTA_EXCEEDED;
    }
    // Reserve the global slot now; rolled back below if creation fails.
    ++numSenders_;
  }
  // TODO: not thread safe reading from options_
  throttler_->setThrottlerRates(options_.getThrottlerOptions());
  ErrorCode code =
      controller->createSender(wdtOperationRequest, identifier, sender);
  if (code != OK) {
    GuardLock lock(controllerMutex_);
    --numSenders_;
    WLOG(ERROR) << "Failed in creating sender for " << wdtNamespace << " "
                << errorCodeToStr(code);
  } else {
    WLOG(INFO) << "Successfully added a sender for " << wdtNamespace
               << " identifier " << identifier;
  }
  return code;
}

/// @return true when neither the global receiver limit nor the limit of the
///         given namespace (if that namespace is registered) is reached
bool WdtResourceController::hasReceiverQuota(
    const std::string &wdtNamespace) const {
  const auto &controller = getNamespaceController(wdtNamespace);
  return hasReceiverQuotaInternal(controller);
}

/// Checks the global receiver limit first and then, when controller is not
/// null, the limit of that namespace controller.
bool WdtResourceController::hasReceiverQuotaInternal(
    const std::shared_ptr<WdtNamespaceController> &controller) const {
  GuardLock lock(controllerMutex_);
  if ((numReceivers_ >= maxNumReceivers_) && (maxNumReceivers_ > 0)) {
    WLOG(WARNING) << "Exceeded quota on max receivers. "
                  << "Max num receivers " << maxNumReceivers_
                  << " and we have " << numReceivers_ << " existing receivers";
    return false;
  }
  if (controller && !controller->hasReceiverQuota()) {
    return false;
  }
  return true;
}

/// Creates a receiver in the given namespace.
/// When the namespace is unknown it is created on the fly, unless strict
/// registration is enabled, in which case NOT_FOUND is returned.
/// @param wdtNamespace         namespace the receiver belongs to
/// @param identifier           key of the receiver inside the namespace
/// @param wdtOperationRequest  request the receiver is constructed from
/// @param receiver             [out] see
///                             WdtNamespaceController::createReceiver
/// @return OK, NOT_FOUND, QUOTA_EXCEEDED or ALREADY_EXISTS
ErrorCode WdtResourceController::createReceiver(
    const std::string &wdtNamespace, const string &identifier,
    const WdtTransferRequest &wdtOperationRequest, ReceiverPtr &receiver) {
  NamespaceControllerPtr controller = nullptr;
  receiver = nullptr;
  {
    // NOTE(review): getNamespaceController() and hasReceiverQuotaInternal()
    // acquire controllerMutex_ again while it is held here, so GuardLock
    // presumably wraps a re-entrant mutex; confirm in the header.
    GuardLock lock(controllerMutex_);
    controller = getNamespaceController(wdtNamespace);
    if (!controller) {
      if (strictRegistration_) {
        WLOG(WARNING) << "Couldn't find controller for " << wdtNamespace;
        return NOT_FOUND;
      } else {
        // Same wording as createSender(): an empty namespace is shown as
        // "(default)" instead of an empty string.
        WLOG(INFO) << "First time "
                   << (wdtNamespace.empty() ? "(default)" : wdtNamespace)
                   << " is seen, creating.";
        controller = createNamespaceController(wdtNamespace);
      }
    }
    if (!hasReceiverQuotaInternal(controller)) {
      WLOG(ERROR) << "No quota for more receiver.";
      return QUOTA_EXCEEDED;
    }
    // Reserve the global slot now; rolled back below if creation fails.
    ++numReceivers_;
  }
  // TODO: not thread safe reading from options_
  throttler_->setThrottlerRates(options_.getThrottlerOptions());
  ErrorCode code =
      controller->createReceiver(wdtOperationRequest, identifier, receiver);
  if (code != OK) {
    GuardLock lock(controllerMutex_);
    --numReceivers_;
    WLOG(ERROR) << "Failed in creating receiver for " << wdtNamespace << " "
                << errorCodeToStr(code);
  } else {
    WLOG(INFO) << "Successfully added a receiver for " << wdtNamespace
               << " identifier " << identifier;
  }
  return code;
}

/// Releases one sender of the given namespace and updates the global count.
/// @return OK on success; ERROR when the namespace or the sender is unknown
ErrorCode WdtResourceController::releaseSender(const std::string &wdtNamespace,
                                               const std::string &identifier) {
  NamespaceControllerPtr controller = getNamespaceController(wdtNamespace);
  if (!controller) {
    WLOG(WARNING) << "Couldn't find controller for " << wdtNamespace;
    return ERROR;
  }
  if (controller->releaseSender(identifier) == OK) {
    GuardLock lock(controllerMutex_);
    --numSenders_;
    return OK;
  }
  WLOG(ERROR) << "Couldn't release sender " << identifier << " for "
              << wdtNamespace;
  return ERROR;
}

/// Releases all senders of the given namespace and updates the global count.
/// @return OK on success; ERROR when the namespace is unknown
ErrorCode WdtResourceController::releaseAllSenders(
    const std::string &wdtNamespace) {
  NamespaceControllerPtr controller = getNamespaceController(wdtNamespace);
  if (!controller) {
    WLOG(WARNING) << "Couldn't find controller for " << wdtNamespace;
    return ERROR;
  }
  const int64_t numSenders = controller->releaseAllSenders();
  if (numSenders > 0) {
    GuardLock lock(controllerMutex_);
    numSenders_ -= numSenders;
  }
  return OK;
}

/// Releases one receiver of the given namespace and updates the global
/// count.
/// @return OK on success; ERROR when the namespace or the receiver is unknown
ErrorCode WdtResourceController::releaseReceiver(
    const std::string &wdtNamespace, const std::string &identifier) {
  NamespaceControllerPtr controller = getNamespaceController(wdtNamespace);
  if (!controller) {
    WLOG(WARNING) << "Couldn't find controller for " << wdtNamespace;
    return ERROR;
  }
  if (controller->releaseReceiver(identifier) == OK) {
    GuardLock lock(controllerMutex_);
    --numReceivers_;
    return OK;
  }
  WLOG(ERROR) << "Couldn't release receiver " << identifier << " for "
              << wdtNamespace;
  return ERROR;
}

/// Releases all receivers of the given namespace and updates the global
/// count.
/// @return OK on success; ERROR when the namespace is unknown
ErrorCode WdtResourceController::releaseAllReceivers(
    const std::string &wdtNamespace) {
  NamespaceControllerPtr controller = getNamespaceController(wdtNamespace);
  if (!controller) {
    WLOG(WARNING) << "Couldn't find controller for " << wdtNamespace;
    return ERROR;
  }
  const int64_t numReceivers = controller->releaseAllReceivers();
  if (numReceivers > 0) {
    GuardLock lock(controllerMutex_);
    numReceivers_ -= numReceivers;
  }
  return OK;
}

/// @return the sender registered under identifier in the given namespace, or
///         nullptr when the namespace or the sender is unknown
SenderPtr WdtResourceController::getSender(const string &wdtNamespace,
                                           const string &identifier) const {
  NamespaceControllerPtr controller = getNamespaceController(wdtNamespace);
  if (!controller) {
    WLOG(ERROR) << "Couldn't find the controller for " << wdtNamespace;
    return nullptr;
  }
  return controller->getSender(identifier);
}

/// @return all senders of the given namespace; empty when the namespace is
///         unknown
vector<SenderPtr> WdtResourceController::getAllSenders(
    const string &wdtNamespace) const {
  NamespaceControllerPtr controller = getNamespaceController(wdtNamespace);
  if (!controller) {
    WLOG(ERROR) << "Couldn't find the controller for " << wdtNamespace;
    return vector<SenderPtr>();
  }
  return controller->getSenders();
}

/// Releases the stale senders of the given namespace and updates the global
/// count.
/// @param erasedIds  [out] identifiers of the senders that were removed;
///                   left untouched when the namespace is unknown
/// @return OK, or NOT_FOUND when the namespace is unknown
ErrorCode WdtResourceController::releaseStaleSenders(
    const string &wdtNamespace, vector<string> &erasedIds) {
  NamespaceControllerPtr controller = getNamespaceController(wdtNamespace);
  if (!controller) {
    WLOG(ERROR) << "Couldn't find the controller for " << wdtNamespace;
    return NOT_FOUND;
  }
  erasedIds = controller->releaseStaleSenders();
  if (!erasedIds.empty()) {
    GuardLock lock(controllerMutex_);
    numSenders_ -= static_cast<int64_t>(erasedIds.size());
  }
  return OK;
}

/// @return the receiver registered under identifier in the given namespace,
///         or nullptr when the namespace or the receiver is unknown
ReceiverPtr WdtResourceController::getReceiver(const string &wdtNamespace,
                                               const string &identifier) const {
  NamespaceControllerPtr controller = getNamespaceController(wdtNamespace);
  if (!controller) {
    WLOG(ERROR) << "Couldn't find the controller for " << wdtNamespace;
    return nullptr;
  }
  return controller->getReceiver(identifier);
}

/// @return all receivers of the given namespace; empty when the namespace is
///         unknown
vector<ReceiverPtr> WdtResourceController::getAllReceivers(
    const string &wdtNamespace) const {
  NamespaceControllerPtr controller = getNamespaceController(wdtNamespace);
  if (!controller) {
    WLOG(ERROR) << "Couldn't find the controller for " << wdtNamespace;
    return vector<ReceiverPtr>();
  }
  return controller->getReceivers();
}

/// @return identifiers of all senders of the given namespace; empty when the
///         namespace is unknown
std::vector<std::string> WdtResourceController::getAllSendersIds(
    const string &wdtNamespace) const {
  vector<string> senderIds;
  NamespaceControllerPtr controller = getNamespaceController(wdtNamespace);
  if (!controller) {
    WLOG(ERROR) << "Couldn't find the controller for " << wdtNamespace;
    return senderIds;
  }
  return controller->getSendersIds();
}

/// Releases the stale receivers of the given namespace and updates the
/// global count.
/// @param erasedIds  [out] identifiers of the receivers that were removed;
///                   left untouched when the namespace is unknown
/// @return OK, or NOT_FOUND when the namespace is unknown
ErrorCode WdtResourceController::releaseStaleReceivers(
    const string &wdtNamespace, vector<string> &erasedIds) {
  NamespaceControllerPtr controller = getNamespaceController(wdtNamespace);
  if (!controller) {
    WLOG(ERROR) << "Couldn't find the controller for " << wdtNamespace;
    return NOT_FOUND;
  }
  erasedIds = controller->releaseStaleReceivers();
  if (!erasedIds.empty()) {
    GuardLock lock(controllerMutex_);
    numReceivers_ -= static_cast<int64_t>(erasedIds.size());
  }
  return OK;
}

/// Creates a namespace controller and stores it in namespaceMap_, replacing
/// any entry already stored under the same name.
/// Does not lock by itself; the callers in this file hold controllerMutex_.
WdtResourceController::NamespaceControllerPtr
WdtResourceController::createNamespaceController(
    const std::string &wdtNamespace) {
  auto namespaceController =
      make_shared<WdtNamespaceController>(wdtNamespace, this);
  namespaceMap_[wdtNamespace] = namespaceController;
  return namespaceController;
}

/// Makes sure a controller exists for the given namespace.
/// @return always OK
ErrorCode WdtResourceController::registerWdtNamespace(
    const std::string &wdtNamespace) {
  GuardLock lock(controllerMutex_);
  if (getNamespaceController(wdtNamespace)) {
    WLOG(INFO) << "Found existing controller for " << wdtNamespace;
    return OK;
  }
  createNamespaceController(wdtNamespace);
  return OK;
}

/// Removes the namespace, releases all its senders and receivers, and then
/// blocks until this function holds the only reference to the namespace
/// controller.
/// @return OK, or ERROR when the namespace is unknown
ErrorCode WdtResourceController::deRegisterWdtNamespace(
    const std::string &wdtNamespace) {
  NamespaceControllerPtr controller;
  {
    GuardLock lock(controllerMutex_);
    auto it = namespaceMap_.find(wdtNamespace);
    if (it != namespaceMap_.end()) {
      controller = std::move(it->second);
    } else {
      WLOG(ERROR) << "Couldn't find the namespace " << wdtNamespace;
      return ERROR;
    }
    namespaceMap_.erase(it);
  }
  const int64_t numSenders = controller->releaseAllSenders();
  const int64_t numReceivers = controller->releaseAllReceivers();
  {
    GuardLock lock(controllerMutex_);
    numSenders_ -= numSenders;
    numReceivers_ -= numReceivers;
  }
  // Poll until every other holder of the controller has dropped its pointer.
  while (controller.use_count() > 1) {
    /* sleep override */
    usleep(kDelTimeToSleepMillis * 1000);
    WLOG(INFO) << "Trying to delete the namespace " << wdtNamespace;
  }
  WLOG(INFO) << "Deleted the namespace " << wdtNamespace;
  return OK;
}

/// Updates the receiver limit of one namespace; does nothing when the
/// namespace is unknown.
void WdtResourceController::updateMaxReceiversLimit(
    const std::string &wdtNamespace, int64_t maxNumReceivers) {
  auto controller = getNamespaceController(wdtNamespace);
  if (controller) {
    controller->updateMaxReceiversLimit(maxNumReceivers);
  }
}

/// Updates the sender limit of one namespace; does nothing when the
/// namespace is unknown.
void WdtResourceController::updateMaxSendersLimit(
    const std::string &wdtNamespace, int64_t maxNumSenders) {
  auto controller = getNamespaceController(wdtNamespace);
  if (controller) {
    controller->updateMaxSendersLimit(maxNumSenders);
  }
}

/// @return the throttler handed to every sender and receiver created here
std::shared_ptr<Throttler> WdtResourceController::getWdtThrottler() const {
  return throttler_;
}

/// @return the options this controller was constructed with
const WdtOptions &WdtResourceController::getOptions() const {
  return options_;
}

// TODO: consider putting strict/not strict handling logic here...
/// @return the controller registered for wdtNamespace, or nullptr if none
shared_ptr<WdtNamespaceController>
WdtResourceController::getNamespaceController(
    const string &wdtNamespace) const {
  GuardLock lock(controllerMutex_);
  auto it = namespaceMap_.find(wdtNamespace);
  if (it != namespaceMap_.end()) {
    return it->second;
  }
  return nullptr;
}

/// When strict is true, createSender()/createReceiver() return NOT_FOUND for
/// a namespace that has not been registered instead of creating it.
void WdtResourceController::requireRegistration(bool strict) {
  strictRegistration_ = strict;
}

}  // namespace wdt
}  // namespace facebook