WdtResourceController.h (123 lines of code) (raw):
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <wdt/ErrorCodes.h>
#include <wdt/Receiver.h>
#include <wdt/Sender.h>
#include <unordered_map>
#include <vector>
namespace facebook {
namespace wdt {
/// Shared-ownership handle to a Receiver
using ReceiverPtr = std::shared_ptr<Receiver>;
/// Shared-ownership handle to a Sender
using SenderPtr = std::shared_ptr<Sender>;
/**
 * Common base of the global wdt resource controller and of the
 * per-namespace controllers. It carries the current sender/receiver
 * counts, their configured maximums, a recursive mutex and the
 * controller's name.
 */
class WdtControllerBase {
 public:
  /// Builds a controller identified by controllerName
  explicit WdtControllerBase(const std::string &controllerName);

  /// Virtual so that derived controllers can override it
  virtual ~WdtControllerBase() = default;

  /// Changes the maximum number of receivers (see maxNumReceivers_)
  virtual void updateMaxReceiversLimit(int64_t maxNumReceivers);

  /// Changes the maximum number of senders (see maxNumSenders_)
  virtual void updateMaxSendersLimit(int64_t maxNumSenders);

 protected:
  /// Lock type matching controllerMutex_
  using GuardLock = std::unique_lock<std::recursive_mutex>;

  /// Count of receivers that are currently active
  int64_t numReceivers_{0};

  /// Count of senders that are currently active
  int64_t numSenders_{0};

  /// Upper bound on the number of senders this controller allows
  int64_t maxNumSenders_{0};

  /// Upper bound on the number of receivers this controller allows
  int64_t maxNumReceivers_{0};

  /**
   * Recursive mutex meant to guard the members of this class.
   * Declared mutable so that const member functions can lock it too.
   */
  mutable std::recursive_mutex controllerMutex_;

  /// Name given to this controller at construction
  std::string controllerName_;
};
/// Forward declaration: the namespace controller refers to its parent
/// before the parent class is defined further down.
class WdtResourceController;

/**
 * Controller for a single wdt namespace. Lets the user partition
 * senders/receivers between namespaces (e.g. one per db shard); every
 * sender and receiver is stored under a caller-supplied identifier.
 */
class WdtNamespaceController : public WdtControllerBase {
 public:
  /**
   * Builds the controller for the namespace named wdtNamespace.
   * parent is the WdtResourceController this controller is attached to.
   */
  WdtNamespaceController(const std::string &wdtNamespace,
                         const WdtResourceController *const parent);

  /// Destructor, clears the senders and receivers
  ~WdtNamespaceController() override;

  // ---- Senders ----

  /// Tells whether a sender may currently be created
  bool hasSenderQuota() const;

  /// Adds a sender to this namespace under identifier; sender is an
  /// output parameter
  ErrorCode createSender(const WdtTransferRequest &request,
                         const std::string &identifier, SenderPtr &sender);

  /**
   * Looks up a sender previously made with createSender, using the
   * identifier that was passed to that call
   */
  SenderPtr getSender(const std::string &identifier) const;

  /// Returns every sender of this namespace
  std::vector<SenderPtr> getSenders() const;

  /// Returns the identifiers of every sender of this namespace
  std::vector<std::string> getSendersIds() const;

  /// Removes the sender stored under identifier
  ErrorCode releaseSender(const std::string &identifier);

  /// Releases all senders in this namespace
  /// (the returned value is presumably the number released - confirm in .cpp)
  int64_t releaseAllSenders();

  /// Drops the senders that are no longer active
  /// (presumably returns the identifiers that were removed - confirm in .cpp)
  std::vector<std::string> releaseStaleSenders();

  // ---- Receivers ----

  /// Tells whether a receiver may currently be created
  bool hasReceiverQuota() const;

  /// Adds a receiver to this namespace under identifier; receiver is an
  /// output parameter
  ErrorCode createReceiver(const WdtTransferRequest &request,
                           const std::string &identifier,
                           ReceiverPtr &receiver);

  /**
   * Looks up a receiver previously made with createReceiver, using the
   * identifier that was passed to that call
   */
  ReceiverPtr getReceiver(const std::string &identifier) const;

  /// Returns every receiver of this namespace
  std::vector<ReceiverPtr> getReceivers() const;

  /// Removes the receiver stored under identifier
  ErrorCode releaseReceiver(const std::string &identifier);

  /// Releases all receivers in this namespace
  /// (the returned value is presumably the number released - confirm in .cpp)
  int64_t releaseAllReceivers();

  /// Drops the receivers that are no longer active
  /// (presumably returns the identifiers that were removed - confirm in .cpp)
  std::vector<std::string> releaseStaleReceivers();

 private:
  /// Receivers of this namespace, keyed by identifier
  std::unordered_map<std::string, ReceiverPtr> receiversMap_;

  /// Senders of this namespace, keyed by identifier
  std::unordered_map<std::string, SenderPtr> sendersMap_;

