WdtBase.h (87 lines of code) (raw):

/** * Copyright (c) 2014-present, Facebook, Inc. * All rights reserved. * * This source code is licensed under the BSD-style license found in the * LICENSE file in the root directory of this source tree. */ #pragma once #include <folly/synchronization/RWSpinLock.h> #include <wdt/AbortChecker.h> #include <wdt/ErrorCodes.h> #include <wdt/Protocol.h> #include <wdt/Reporting.h> #include <wdt/Throttler.h> #include <wdt/WdtOptions.h> #include <wdt/WdtThread.h> #include <wdt/util/DirectorySourceQueue.h> #include <wdt/util/EncryptionUtils.h> #include <wdt/util/ThreadsController.h> #include <memory> #include <string> #include <unordered_map> #include <vector> namespace facebook { namespace wdt { /** * Shared code/functionality between Receiver and Sender * TODO: check if more of Receiver/Sender should move here */ class WdtBase { public: /// Constructor WdtBase(); /** * Does the setup before start, returns the transfer request * that corresponds to the information relating to the sender * The transfer request has error code set should there be an error */ virtual const WdtTransferRequest& init() = 0; /// Sets other options than global/singleton ones - call this before init() void setWdtOptions(const WdtOptions& src); /// Destructor virtual ~WdtBase(); /// Transfer can be marked to abort and threads will eventually /// get aborted after this method has been called based on /// whether they are doing read/write on the socket and the timeout for the /// socket. Push mode for abort. void abort(ErrorCode abortCode); /// clears abort flag void clearAbort(); /** * Returns a reference to the copy of WdtOptions held by this object. * Changes should only be made before init() is called, not after. */ WdtOptions& getWdtOptions() { return options_; } /** * sets an extra external call back to check for abort * can be for instance extending IAbortChecker with * bool checkAbort() {return atomicBool->load();} * see wdtCmdLine.cpp for an example. */ void setAbortChecker(const std::shared_ptr<IAbortChecker>& checker); /// threads can call this method to find out /// whether transfer has been marked from abort ErrorCode getCurAbortCode() const; /// Wdt objects can report progress. Setter for progress reporter /// defined in Reporting.h void setProgressReporter(std::unique_ptr<ProgressReporter>& progressReporter); /// Set throttler externally. Should be set before any transfer calls void setThrottler(std::shared_ptr<Throttler> throttler); /// Sets the transferId for this transfer void setTransferId(const std::string& transferId); /// Get the protocol version of the transfer int getProtocolVersion() const; /// Sets protocol version to use void setProtocolVersion(int protocolVersion); /// Get the transfer id of the object std::string getTransferId(); /// Get the transfer request WdtTransferRequest& getTransferRequest(); /// Finishes the wdt object and returns a report virtual std::unique_ptr<TransferReport> finish() = 0; /// Method to transfer the data. Doesn't block and /// returns with the status virtual ErrorCode transferAsync() = 0; /// Basic setup for throttler using options void configureThrottler(); /// Utility to generate a random transfer id static std::string generateTransferId(); /// Get the throttler std::shared_ptr<Throttler> getThrottler() const; /// @return Root directory const std::string& getDirectory() const; /// @param whether the object is stale. If all the transferring threads /// have finished, the object will marked as stale bool isStale(); /// @return Whether the transfer has started bool hasStarted(); /// abort checker class passed to socket functions class AbortChecker : public IAbortChecker { public: explicit AbortChecker(WdtBase* wdtBase) : wdtBase_(wdtBase) { } bool shouldAbort() const override { return wdtBase_->getCurAbortCode() != OK; } private: WdtBase* wdtBase_; }; protected: enum TransferStatus { NOT_STARTED, // threads not started ONGOING, // transfer is ongoing FINISHED, // last running thread finished THREADS_JOINED, // threads joined }; /// Validate the transfer request virtual ErrorCode validateTransferRequest(); /// @return current transfer status TransferStatus getTransferStatus(); /// corrects buffer size if necessary void checkAndUpdateBufferSize(); /// @param transferStatus current transfer status void setTransferStatus(TransferStatus transferStatus); /// Sets the protocol version for the transfer void negotiateProtocol(); /// Dumps performance statistics if enable_perf_stat_collection is true. virtual void logPerfStats() const = 0; /// Input/output transfer request WdtTransferRequest transferRequest_; /// Global throttler across all threads std::shared_ptr<Throttler> throttler_; /// Holds the instance of the progress reporter default or customized std::unique_ptr<ProgressReporter> progressReporter_; /// abort checker passed to socket functions AbortChecker abortCheckerCallback_; /// current transfer status TransferStatus transferStatus_{NOT_STARTED}; /// Mutex which is shared between the parent thread, transferring threads and /// progress reporter thread std::mutex mutex_; /// Mutex for the management of this instance, specifically to keep the /// instance sane for multi threaded public API calls std::mutex instanceManagementMutex_; /// This condition is notified when the transfer is finished std::condition_variable conditionFinished_; /// Controller for wdt threads shared between base and threads ThreadsController* threadsController_{nullptr}; /// Dump perf stats if notified ReportPerfSignalSubscriber reportPerfSignal_; /// Options/config used by this object WdtOptions options_; private: mutable folly::RWSpinLock abortCodeLock_; /// Internal and default abort code ErrorCode abortCode_{OK}; /// Additional external source of check for abort requested std::shared_ptr<IAbortChecker> abortChecker_{nullptr}; }; } } // namespace facebook::wdt