Receiver.h (72 lines of code) (raw):
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <wdt/ReceiverThread.h>
#include <wdt/WdtBase.h>
#include <wdt/util/FileCreator.h>
#include <wdt/util/IServerSocket.h>
#include <wdt/util/TransferLogManager.h>
#include <chrono>
#include <memory>
#include <string>
#include <thread>
namespace facebook {
namespace wdt {
/**
* Receiver is the receiving side of the transfer. Receiver listens on ports,
* accepts connections, receives the files and writes them to the destination
* directory. Receiver has two modes of operation: you can spawn a receiver
* for a single transfer, or alternatively use it in a long running mode
* where it accepts subsequent transfers and runs in an infinite loop.
*/
class Receiver : public WdtBase {
public:
/// Constructor using wdt transfer request (@see in WdtBase.h)
explicit Receiver(const WdtTransferRequest &transferRequest);
/**
* Constructor with start port, number of ports and directory to write to.
* If the start port is specified as zero, it auto configures the ports
*/
Receiver(int port, int numSockets, const std::string &destDir);
/// Setup before starting (@see WdtBase.h)
const WdtTransferRequest &init() override;
/**
* Joins on the threads spawned by start. This method
* is called by default when the wdt receiver is expected
* to run as forever running process. However this has to
* be explicitly called when the caller expects to conclude
* a transfer.
*/
std::unique_ptr<TransferReport> finish() override;
/**
* Call this method instead of transferAsync() when you don't
* want the wdt receiver to stop after one transfer.
*/
ErrorCode runForever();
/**
* Starts the threads, and returns. Caller should call finish() after
* calling this method to get the statistics of the transfer.
*/
ErrorCode transferAsync() override;
/// @param recoveryId unique-id used to verify transfer log
void setRecoveryId(const std::string &recoveryId);
/// Returns true if at least one thread has accepted connection
bool hasNewTransferStarted() const;
// Different accept modes for the Receiver
enum AcceptMode {
ACCEPT_WITH_RETRIES, // Receiver gives up after max_accept_retries
ACCEPT_FOREVER, // Receiver never gives up
STOP_ACCEPTING, // Receiver stops accepting
};
/// @param acceptMode acceptMode to use
void setAcceptMode(AcceptMode acceptMode);
/// Interface to make socket
class ISocketCreator {
public:
virtual std::unique_ptr<IServerSocket> makeServerSocket(
ThreadCtx &threadCtx, int port, int backlog,
const EncryptionParams &encryptionParams, int64_t ivChangeInterval,
Func &&tagVerificationSuccessCallback, bool tls) = 0;
virtual ~ISocketCreator() = default;
};
/**
* Sets socket creator
*
* @param socketCreator socket-creator to be used
*/
void setSocketCreator(ISocketCreator *socketCreator);
/**
* Destructor for the receiver. The destructor automatically cancels
* any incomplete transfers that are going on. 'Incomplete transfer' is a
* transfer where there is no receiver thread that has received
* confirmation from wdt sender that the transfer is 'DONE'. Destructor also
* internally calls finish() for every transfer if finish() wasn't called
*/
~Receiver() override;
protected:
friend class ReceiverThread;
/**
* Traverses root directory and returns discovered file information
*
* @param fileChunksInfo discovered file info
*/
void traverseDestinationDir(std::vector<FileChunksInfo> &fileChunksInfo);
/// Get the transferred file chunks info
const std::vector<FileChunksInfo> &getFileChunksInfo() const;
/// Get file creator, used by receiver threads
std::unique_ptr<FileCreator> &getFileCreator();
/// Socket creator used to optionally create different kind of server socket
ISocketCreator *socketCreator_{nullptr};
/// Get the ref to transfer log manager
TransferLogManager &getTransferLogManager();
/// Responsible for basic setup and starting threads
ErrorCode start();
/**
* Periodically calculates current transfer report and send it to progress
* reporter. This only works in the single transfer mode.
*/
void progressTracker();
/**
* Adds a checkpoint to the global checkpoint list
* @param checkpoint checkpoint to be added
*/
void addCheckpoint(Checkpoint checkpoint);
/**
* @param startIndex number of checkpoints already transferred by the
* calling thread
* @return list of new checkpoints
*/
std::vector<Checkpoint> getNewCheckpoints(int startIndex);
/// Does the steps needed before a new transfer is started
void startNewGlobalSession(const std::string &peerIp);
/// Has steps to do when the current transfer is ended
void endCurGlobalSession();
/// adds log header and also a directory invalidation entry if needed
void addTransferLogHeader(bool isBlockMode, bool isSenderResuming);
/// fix, compact (if enabled) and close transfer log
void fixAndCloseTransferLog(bool transferSuccess);
/**
* Get transfer report, meant to be called after threads have been finished
* This method is not thread safe
*/
std::unique_ptr<TransferReport> getTransferReport();
void logPerfStats() const override;
/// @return transfer config encoded as int
int64_t getTransferConfig() const;
AcceptMode getAcceptMode();
/// The thread that is responsible for calling running the progress tracker
std::thread progressTrackerThread_;
/// Flag based on which threads finish processing on receiving a done
bool isJoinable_{false};
/// Responsible for writing files on the disk
std::unique_ptr<FileCreator> fileCreator_{nullptr};
/**
* Unique-id used to verify transfer log. This value must be same for
* transfers across resumption
*/
std::string recoveryId_;
/**
* The instance of the receiver threads are stored in this vector.
* This will not be destroyed until this object is destroyed, hence
* it has to be made sure that these threads are joined at least before
* the destruction of this object.
*/
std::vector<std::unique_ptr<WdtThread>> receiverThreads_;
/// Transfer log manager
std::unique_ptr<TransferLogManager> transferLogManager_;
/// Global list of checkpoints
std::vector<Checkpoint> checkpoints_;
/// Start time of the session
std::atomic<std::chrono::time_point<Clock>> startTime_;
/// already transferred file chunks
std::vector<FileChunksInfo> fileChunksInfo_;
/// Marks when a new transfer has started
std::atomic<bool> hasNewTransferStarted_{false};
/// Backlog used by the sockets
int backlog_;
AcceptMode acceptMode_{ACCEPT_WITH_RETRIES};
};
}
} // namespace facebook::wdt