Sender.cpp (417 lines of code) (raw):
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <wdt/Sender.h>
#include <wdt/SenderThread.h>
#include <wdt/Throttler.h>
#include <wdt/util/ClientSocket.h>
#include <folly/lang/Bits.h>
#include <folly/hash/Checksum.h>
#include <folly/Conv.h>
#include <folly/Memory.h>
#include <folly/ScopeGuard.h>
#include <folly/String.h>
namespace facebook {
namespace wdt {
/// Closes out the current transfer: stamps endTime_, logs the time elapsed
/// since startTime_ together with the transfer id, switches the transfer
/// status to FINISHED and, if a throttler is attached, notifies it that the
/// transfer ended.
void Sender::endCurTransfer() {
  endTime_ = Clock::now();
  const auto elapsed = endTime_ - startTime_;
  WLOG(INFO) << "Last thread finished " << durationSeconds(elapsed)
             << " for transfer id " << getTransferId();
  setTransferStatus(FINISHED);
  if (!throttler_) {
    // nothing to notify when no throttler is attached
    return;
  }
  throttler_->endTransfer();
}
/// Announces the beginning of a new transfer: the attached throttler (if any)
/// is told first, then the transfer id and destination host are logged.
void Sender::startNewTransfer() {
  if (throttler_ != nullptr) {
    throttler_->startTransfer();
  }
  const auto &destinationHost = transferRequest_.hostName;
  WLOG(INFO) << "Starting a new transfer " << getTransferId() << " to "
             << destinationHost;
}
/// Builds a sender for the given transfer request.
///
/// The request is copied into transferRequest_ and the queue abort checker is
/// bound to this instance. A warning is logged when the resulting transfer id
/// is empty.
Sender::Sender(const WdtTransferRequest &transferRequest)
    : queueAbortChecker_(this) {
  WLOG(INFO) << "WDT Sender " << Protocol::getFullVersion();
  transferRequest_ = transferRequest;
  const bool missingTransferId = getTransferId().empty();
  if (missingTransferId) {
    WLOG(WARNING) << "Sender without transferId... will likely fail to connect";
  }
}
/// Validates the transfer request for use by a sender.
///
/// Runs the base-class validation first; if that passes, additionally
/// requires a non-empty destination host name. The outcome is stored in
/// transferRequest_.errorCode and also returned.
///
/// @return OK when the request is usable, INVALID_REQUEST when the host name
///         is empty, otherwise whatever the base-class validation reported.
ErrorCode Sender::validateTransferRequest() {
  ErrorCode result = WdtBase::validateTransferRequest();
  if (result == OK) {
    // Sender-specific checks only run on an otherwise valid request.
    const bool hostMissing = transferRequest_.hostName.empty();
    if (hostMissing) {
      WLOG(ERROR) << "Transfer request validation failed for wdt sender "
                  << transferRequest_.getLogSafeString();
      result = INVALID_REQUEST;
    }
  }
  transferRequest_.errorCode = result;
  return result;
}
/// Prepares the sender before a transfer is started.
///
/// Negotiates the protocol and validates the transfer request. On success the
/// request's error code is set to OK and, when encryption data is set, an
/// informational message is logged. On failure the validation error is
/// logged; the error code written by validateTransferRequest() is left in
/// place.
///
/// @return reference to the sender's own transfer request (in both cases).
const WdtTransferRequest &Sender::init() {
  WVLOG(1) << "Sender Init() with encryption set = "
           << transferRequest_.encryptionData.isSet();
  negotiateProtocol();
  const ErrorCode validation = validateTransferRequest();
  if (validation == OK) {
    // TODO Figure out what to do with file info
    // transferRequest.fileInfo = dirQueue_->getFileInfo();
    transferRequest_.errorCode = OK;
    const bool encryptionOn = transferRequest_.encryptionData.isSet();
    WLOG_IF(INFO, encryptionOn) << "Encryption is enabled for this transfer";
  } else {
    WLOG(ERROR) << "Couldn't validate the transfer request "
                << transferRequest_.getLogSafeString();
  }
  return transferRequest_;
}
/// Tears the sender down. A transfer that is still ONGOING is aborted with
/// ABORTED_BY_APPLICATION first; finish() is then called unconditionally.
Sender::~Sender() {
  const bool stillRunning = (getTransferStatus() == ONGOING);
  if (stillRunning) {
    WLOG(WARNING) << "Sender being deleted. Forcefully aborting the transfer";
    abort(ABORTED_BY_APPLICATION);
  }
  finish();
}
/// Returns the protocol negotiation status currently stored on this sender.
/// The member is read directly; no lock is taken in this function.
ProtoNegotiationStatus Sender::getNegotiationStatus() {
return protoNegotiationStatus_;
}
std::vector<int> Sender::getNegotiatedProtocols() const {
std::vector<int> ret;
for (const auto &senderThread : senderThreads_) {
ret.push_back(senderThread->getNegotiatedProtocol());
}
return ret;
}
/// Overwrites the stored protocol negotiation status with `status`.
/// The member is written directly; no lock is taken in this function.
void Sender::setProtoNegotiationStatus(ProtoNegotiationStatus status) {
protoNegotiationStatus_ = status;
}
/// Tells whether file chunk information takes part in this transfer.
///
/// @return true only when download resumption is enabled AND the protocol
///         version is at least Protocol::DOWNLOAD_RESUMPTION_VERSION.
bool Sender::isSendFileChunks() const {
  if (!downloadResumptionEnabled_) {
    // protocol version is not consulted when resumption is off
    return false;
  }
  return getProtocolVersion() >= Protocol::DOWNLOAD_RESUMPTION_VERSION;
}
/// Returns whether the file chunks list has already been recorded
/// (fileChunksReceived_). The flag is read while holding mutex_, the same
/// mutex setFileChunksInfo() holds when it sets the flag.
bool Sender::isFileChunksReceived() {
std::lock_guard<std::mutex> lock(mutex_);
return fileChunksReceived_;
}
/// Hands the list of previously received file chunks to the directory queue.
///
/// Only the first call has an effect: it forwards the list to dirQueue_ and
/// sets fileChunksReceived_. Any later call just logs a warning. The whole
/// operation runs under mutex_.
///
/// @param fileChunksInfoList chunk information passed on to
///        DirectorySourceQueue::setPreviouslyReceivedChunks
void Sender::setFileChunksInfo(
    std::vector<FileChunksInfo> &fileChunksInfoList) {
  std::lock_guard<std::mutex> guard(mutex_);
  if (!fileChunksReceived_) {
    dirQueue_->setPreviouslyReceivedChunks(fileChunksInfoList);
    fileChunksReceived_ = true;
    return;
  }
  WLOG(WARNING) << "File chunks list received multiple times";
}
/// Returns the destination of this sender, i.e. a reference to the host name
/// stored in the transfer request (transferRequest_.hostName).
const std::string &Sender::getDestination() const {
return transferRequest_.hostName;
}
std::unique_ptr<TransferReport> Sender::getTransferReport() {
int64_t totalFileSize = 0;
int64_t fileCount = 0;
bool fileDiscoveryFinished = false;
if (dirQueue_ != nullptr) {
totalFileSize = dirQueue_->getTotalSize();
fileCount = dirQueue_->getCount();
fileDiscoveryFinished = dirQueue_->fileDiscoveryFinished();
}
double totalTime = durationSeconds(Clock::now() - startTime_);
auto globalStats = getGlobalTransferStats();
std::unique_ptr<TransferReport> transferReport =
std::make_unique<TransferReport>(std::move(globalStats), totalTime,
totalFileSize, fileCount,
fileDiscoveryFinished);
TransferStatus status = getTransferStatus();
ErrorCode errCode = transferReport->getSummary().getErrorCode();
if (status == NOT_STARTED && errCode == OK) {
WLOG(INFO) << "Transfer not started, setting the error code to ERROR";
transferReport->setErrorCode(ERROR);
}
return transferReport;
}
/// Returns endTime_, the time point recorded by endCurTransfer().
/// The member is read directly; no lock is taken in this function.
Clock::time_point Sender::getEndTime() {
return endTime_;
}
/// Adds up the transfer stats of all sender threads.
///
/// @return the accumulated stats; a default-constructed TransferStats when
///         there are no sender threads.
TransferStats Sender::getGlobalTransferStats() const {
  TransferStats combined;
  for (const auto &senderThread : senderThreads_) {
    const auto &perThreadStats = senderThread->getTransferStats();
    combined += perThreadStats;
  }
  return combined;
}
/// Waits for the transfer to complete and produces the final transfer report.
///
/// Holds instanceManagementMutex_ for the whole call. If the transfer was
/// never started, or the threads were already joined by an earlier call, a
/// snapshot from getTransferReport() is returned instead. Otherwise this
/// function finishes every sender thread, joins the directory thread (unless
/// two_phases is set) and the progress reporter thread, reconciles the
/// per-port transfer histories, and builds, logs and returns the final report.
///
/// @return the transfer report, owned by the caller
std::unique_ptr<TransferReport> Sender::finish() {
std::unique_lock<std::mutex> instanceLock(instanceManagementMutex_);
WVLOG(1) << "Sender::finish()";
TransferStatus status = getTransferStatus();
if (status == NOT_STARTED) {
WLOG(WARNING) << "Even though transfer has not started, finish is called";
// getTransferReport will set the error code to ERROR
return getTransferReport();
}
if (status == THREADS_JOINED) {
WVLOG(1) << "Threads have already been joined. Returning the"
<< " existing transfer report";
return getTransferReport();
}
const bool twoPhases = options_.two_phases;
// Same condition start() uses to decide whether to launch the reporter
// thread, so the join below is only attempted under that condition.
bool progressReportEnabled =
progressReporter_ && progressReportIntervalMillis_ > 0;
for (auto &senderThread : senderThreads_) {
senderThread->finish();
}
// In two-phase mode start() has already joined the directory thread.
if (!twoPhases) {
dirThread_.join();
}
WDT_CHECK(numActiveThreads_ == 0);
// reportProgress() leaves its loop once it observes THREADS_JOINED, so the
// status must be set before joining the reporter thread.
setTransferStatus(THREADS_JOINED);
if (progressReportEnabled) {
progressReporterThread_.join();
}
std::vector<TransferStats> threadStats;
for (auto &senderThread : senderThreads_) {
threadStats.push_back(senderThread->moveStats());
}
// NOTE(review): getTransferStats() is read below after moveStats() was
// called on the same threads — confirm the error code is still meaningful
// on the thread's stats object at this point.
bool allSourcesAcked = false;
for (auto &senderThread : senderThreads_) {
auto &stats = senderThread->getTransferStats();
if (stats.getErrorCode() == OK) {
// at least one thread finished correctly
// that means all transferred sources are acked
allSourcesAcked = true;
break;
}
}
// Walk the transfer history of every port: either mark everything as
// acknowledged or hand unacknowledged sources back to the queue, and (with
// full reporting) collect the per-source stats of acknowledged sources.
std::vector<TransferStats> transferredSourceStats;
for (auto port : transferRequest_.ports) {
auto &transferHistory =
transferHistoryController_->getTransferHistory(port);
if (allSourcesAcked) {
transferHistory.markAllAcknowledged();
} else {
transferHistory.returnUnackedSourcesToQueue();
}
if (options_.full_reporting) {
std::vector<TransferStats> stats = transferHistory.popAckedSourceStats();
transferredSourceStats.insert(transferredSourceStats.end(),
std::make_move_iterator(stats.begin()),
std::make_move_iterator(stats.end()));
}
}
// Cross-check per-source totals against per-thread totals (WDT_CHECKs).
if (options_.full_reporting) {
validateTransferStats(transferredSourceStats,
dirQueue_->getFailedSourceStats());
}
int64_t totalFileSize = dirQueue_->getTotalSize();
// Unlike getTransferReport(), the duration here ends at endTime_ rather
// than at the current time.
double totalTime = durationSeconds(endTime_ - startTime_);
std::unique_ptr<TransferReport> transferReport =
std::make_unique<TransferReport>(
transferredSourceStats, dirQueue_->getFailedSourceStats(),
threadStats, dirQueue_->getFailedDirectories(), totalTime,
totalFileSize, dirQueue_->getCount(),
dirQueue_->getPreviouslySentBytes(),
dirQueue_->fileDiscoveryFinished());
if (progressReportEnabled) {
progressReporter_->end(transferReport);
}
logPerfStats();
double directoryTime;
directoryTime = dirQueue_->getDirectoryTime();
// NOTE(review): the "pure transfer rate" divides by
// (totalTime - directoryTime) with no guard for a zero or negative
// difference; this only affects the logged value.
WLOG(INFO) << "Total sender time = " << totalTime << " seconds ("
<< directoryTime << " dirTime)"
<< ". Transfer summary : " << *transferReport << "\n"
<< WDT_LOG_PREFIX << "Total sender throughput = "
<< transferReport->getThroughputMBps() << " Mbytes/sec ("
<< transferReport->getSummary().getEffectiveTotalBytes() /
(totalTime - directoryTime) / kMbToB
<< " Mbytes/sec pure transfer rate)";
return transferReport;
}
/// Starts the transfer and returns immediately with the result of start();
/// unlike transfer(), it does not call finish().
ErrorCode Sender::transferAsync() {
return start();
}
/// Runs a complete transfer: calls start() and then finish(), returning the
/// report produced by finish().
/// The ErrorCode returned by start() is not inspected here.
std::unique_ptr<TransferReport> Sender::transfer() {
start();
return finish();
}
/// Fetches the next batch of files from the request's file info generator.
///
/// First waits on the directory queue for the previous transfer, passing the
/// progress report interval and a callback that reports the number of
/// running threads. If the transfer is no longer ONGOING afterwards, the
/// generator is not invoked.
///
/// @return the generator's result, or folly::none when the transfer status
///         is not ONGOING
folly::Optional<std::vector<WdtFileInfo>>
Sender::getFilesFromFileInfoGenerator() {
  auto runningThreads = [this]() {
    return threadsController_->numRunningThreads();
  };
  dirQueue_->waitForPreviousTransfer(
      std::chrono::milliseconds(options_.progress_report_interval_millis),
      runningThreads);
  const auto status = getTransferStatus();
  if (status != ONGOING) {
    WLOG(INFO) << "Terminating transfer since status isn't ONGOING. "
               << "Status: " << status;
    return folly::none;
  }
  return transferRequest_.fileInfoGenerator();
}
/// Starts the transfer: sets up the directory queue, throttler, threads
/// controller and sender threads, kicks off directory discovery and, when
/// enabled, the progress reporter thread.
///
/// @return ALREADY_EXISTS if the transfer status is anything other than
///         NOT_STARTED (i.e. start() was already called); OK otherwise.
ErrorCode Sender::start() {
// Atomically (under mutex_) claim the NOT_STARTED -> ONGOING transition so
// that a second start() call is rejected.
{
std::lock_guard<std::mutex> lock(mutex_);
if (transferStatus_ != NOT_STARTED) {
WLOG(ERROR) << "duplicate start() call detected " << transferStatus_;
return ALREADY_EXISTS;
}
transferStatus_ = ONGOING;
}
// set up directory queue
dirQueue_.reset(new DirectorySourceQueue(options_, transferRequest_.directory,
&queueAbortChecker_));
WVLOG(3) << "Configuring the directory queue";
dirQueue_->setIncludePattern(options_.include_regex);
dirQueue_->setExcludePattern(options_.exclude_regex);
dirQueue_->setPruneDirPattern(options_.prune_dir_regex);
dirQueue_->setFollowSymlinks(options_.follow_symlinks);
dirQueue_->setBlockSizeMbytes(options_.block_size_mbytes);
// One client thread per port in the request.
dirQueue_->setNumClientThreads(transferRequest_.ports.size());
dirQueue_->setOpenFilesDuringDiscovery(options_.open_files_during_discovery);
dirQueue_->setDirectReads(options_.odirect_reads);
// An explicit file list, a file info generator, or disabled directory
// traversal all cause the request's file list to be handed to the queue.
if (!transferRequest_.fileInfo.empty() ||
transferRequest_.fileInfoGenerator ||
transferRequest_.disableDirectoryTraversal) {
dirQueue_->setFileInfo(transferRequest_.fileInfo);
if (transferRequest_.fileInfoGenerator) {
// The queue calls back into this sender (captured `this`) for more files.
dirQueue_->setFileInfoGenerator(
[this]() { return getFilesFromFileInfoGenerator(); });
}
}
transferHistoryController_ =
std::make_unique<TransferHistoryController>(*dirQueue_);
checkAndUpdateBufferSize();
const bool twoPhases = options_.two_phases;
WLOG(INFO) << "Client (sending) to " << getDestination() << ", Using ports [ "
<< transferRequest_.ports << "]";
startTime_ = Clock::now();
progressReportIntervalMillis_ = options_.progress_report_interval_millis;
downloadResumptionEnabled_ = (transferRequest_.downloadResumptionEnabled ||
options_.enable_download_resumption);
// NOTE(review): deleteExtraFiles is derived from the request's
// downloadResumptionEnabled flag OR the delete_extra_files option — confirm
// that using the resumption flag here is intended.
bool deleteExtraFiles = (transferRequest_.downloadResumptionEnabled ||
options_.delete_extra_files);
if (!progressReporter_) {
WVLOG(1) << "No progress reporter provided, making a default one";
progressReporter_ = std::make_unique<ProgressReporter>(transferRequest_);
}
bool progressReportEnabled =
progressReporter_ && progressReportIntervalMillis_ > 0;
// An externally supplied throttler is kept as-is; otherwise one is
// configured here.
if (throttler_) {
WLOG(INFO) << "Skipping throttler setup. External throttler set."
<< "Throttler details : " << *throttler_;
} else {
configureThrottler();
}
// NOTE(review): allocated with raw new; where threadsController_ is released
// is not visible in this function — verify ownership.
threadsController_ = new ThreadsController(transferRequest_.ports.size());
threadsController_->setNumBarriers(SenderThread::NUM_BARRIERS);
threadsController_->setNumFunnels(SenderThread::NUM_FUNNELS);
threadsController_->setNumConditions(SenderThread::NUM_CONDITIONS);
// TODO: fix this ! use transferRequest! (and dup from Receiver)
senderThreads_ = threadsController_->makeThreads<Sender, SenderThread>(
this, transferRequest_.ports.size(), transferRequest_.ports);
// File deletion is only enabled when the protocol version is at least
// DELETE_CMD_VERSION.
if (downloadResumptionEnabled_ && deleteExtraFiles) {
if (getProtocolVersion() >= Protocol::DELETE_CMD_VERSION) {
dirQueue_->enableFileDeletion();
} else {
WLOG(WARNING) << "Turning off extra file deletion on the receiver side "
"because of protocol version "
<< getProtocolVersion();
}
}
dirThread_ = dirQueue_->buildQueueAsynchronously();
// In two-phase mode the directory thread is joined before any sender thread
// is started; otherwise finish() joins it.
if (twoPhases) {
dirThread_.join();
}
for (auto &senderThread : senderThreads_) {
senderThread->startThread();
}
// The reporter thread runs reportProgress() on this sender and is joined in
// finish().
if (progressReportEnabled) {
progressReporter_->start();
std::thread reporterThread(&Sender::reportProgress, this);
progressReporterThread_ = std::move(reporterThread);
}
return OK;
}
/// Sanity-checks that the per-source statistics add up to the per-thread
/// statistics.
///
/// Failed attempts, data bytes, effective data bytes and block counts are
/// summed over both source lists and, separately, over all sender threads;
/// each pair of totals must match (enforced with WDT_CHECK).
///
/// @param transferredSourceStats stats of the transferred sources
/// @param failedSourceStats      stats of the failed sources
void Sender::validateTransferStats(
    const std::vector<TransferStats> &transferredSourceStats,
    const std::vector<TransferStats> &failedSourceStats) {
  int64_t sourceFailedAttempts = 0;
  int64_t sourceDataBytes = 0;
  int64_t sourceEffectiveDataBytes = 0;
  int64_t sourceNumBlocks = 0;
  // Both source lists contribute to the same set of source-side totals.
  auto accumulateSources = [&](const std::vector<TransferStats> &statsList) {
    for (const auto &entry : statsList) {
      sourceFailedAttempts += entry.getFailedAttempts();
      sourceDataBytes += entry.getDataBytes();
      sourceEffectiveDataBytes += entry.getEffectiveDataBytes();
      sourceNumBlocks += entry.getNumBlocks();
    }
  };
  accumulateSources(transferredSourceStats);
  accumulateSources(failedSourceStats);

  int64_t threadFailedAttempts = 0;
  int64_t threadDataBytes = 0;
  int64_t threadEffectiveDataBytes = 0;
  int64_t threadNumBlocks = 0;
  for (const auto &senderThread : senderThreads_) {
    const auto &perThread = senderThread->getTransferStats();
    threadFailedAttempts += perThread.getFailedAttempts();
    threadDataBytes += perThread.getDataBytes();
    threadEffectiveDataBytes += perThread.getEffectiveDataBytes();
    threadNumBlocks += perThread.getNumBlocks();
  }

  WDT_CHECK(sourceFailedAttempts == threadFailedAttempts);
  WDT_CHECK(sourceDataBytes == threadDataBytes);
  WDT_CHECK(sourceEffectiveDataBytes == threadEffectiveDataBytes);
  WDT_CHECK(sourceNumBlocks == threadNumBlocks);
}
/// Stores the given socket creator pointer in socketCreator_.
/// The raw pointer is kept as passed; nothing in this function takes
/// ownership of it or checks it for null.
void Sender::setSocketCreator(Sender::ISocketCreator *socketCreator) {
socketCreator_ = socketCreator;
}
void Sender::reportProgress() {
WDT_CHECK(progressReportIntervalMillis_ > 0);
int throughputUpdateIntervalMillis =
options_.throughput_update_interval_millis;
WDT_CHECK(throughputUpdateIntervalMillis >= 0);
int throughputUpdateInterval =
throughputUpdateIntervalMillis / progressReportIntervalMillis_;
int64_t lastEffectiveBytes = 0;
std::chrono::time_point<Clock> lastUpdateTime = Clock::now();
int intervalsSinceLastUpdate = 0;
double currentThroughput = 0;
auto waitingTime = std::chrono::milliseconds(progressReportIntervalMillis_);
WLOG(INFO) << "Progress reporter tracking every "
<< progressReportIntervalMillis_ << " ms";
while (true) {
{
std::unique_lock<std::mutex> lock(mutex_);
conditionFinished_.wait_for(lock, waitingTime);
if (transferStatus_ == THREADS_JOINED) {
break;
}
}
std::unique_ptr<TransferReport> transferReport = getTransferReport();
intervalsSinceLastUpdate++;
if (intervalsSinceLastUpdate >= throughputUpdateInterval) {
auto curTime = Clock::now();
int64_t curEffectiveBytes =
transferReport->getSummary().getEffectiveDataBytes();
double time = durationSeconds(curTime - lastUpdateTime);
currentThroughput = (curEffectiveBytes - lastEffectiveBytes) / time;
lastEffectiveBytes = curEffectiveBytes;
lastUpdateTime = curTime;
intervalsSinceLastUpdate = 0;
}
transferReport->setCurrentThroughput(currentThroughput);
progressReporter_->progress(transferReport);
if (reportPerfSignal_.notified()) {
logPerfStats();
}
}
}
void Sender::logPerfStats() const {
if (!options_.enable_perf_stat_collection) {
return;
}
PerfStatReport report(options_);
for (auto &senderThread : senderThreads_) {
report += senderThread->getPerfReport();
}
report += dirQueue_->getPerfReport();
WLOG(INFO) << report;
}
}
} // namespace facebook::wdt