  /// Resource controller this namespace belongs to (raw pointer; both
  /// the pointer and the pointee are const)
  const WdtResourceController *const parent_;
};
/**
* A generic resource controller for wdt objects
* User can set the maximum limit for receiver/sender
* and organize them in different namespace
*/
class WdtResourceController : public WdtControllerBase {
public:
/// resource controller should take the option as reference so that it can be
/// changed later from the parent object
WdtResourceController(const WdtOptions &options,
std::shared_ptr<Throttler> throttler);
explicit WdtResourceController(const WdtOptions &options);
WdtResourceController();
/// Is free to create sender specified by namespace.
bool hasSenderQuota(const std::string &wdtNamespace) const;
/**
* Add a sender specified by namespace and a identifier.
* You can get this sender back by using the same identifier
*/
ErrorCode createSender(const std::string &wdtNamespace,
const std::string &identifier,
const WdtTransferRequest &request, SenderPtr &sender);
/// Is free to create receiver specified by namespace.
bool hasReceiverQuota(const std::string &wdtNamespace) const;
/// Add a receiver specified with namespace and identifier
ErrorCode createReceiver(const std::string &wdtNamespace,
const std::string &identifier,
const WdtTransferRequest &request,
ReceiverPtr &receiver);
/// Release a sender specified with namespace and identifier
ErrorCode releaseSender(const std::string &wdtNamespace,
const std::string &identifier);
/// Release a receiver specified with namespace and identifier
ErrorCode releaseReceiver(const std::string &wdtNamespace,
const std::string &identifier);
/// Register a wdt namespace (if strict mode)
ErrorCode registerWdtNamespace(const std::string &wdtNamespace);
/// De register a wdt namespace
ErrorCode deRegisterWdtNamespace(const std::string &wdtNamespace);
/// Use the base class methods for global limits
using WdtControllerBase::updateMaxReceiversLimit;
using WdtControllerBase::updateMaxSendersLimit;
/// Update max receivers limit of namespace
void updateMaxReceiversLimit(const std::string &wdtNamespace,
int64_t maxNumReceivers);
/// Update max senders limit of namespace
void updateMaxSendersLimit(const std::string &wdtNamespace,
int64_t maxNumSenders);
/// Release all senders in the specified namespace
ErrorCode releaseAllSenders(const std::string &wdtNamespace);
/// Releases all receivers in specified namespace
ErrorCode releaseAllReceivers(const std::string &wdtNamespace);
/// Get a particular sender from a wdt namespace
SenderPtr getSender(const std::string &wdtNamespace,
const std::string &identifier) const;
/// Get a particular receiver from a wdt namespace
ReceiverPtr getReceiver(const std::string &wdtNamespace,
const std::string &identifier) const;
/// Get all senders in a namespace
std::vector<SenderPtr> getAllSenders(const std::string &wdtNamespace) const;
/// Get all senders ids in a namespace
std::vector<std::string> getAllSendersIds(
const std::string &wdtNamespace) const;
/// Get all receivers in a namespace
std::vector<ReceiverPtr> getAllReceivers(
const std::string &wdtNamespace) const;
/// Clear the senders that are no longer active.
ErrorCode releaseStaleSenders(const std::string &wdtNamespace,
std::vector<std::string> &erasedIds);
/// Clear the receivers that are no longer active
ErrorCode releaseStaleReceivers(const std::string &wdtNamespace,
std::vector<std::string> &erasedIds);
/**
* Call with true to require registerWdtNameSpace() to be called
* before requesting sender/receiver for that namespace.
*/
void requireRegistration(bool isStrict);
/// Cleanly shuts down the controller
void shutdown();
/// @return Singleton instance of the controller
static WdtResourceController *get();
/// Destructor for the global resource controller
~WdtResourceController() override;
/// Default global namespace
static const char *const kGlobalNamespace;
/// Return current counts
ErrorCode getCounts(int32_t &numNamespaces, int32_t &numSenders,
int32_t &numReceivers);
/**
* getter for throttler.
* setThrottlerRates to this throttler may not take effect. Instead, update
* WdtOptions accordingly.
* Applications have to register transfers with the throttler, and at the end
* de-register it. For example-
* throttler->startTransfer();
* ...
* throttler->limit(numBytes);
* ...
* throttler->endTransfer();
*/
std::shared_ptr<Throttler> getWdtThrottler() const;
const WdtOptions &getOptions() const;
protected:
typedef std::shared_ptr<WdtNamespaceController> NamespaceControllerPtr;
/// Get the namespace controller
NamespaceControllerPtr getNamespaceController(
const std::string &wdtNamespace) const;
private:
NamespaceControllerPtr createNamespaceController(const std::string &name);
/// Map containing the resource controller per namespace
std::unordered_map<std::string, NamespaceControllerPtr> namespaceMap_;
/// Whether namespace need to be created explictly
bool strictRegistration_{false};
/// Throttler for all the namespaces
std::shared_ptr<Throttler> throttler_{nullptr};
const WdtOptions &options_;
/// Internal method for checking hasSenderQuota & hasReceiverQuota
bool hasSenderQuotaInternal(const std::shared_ptr<WdtNamespaceController>
&controller = nullptr) const;
bool hasReceiverQuotaInternal(const std::shared_ptr<WdtNamespaceController>
&controller = nullptr) const;
};
} // namespace wdt
} // namespace facebook