Sender.h (83 lines of code) (raw):

/** * Copyright (c) 2014-present, Facebook, Inc. * All rights reserved. * * This source code is licensed under the BSD-style license found in the * LICENSE file in the root directory of this source tree. */ #pragma once #include <chrono> #include <iostream> #include <memory> #include <wdt/WdtBase.h> #include <wdt/util/ClientSocket.h> namespace facebook { namespace wdt { class SenderThread; class TransferHistoryController; enum ProtoNegotiationStatus { V_MISMATCH_WAIT, // waiting for version mismatch to be processed V_MISMATCH_RESOLVED, // version mismatch processed and was successful V_MISMATCH_FAILED, // version mismatch processed and it failed }; /** * The sender for the transfer. One instance of sender should only be * responsible for one transfer. For a second transfer you should make * another instance of the sender. * The object will not be destroyed till the transfer finishes. This * class is not thread safe. */ class Sender : public WdtBase { public: /// Creates a counter part sender for the receiver according to the details explicit Sender(const WdtTransferRequest &transferRequest); /// Setup before start (@see WdtBase.h) const WdtTransferRequest &init() override; /** * If the transfer has not finished, then it is aborted. finish() is called to * wait for threads to end. */ ~Sender() override; /** * Joins on the threads spawned by start. This has to * be explicitly called when the caller expects to conclude * a transfer. This method can be called multiple times and is thread-safe. * * @return transfer report */ std::unique_ptr<TransferReport> finish() override; /** * API to initiate a transfer and return back to the context * from where it was called. Caller would have to call finish * to get the stats for the transfer */ ErrorCode transferAsync() override; /** * A blocking call which will initiate a transfer based on * the configuration and return back the stats for the transfer * * @return transfer report */ std::unique_ptr<TransferReport> transfer(); /// End time of the transfer Clock::time_point getEndTime(); /// Get the destination sender is sending to /// @return destination host-name const std::string &getDestination() const; /// @return minimal transfer report using transfer stats of the thread std::unique_ptr<TransferReport> getTransferReport(); /// Interface to make socket class ISocketCreator { public: virtual std::unique_ptr<IClientSocket> makeClientSocket( ThreadCtx &threadCtx, const std::string &dest, const int port, const EncryptionParams &encryptionParams, int64_t ivChangeInterval, bool tls) = 0; virtual ~ISocketCreator() = default; }; /** * Sets socket creator * * @param socketCreator socket-creator to be used */ void setSocketCreator(ISocketCreator *socketCreator); private: friend class SenderThread; friend class QueueAbortChecker; friend class SenderTests; /// Validate the transfer request ErrorCode validateTransferRequest() override; /// Get the sum of all the thread transfer stats TransferStats getGlobalTransferStats() const; /// Returns true if file chunks need to be read bool isSendFileChunks() const; /// Returns true if file chunks been received by a thread bool isFileChunksReceived(); /// Sender thread calls this method to set the file chunks info received /// from the receiver void setFileChunksInfo(std::vector<FileChunksInfo> &fileChunksInfoList); /// Abort checker passed to DirectoryQueue. If all the network threads finish, /// directory discovery thread is also aborted class QueueAbortChecker : public IAbortChecker { public: explicit QueueAbortChecker(Sender *sender) : sender_(sender) { } bool shouldAbort() const override { return (sender_->getTransferStatus() == FINISHED); } private: Sender *sender_; }; /// Abort checker shared with the directory queue QueueAbortChecker queueAbortChecker_; /** * Internal API that triggers the directory thread, sets up the sender * threads and starts the transfer. Returns after the sender threads * have been spawned */ ErrorCode start(); /** * @param transferredSourceStats Stats for the successfully transmitted * sources * @param failedSourceStats Stats for the failed sources */ void validateTransferStats( const std::vector<TransferStats> &transferredSourceStats, const std::vector<TransferStats> &failedSourceStats); /** * Responsible for doing a periodic check. * 1. Takes a lock on the thread stats to make a summary * 2. Sends the progress report with the summary to the progress reporter * which can be provided by the user */ void reportProgress(); void logPerfStats() const override; /// Get the files from fileInfoGenerator if it's configured in transferRequest /// Note: This call may block on DirectorySourceQueue::waitForPreviousTransfer /// /// @return list of files or folly::none indicating no more files folly::Optional<std::vector<WdtFileInfo>> getFilesFromFileInfoGenerator(); /// Pointer to DirectorySourceQueue which reads the srcDir and the files std::unique_ptr<DirectorySourceQueue> dirQueue_; /// Number of active threads, decremented every time a thread is finished int32_t numActiveThreads_{0}; /// The interval at which the progress reporter should check for progress int progressReportIntervalMillis_{0}; /// Socket creator used to optionally create different kinds of client socket ISocketCreator *socketCreator_{nullptr}; /// Whether download resumption is enabled or not bool downloadResumptionEnabled_{false}; /// Flags representing whether file chunks have been received or not bool fileChunksReceived_{false}; /// Thread that is running the discovery of files using the dirQueue_ std::thread dirThread_; /// Threads which are responsible for transfer of the sources std::vector<std::unique_ptr<WdtThread>> senderThreads_; /// Thread responsible for doing the progress checks. Uses reportProgress() std::thread progressReporterThread_; /// Returns the protocol negotiation status of the parent sender ProtoNegotiationStatus getNegotiationStatus(); /// Set the protocol negotiation status, called by sender thread void setProtoNegotiationStatus(ProtoNegotiationStatus status); /// Things to do before ending the current transfer void endCurTransfer(); /// Initializing the new transfer void startNewTransfer(); /// Returns vector of negotiated protocols set by sender threads std::vector<int> getNegotiatedProtocols() const; /// Protocol negotiation status, used to co-ordinate processing of version /// mismatch. Threads aborted due to version mismatch waits for all threads to /// abort and reach PROCESS_VERSION_MISMATCH state. Last thread processes /// version mismatch and changes this status variable. Other threads check /// this variable to decide when to proceed. ProtoNegotiationStatus protoNegotiationStatus_{V_MISMATCH_WAIT}; /// Time at which the transfer was started std::chrono::time_point<Clock> startTime_; /// Time at which the transfer finished std::chrono::time_point<Clock> endTime_; /// Transfer history controller for the sender threads std::unique_ptr<TransferHistoryController> transferHistoryController_; }; } } // namespace facebook::wdt