Wdt.h (62 lines of code) (raw):

/** * Copyright (c) 2014-present, Facebook, Inc. * All rights reserved. * * This source code is licensed under the BSD-style license found in the * LICENSE file in the root directory of this source tree. */ #pragma once #include <wdt/Receiver.h> #include <wdt/Sender.h> // For Options #include <wdt/WdtOptions.h> // For ErrorCode #include <wdt/ErrorCodes.h> // For IAbortChecker and WdtTransferRequest - TODO: split out ? #include <wdt/WdtBase.h> #include <wdt/WdtResourceController.h> #include <wdt/util/EncryptionUtils.h> #include <ostream> namespace facebook { namespace wdt { // Note: we use Wdt in the method names even if redundant with the class name // so we can search callers easily /** * This class is the main API and entry point for using WDT (Warp speed Data * Transfers). * * Example of use: * // During the single threaded part of your service's initialization * Wdt &wdt = Wdt::initializeWdt("app-name"); * // Optionally: change the WdtOptions as you need, for instance: * wdt.getWdtOptions().overwrite = true; * // Will use the (possibly changed above) settings, to configure wdt, * // for instance throttler options * // Sender for already setup receiver: (abortChecker is optional) * wdtSend(transferRequest, myAbortChecker); */ class Wdt { public: /** * Initialize the Wdt library and parse options from gflags. * Also initialize crypto library if needed/not already initialized. * @param: * applicationName is used at fb for scuba reporting to differentiate apps */ static Wdt &initializeWdt(const std::string &appName); /** * Mutable reference to WdtOptions */ WdtOptions &getWdtOptions(); /** * Return Wdt lib handle which has been previously initialized by calling * @see initializeWdt() * This is only needed if the caller code doesn't want to pass the Wdt * instance around. */ static Wdt &getWdt(const std::string &appName); /** * TLS is unsupported for now */ virtual bool isTlsEnabled() { return false; } /// High level APIs: /** * Send data for the shard identified by shardId to an already running/setup * receiver whose connection url was used to make a WdtTransferRequest. * Optionally passes an abort checker. */ virtual ErrorCode wdtSend( const WdtTransferRequest &wdtRequest, std::shared_ptr<IAbortChecker> abortChecker = nullptr, bool terminateExistingOne = false); virtual ErrorCode createWdtSender(const WdtTransferRequest &wdtRequest, std::shared_ptr<IAbortChecker> abortChecker, bool terminateExistingOne, SenderPtr &senderPtr); virtual ErrorCode releaseWdtSender(const WdtTransferRequest &wdtRequest); /** * Receive data. It creates a receiver on specified namespace/identifier and * initialize it. */ virtual ErrorCode wdtReceiveStart( const std::string &wdtNamespace, WdtTransferRequest &wdtRequest, const std::string &identifier = "default", std::shared_ptr<IAbortChecker> abortChecker = nullptr); /** * Finish receiving data. It finishes the receiver on specified * namespace/identifier. */ virtual ErrorCode wdtReceiveFinish(const std::string &wdtNamespace, const std::string &identifier = "default"); virtual ErrorCode printWdtOptions(std::ostream &out); WdtResourceController *getWdtResourceController() { return resourceController_.get(); } /// destroys WDT object for an app static void releaseWdt(const std::string &appName); /// @return sender identifier for a transfer request static std::string getSenderIdentifier(const WdtTransferRequest &req); virtual void wdtSetReceiverSocketCreator(Receiver &/* unused */) {} /// Virtual Destructor (for class hierarchy) virtual ~Wdt() { } protected: /// Set to true when each instance is initialized bool initDone_{false}; /// App name which is used in scuba reporting std::string appName_; WdtOptions options_; /// responsible for initializing openssl WdtCryptoIntializer cryptoInitializer_; // TODO: share resource controller across apps /// wdt resource controller std::unique_ptr<WdtResourceController> resourceController_; // Internal initialization so sub classes can share the code virtual ErrorCode initializeWdtInternal(const std::string &appName); // Optionally set socket creator and progress reporter (used for fb) virtual ErrorCode wdtSetAbortSocketCreatorAndReporter( WdtBase *target, const WdtTransferRequest &req, std::shared_ptr<IAbortChecker> abortChecker); // Internal wdt object creator/holder static Wdt &getWdtInternal(const std::string &appName, std::function<Wdt *()> factory); /// Private constructor explicit Wdt(); explicit Wdt(std::shared_ptr<Throttler> throttler); /// Not copyable Wdt(const Wdt &) = delete; Wdt &operator=(const Wdt &) = delete; }; } } // namespaces