Receiver.cpp (477 lines of code) (raw):
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <wdt/Receiver.h>
#include <wdt/util/EncryptionUtils.h>
#include <wdt/util/ServerSocket.h>
#include <folly/lang/Bits.h>
#include <folly/Conv.h>
#include <fcntl.h>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <thread>
namespace facebook {
namespace wdt {
// Logs the given checkpoint (port, number of blocks, bytes received for the
// last block) and appends a copy of it to the receiver-wide checkpoint list.
void Receiver::addCheckpoint(Checkpoint checkpoint) {
  WLOG(INFO) << "Adding global checkpoint " << checkpoint.port << " "
             << checkpoint.numBlocks << " "
             << checkpoint.lastBlockReceivedBytes;
  checkpoints_.push_back(checkpoint);
}
std::vector<Checkpoint> Receiver::getNewCheckpoints(int startIndex) {
std::vector<Checkpoint> checkpoints;
const int64_t numCheckpoints = checkpoints_.size();
for (int64_t i = startIndex; i < numCheckpoints; i++) {
checkpoints.emplace_back(checkpoints_[i]);
}
return checkpoints;
}
// Constructs a receiver from a transfer request: logs the full WDT protocol
// version and stores a copy of the request in transferRequest_. No other
// setup is done here; see init() for validation and thread/socket setup.
Receiver::Receiver(const WdtTransferRequest &transferRequest) {
WLOG(INFO) << "WDT Receiver " << Protocol::getFullVersion();
transferRequest_ = transferRequest;
}
// Convenience constructor: builds a WdtTransferRequest from the given port,
// socket count and destination directory and delegates to the
// request-based constructor.
Receiver::Receiver(int port, int numSockets, const std::string &destDir)
: Receiver(WdtTransferRequest(port, numSockets, destDir)) {
}
// Stores the given socket creator pointer in socketCreator_. Only the raw
// pointer is kept.
// NOTE(review): nothing in this file deletes it, so ownership presumably
// stays with the caller - confirm against the header.
void Receiver::setSocketCreator(Receiver::ISocketCreator *socketCreator) {
socketCreator_ = socketCreator;
}
// Walks the destination directory synchronously and appends one
// FileChunksInfo entry per discovered file to fileChunksInfo. Each entry
// carries the file's sequence id, relative path and size, plus a single chunk
// Interval(0, size). The wdt log files (kWdtLogName / kWdtBuggyLogName) are
// skipped.
void Receiver::traverseDestinationDir(
    std::vector<FileChunksInfo> &fileChunksInfo) {
  DirectorySourceQueue dirQueue(options_, getDirectory(),
                                &abortCheckerCallback_);
  dirQueue.buildQueueSynchronously();
  for (auto &fileInfo : dirQueue.getDiscoveredFilesMetaData()) {
    const bool isWdtLogFile = (fileInfo->relPath == kWdtLogName ||
                               fileInfo->relPath == kWdtBuggyLogName);
    if (isWdtLogFile) {
      // wdt's own log files are not part of the transferred data
      WVLOG(1) << "Removing " << fileInfo->relPath
               << " from the list of existing files";
      continue;
    }
    FileChunksInfo chunkInfo(fileInfo->seqId, fileInfo->relPath,
                             fileInfo->size);
    chunkInfo.addChunk(Interval(0, fileInfo->size));
    fileChunksInfo.emplace_back(std::move(chunkInfo));
  }
}
// Marks the beginning of a new transfer session coming from peerIp.
//
// Registers the session with the throttler (if one is set), records the start
// time and, when download resumption is enabled, starts the transfer log
// manager thread and verifies the sender ip against the log. If that
// verification fails, the previously collected file chunks info is dropped.
// Finally flags that a new transfer has started.
void Receiver::startNewGlobalSession(const std::string &peerIp) {
  if (throttler_) {
    // Register this session with the throttler. The matching endTransfer()
    // call is made in endCurGlobalSession() when the session ends.
    throttler_->startTransfer();
  }
  startTime_.store(Clock::now());
  if (options_.enable_download_resumption) {
    transferLogManager_->startThread();
    if (!transferLogManager_->verifySenderIp(peerIp)) {
      fileChunksInfo_.clear();
    }
  }
  hasNewTransferStarted_.store(true);
  WLOG(INFO) << "Starting new transfer, peerIp " << peerIp << " , transfer id "
             << getTransferId();
}
// Returns the current value of the hasNewTransferStarted_ flag, which is set
// in startNewGlobalSession() and cleared in endCurGlobalSession().
bool Receiver::hasNewTransferStarted() const {
return hasNewTransferStarted_.load();
}
// Ends the current transfer session.
//
// The transfer status is always moved to FINISHED. If a session was actually
// started, the throttler (if any) is de-registered, the global checkpoints and
// the file creator's allocation map are cleared, and the "new transfer
// started" flag is reset.
void Receiver::endCurGlobalSession() {
  setTransferStatus(FINISHED);
  if (hasNewTransferStarted_.load()) {
    WLOG(INFO) << "Ending the transfer " << getTransferId();
    if (throttler_) {
      throttler_->endTransfer();
    }
    checkpoints_.clear();
    if (fileCreator_) {
      fileCreator_->clearAllocationMap();
    }
    // TODO might consider moving closing the transfer log here
    hasNewTransferStarted_.store(false);
  } else {
    WLOG(WARNING) << "WDT transfer did not start, no need to end session";
  }
}
// Prepares the receiver for a transfer and returns the (updated) transfer
// request that describes how a sender can connect.
//
// Steps, in order:
//  1. validate the transfer request (on failure the request is returned
//     as-is),
//  2. create the transfer log manager and the file creator, pick a transfer
//     id if none is set and negotiate the protocol version,
//  3. if download resumption is enabled, open and parse the transfer log
//     (optionally rebuilding the chunk list from the directory tree),
//  4. set up encryption parameters if encryption is requested and supported
//     by the negotiated protocol version,
//  5. create the threads controller and the receiver threads and initialize
//     each thread,
//  6. rewrite transferRequest_.ports with the ports reported by the threads
//     and fill in the local host name if it is missing.
//
// @return transferRequest_. Its errorCode is OK on success, FEWER_PORTS if
//         only some threads initialized, ERROR if none did or the host name
//         could not be determined, ENCRYPTION_ERROR if no key could be
//         generated, or the openLog() error code.
const WdtTransferRequest &Receiver::init() {
  if (validateTransferRequest() != OK) {
    WLOG(ERROR) << "Couldn't validate the transfer request "
                << transferRequest_.getLogSafeString();
    return transferRequest_;
  }
  transferLogManager_ =
      std::make_unique<TransferLogManager>(options_, getDirectory());
  checkAndUpdateBufferSize();
  backlog_ = options_.backlog;
  if (getTransferId().empty()) {
    setTransferId(WdtBase::generateTransferId());
  }
  negotiateProtocol();
  auto numThreads = transferRequest_.ports.size();
  // This creates the destination directory (which is needed for transferLogMgr)
  fileCreator_ = std::make_unique<FileCreator>(
      getDirectory(), numThreads, *transferLogManager_, options_.skip_writes);
  transferRequest_.downloadResumptionEnabled =
      options_.enable_download_resumption;
  // Make sure we can get the lock on the transfer log manager early
  // so if we can't we don't generate a valid but useless url and end up
  // starting a sender doomed to fail
  if (options_.enable_download_resumption) {
    WDT_CHECK(!options_.skip_writes)
        << "Can not skip transfers with download resumption turned on";
    if (options_.resume_using_dir_tree) {
      WDT_CHECK(!options_.shouldPreallocateFiles())
          << "Can not resume using directory tree if preallocation is enabled";
    }
    ErrorCode errCode = transferLogManager_->openLog();
    if (errCode != OK) {
      WLOG(ERROR) << "Failed to open transfer log " << errorCodeToStr(errCode);
      transferRequest_.errorCode = errCode;
      return transferRequest_;
    }
    ErrorCode code = transferLogManager_->parseAndMatch(
        recoveryId_, getTransferConfig(), fileChunksInfo_);
    if (code == OK && options_.resume_using_dir_tree) {
      // In directory tree mode the chunk list comes from the files on disk,
      // not from the log
      WDT_CHECK(fileChunksInfo_.empty());
      traverseDestinationDir(fileChunksInfo_);
    }
  }
  EncryptionType encryptionType = parseEncryptionType(options_.encryption_type);
  // is encryption enabled?
  bool encrypt = (encryptionType != ENC_NONE &&
                  getProtocolVersion() >= Protocol::ENCRYPTION_V1_VERSION);
  if (encrypt) {
    WLOG(INFO) << encryptionTypeToStr(encryptionType)
               << " encryption is enabled for this transfer ";
    if (!transferRequest_.encryptionData.isSet()) {
      WLOG(INFO) << "Receiver generating encryption key for type "
                 << encryptionTypeToStr(encryptionType);
      transferRequest_.encryptionData =
          EncryptionParams::generateEncryptionParams(encryptionType);
    }
    // Still unset means key generation failed
    if (!transferRequest_.encryptionData.isSet()) {
      WLOG(ERROR) << "Unable to generate encryption key for type "
                  << encryptionTypeToStr(encryptionType);
      transferRequest_.errorCode = ENCRYPTION_ERROR;
      return transferRequest_;
    }
  } else {
    if (encryptionType != ENC_NONE) {
      WLOG(WARNING) << "Encryption is enabled, but protocol version is "
                    << getProtocolVersion()
                    << ", minimum version required for encryption is "
                    << Protocol::ENCRYPTION_V1_VERSION;
    }
    transferRequest_.encryptionData.erase();
  }
  transferRequest_.ivChangeInterval = options_.iv_change_interval_mb * kMbToB;
  if (options_.max_accept_retries <= 0) {
    WLOG(INFO) << "Max accept retries " << options_.max_accept_retries
               << ", will accept forever";
    setAcceptMode(ACCEPT_FOREVER);
  }
  threadsController_ = new ThreadsController(numThreads);
  threadsController_->setNumFunnels(ReceiverThread::NUM_FUNNELS);
  threadsController_->setNumBarriers(ReceiverThread::NUM_BARRIERS);
  threadsController_->setNumConditions(ReceiverThread::NUM_CONDITIONS);
  // TODO: take transferRequest directly !
  receiverThreads_ = threadsController_->makeThreads<Receiver, ReceiverThread>(
      this, transferRequest_.ports.size(), transferRequest_.ports);
  size_t numSuccessfulInitThreads = 0;
  for (auto &receiverThread : receiverThreads_) {
    ErrorCode code = receiverThread->init();
    if (code == OK) {
      ++numSuccessfulInitThreads;
    }
  }
  WLOG(INFO) << "Registered " << numSuccessfulInitThreads
             << " successful sockets";
  ErrorCode code = OK;
  const size_t targetSize = transferRequest_.ports.size();
  // TODO: replace with getNumPorts/thread
  if (numSuccessfulInitThreads != targetSize) {
    code = FEWER_PORTS;
    if (numSuccessfulInitThreads == 0) {
      code = ERROR;
    }
  }
  // Replace the requested ports with the ports reported by the threads
  transferRequest_.ports.clear();
  for (const auto &receiverThread : receiverThreads_) {
    transferRequest_.ports.push_back(receiverThread->getPort());
  }
  if (transferRequest_.hostName.empty()) {
    char hostName[1024];
    int ret = gethostname(hostName, sizeof(hostName));
    if (ret == 0) {
      // POSIX leaves it unspecified whether a truncated name is
      // null-terminated, so terminate explicitly before reading the buffer
      // as a C string
      hostName[sizeof(hostName) - 1] = '\0';
      transferRequest_.hostName.assign(hostName);
    } else {
      WPLOG(ERROR) << "Couldn't find the local host name";
      code = ERROR;
    }
  }
  transferRequest_.errorCode = code;
  return transferRequest_;
}
// Returns a reference to the transfer log manager.
// NOTE(review): transferLogManager_ is only assigned in init() (after the
// request validates) and is dereferenced here without a null check, so this
// presumably must not be called before a successful init() - confirm.
TransferLogManager &Receiver::getTransferLogManager() {
return *transferLogManager_;
}
// Returns a reference to the owning pointer of the file creator. The pointer
// is populated in init(); callers get access to the unique_ptr itself, not a
// copy of the FileCreator.
std::unique_ptr<FileCreator> &Receiver::getFileCreator() {
return fileCreator_;
}
// Stores the recovery id (it is later handed to the transfer log manager's
// parseAndMatch() in init()) and logs the new value.
void Receiver::setRecoveryId(const std::string &recoveryId) {
  recoveryId_ = recoveryId;
  WLOG(INFO) << "recovery id " << recoveryId;
}
// Updates the accept mode while holding mutex_.
void Receiver::setAcceptMode(const AcceptMode acceptMode) {
  std::unique_lock<std::mutex> guard(mutex_);
  acceptMode_ = acceptMode;
}
// Reads the accept mode while holding mutex_ and returns a copy of it.
Receiver::AcceptMode Receiver::getAcceptMode() {
  std::unique_lock<std::mutex> guard(mutex_);
  const AcceptMode currentMode = acceptMode_;
  return currentMode;
}
// Destructor. If a transfer is still ongoing it is aborted with
// ABORTED_BY_APPLICATION first; finish() is then always called.
Receiver::~Receiver() {
  if (getTransferStatus() == ONGOING) {
    WLOG(WARNING) << "There is an ongoing transfer and the destructor"
                  << " is being called. Trying to finish the transfer";
    abort(ABORTED_BY_APPLICATION);
  }
  finish();
}
// Returns a read-only reference to the file chunks info collected during
// init() (from the transfer log or from the destination directory tree).
// The list is cleared in startNewGlobalSession() if sender ip verification
// fails.
const std::vector<FileChunksInfo> &Receiver::getFileChunksInfo() const {
return fileChunksInfo_;
}
// Encodes the resumption-relevant options as a bit field:
//   bit 0 - files are preallocated (options_.shouldPreallocateFiles())
//   bit 1 - resumption uses the directory tree
//           (options_.resume_using_dir_tree)
int64_t Receiver::getTransferConfig() const {
  const int64_t preallocationBit = options_.shouldPreallocateFiles() ? 1 : 0;
  const int64_t dirTreeResumptionBit =
      options_.resume_using_dir_tree ? (1 << 1) : 0;
  return preallocationBit | dirTreeResumptionBit;
}
std::unique_ptr<TransferReport> Receiver::finish() {
std::unique_lock<std::mutex> instanceLock(instanceManagementMutex_);
TransferStatus status = getTransferStatus();
if (status == NOT_STARTED) {
WLOG(WARNING) << "Even though transfer has not started, finish is called";
// getTransferReport will set the error code to ERROR
return getTransferReport();
}
if (status == THREADS_JOINED) {
WLOG(WARNING) << "Threads have already been joined. Returning the "
<< "transfer report";
return getTransferReport();
}
if (!isJoinable_) {
// TODO: don't complain about this when coming from runForever()
WLOG(WARNING) << "The receiver is not joinable. The threads will never"
<< " finish and this method will never return";
}
for (auto &receiverThread : receiverThreads_) {
receiverThread->finish();
}
setTransferStatus(THREADS_JOINED);
if (isJoinable_) {
// Make sure to join the progress thread.
progressTrackerThread_.join();
}
std::unique_ptr<TransferReport> report = getTransferReport();
auto &summary = report->getSummary();
bool transferSuccess = (report->getSummary().getErrorCode() == OK);
fixAndCloseTransferLog(transferSuccess);
auto totalSenderBytes = summary.getTotalSenderBytes();
if (progressReporter_ && totalSenderBytes >= 0) {
report->setTotalFileSize(totalSenderBytes);
report->setTotalTime(durationSeconds(Clock::now() - startTime_.load()));
progressReporter_->end(report);
}
logPerfStats();
WLOG(WARNING) << "WDT receiver's transfer has been finished";
WLOG(INFO) << *report;
return report;
}
std::unique_ptr<TransferReport> Receiver::getTransferReport() {
TransferStats globalStats;
for (const auto &receiverThread : receiverThreads_) {
globalStats += receiverThread->getTransferStats();
}
std::unique_ptr<TransferReport> transferReport =
std::make_unique<TransferReport>(std::move(globalStats));
TransferStatus status = getTransferStatus();
ErrorCode errCode = transferReport->getSummary().getErrorCode();
if (status == NOT_STARTED && errCode == OK) {
WLOG(INFO) << "Transfer not started, setting the error code to ERROR";
transferReport->setErrorCode(ERROR);
}
WVLOG(1) << "Summary code " << errCode;
return transferReport;
}
// Starts the transfer in joinable mode and returns immediately with the
// result of start(). A default ProgressReporter is installed when none was
// provided and progress reporting is enabled (interval > 0).
ErrorCode Receiver::transferAsync() {
  isJoinable_ = true;
  const bool progressReportingEnabled =
      options_.progress_report_interval_millis > 0;
  if (progressReportingEnabled && !progressReporter_) {
    // fall back to the default reporter
    progressReporter_ = std::make_unique<ProgressReporter>(transferRequest_);
  }
  return start();
}
// Runs the receiver in long running mode. Download resumption must be
// disabled (enforced with WDT_CHECK).
//
// @return the error code of start() if it fails; otherwise ERROR, since this
//         method is not expected to return at all
ErrorCode Receiver::runForever() {
  WDT_CHECK(!options_.enable_download_resumption)
      << "Transfer resumption not supported in long running mode";
  const ErrorCode startCode = start();
  if (startCode == OK) {
    finish();
    // This method should never finish
    return ERROR;
  }
  return startCode;
}
void Receiver::progressTracker() {
// Progress tracker will check for progress after the time specified
// in milliseconds.
int progressReportIntervalMillis = options_.progress_report_interval_millis;
int throughputUpdateIntervalMillis =
options_.throughput_update_interval_millis;
if (progressReportIntervalMillis <= 0 || throughputUpdateIntervalMillis < 0 ||
!isJoinable_) {
return;
}
int throughputUpdateInterval =
throughputUpdateIntervalMillis / progressReportIntervalMillis;
int64_t lastEffectiveBytes = 0;
std::chrono::time_point<Clock> lastUpdateTime = Clock::now();
int intervalsSinceLastUpdate = 0;
double currentThroughput = 0;
WLOG(INFO) << "Progress reporter updating every "
<< progressReportIntervalMillis << " ms";
auto waitingTime = std::chrono::milliseconds(progressReportIntervalMillis);
int64_t totalSenderBytes = -1;
while (true) {
{
std::unique_lock<std::mutex> lock(mutex_);
conditionFinished_.wait_for(lock, waitingTime);
if (transferStatus_ == THREADS_JOINED) {
break;
}
}
double totalTime = durationSeconds(Clock::now() - startTime_.load());
TransferStats globalStats;
for (const auto &receiverThread : receiverThreads_) {
globalStats += receiverThread->getTransferStats();
}
totalSenderBytes = globalStats.getTotalSenderBytes();
// Note: totalSenderBytes may not be valid yet if sender has not
// completed file discovery. But that's ok, report whatever progress
// we can.
auto transferReport = std::make_unique<TransferReport>(
std::move(globalStats), totalTime, totalSenderBytes, 0, true);
intervalsSinceLastUpdate++;
if (intervalsSinceLastUpdate >= throughputUpdateInterval) {
auto curTime = Clock::now();
int64_t curEffectiveBytes =
transferReport->getSummary().getEffectiveDataBytes();
double time = durationSeconds(curTime - lastUpdateTime);
currentThroughput = (curEffectiveBytes - lastEffectiveBytes) / time;
lastEffectiveBytes = curEffectiveBytes;
lastUpdateTime = curTime;
intervalsSinceLastUpdate = 0;
}
transferReport->setCurrentThroughput(currentThroughput);
progressReporter_->progress(transferReport);
if (reportPerfSignal_.notified()) {
logPerfStats();
}
}
}
void Receiver::logPerfStats() const {
if (!options_.enable_perf_stat_collection) {
return;
}
PerfStatReport globalPerfReport(options_);
for (auto &receiverThread : receiverThreads_) {
globalPerfReport += receiverThread->getPerfReport();
}
WLOG(INFO) << globalPerfReport;
}
// Starts the receiver threads.
//
// Requires the transfer status to be NOT_STARTED (enforced with
// WDT_CHECK_EQ). Configures the throttler unless one was set externally and
// moves the status to ONGOING. In joinable mode the threads are started once,
// the progress reporter is started (if any) and the progress tracker thread
// is spawned. In long running mode the threads are started, finished, reset
// and started again in an endless loop.
//
// @return OK (only reached in joinable mode)
ErrorCode Receiver::start() {
  WDT_CHECK_EQ(getTransferStatus(), NOT_STARTED)
      << "There is already a transfer running on this instance of receiver";
  startTime_.store(Clock::now());
  WLOG(INFO) << "Starting (receiving) server on ports [ "
             << transferRequest_.ports << "] Target dir : " << getDirectory();
  // TODO do the init stuff here
  if (throttler_) {
    WLOG(INFO) << "Throttler set externally. Throttler : " << *throttler_;
  } else {
    configureThrottler();
  }
  setTransferStatus(ONGOING);
  for (;;) {
    for (auto &receiverThread : receiverThreads_) {
      receiverThread->startThread();
    }
    if (isJoinable_) {
      break;
    }
    // Long running mode: wait for the threads to finish processing the
    // current transfer, then re-spawn them with the same sockets
    for (auto &receiverThread : receiverThreads_) {
      receiverThread->finish();
      receiverThread->reset();
    }
    threadsController_->reset();
    // reset transfer status
    setTransferStatus(NOT_STARTED);
  }
  if (isJoinable_) {
    if (progressReporter_) {
      progressReporter_->start();
    }
    progressTrackerThread_ = std::thread(&Receiver::progressTracker, this);
  }
  return OK;
}
// Writes the transfer log header for a new session when download resumption
// is enabled.
//
// The directory is invalidated first if the sender is not resuming, or if the
// sender runs in block mode while the receiver resumes using the directory
// tree. The header is then written unless the receiver resumes using the
// directory tree and the log reports an inconsistent directory.
//
// @param isBlockMode       whether the sender is running in block mode
// @param isSenderResuming  whether the sender is in resumption mode
void Receiver::addTransferLogHeader(bool isBlockMode, bool isSenderResuming) {
  if (!options_.enable_download_resumption) {
    return;
  }
  if (!isSenderResuming) {
    WLOG(INFO) << "Sender is not in resumption mode. Invalidating directory.";
    transferLogManager_->invalidateDirectory();
  } else if (options_.resume_using_dir_tree && isBlockMode) {
    WLOG(INFO) << "Sender is running in block mode, but receiver is running in "
                  "size based resumption mode. Invalidating directory.";
    transferLogManager_->invalidateDirectory();
  }
  // Queried after a possible invalidation above
  const bool isInconsistentDirectory =
      (transferLogManager_->getResumptionStatus() == INCONSISTENT_DIRECTORY);
  if (options_.resume_using_dir_tree && isInconsistentDirectory) {
    return;
  }
  transferLogManager_->writeLogHeader();
}
// Finalizes the transfer log at the end of a transfer. No-op when download
// resumption is disabled.
//
// On success with an inconsistent directory, a log header is written to
// validate the directory. The log is compacted only if compaction is enabled,
// the transfer succeeded, the log is valid, the log is kept and resumption
// does not use the directory tree. The log is then closed. After a successful
// transfer an invalid log is renamed and, unless the log should be kept, the
// log is unlinked.
//
// @param transferSuccess  whether the transfer finished without error
void Receiver::fixAndCloseTransferLog(bool transferSuccess) {
  if (!options_.enable_download_resumption) {
    return;
  }
  const bool isInconsistentDirectory =
      (transferLogManager_->getResumptionStatus() == INCONSISTENT_DIRECTORY);
  const bool isInvalidLog =
      (transferLogManager_->getResumptionStatus() == INVALID_LOG);
  if (transferSuccess && isInconsistentDirectory) {
    // write log header to validate directory in case of success
    WDT_CHECK(options_.resume_using_dir_tree);
    transferLogManager_->writeLogHeader();
  }
  const bool shouldCompact =
      options_.enable_transfer_log_compaction && transferSuccess &&
      !isInvalidLog && options_.keep_transfer_log &&
      !options_.resume_using_dir_tree;
  if (!shouldCompact) {
    WLOG(INFO) << "Skip compacting transfer log";
    WVLOG(1) << "options_.enable_transfer_log_compaction="
             << options_.enable_transfer_log_compaction
             << " isInvalidLog=" << isInvalidLog
             << " options_.keep_transfer_log=" << options_.keep_transfer_log
             << " options_.resume_using_dir_tree="
             << options_.resume_using_dir_tree;
  } else {
    transferLogManager_->compactLog();
  }
  transferLogManager_->closeLog();
  if (transferSuccess) {
    if (isInvalidLog) {
      transferLogManager_->renameBuggyLog();
    }
    if (!options_.keep_transfer_log) {
      transferLogManager_->unlink();
    }
  }
}
}
} // namespace facebook::wdt