in Sender.cpp [171:261]
std::unique_ptr<TransferReport> Sender::finish() {
std::unique_lock<std::mutex> instanceLock(instanceManagementMutex_);
WVLOG(1) << "Sender::finish()";
TransferStatus status = getTransferStatus();
if (status == NOT_STARTED) {
WLOG(WARNING) << "Even though transfer has not started, finish is called";
// getTransferReport will set the error code to ERROR
return getTransferReport();
}
if (status == THREADS_JOINED) {
WVLOG(1) << "Threads have already been joined. Returning the"
<< " existing transfer report";
return getTransferReport();
}
const bool twoPhases = options_.two_phases;
bool progressReportEnabled =
progressReporter_ && progressReportIntervalMillis_ > 0;
for (auto &senderThread : senderThreads_) {
senderThread->finish();
}
if (!twoPhases) {
dirThread_.join();
}
WDT_CHECK(numActiveThreads_ == 0);
setTransferStatus(THREADS_JOINED);
if (progressReportEnabled) {
progressReporterThread_.join();
}
std::vector<TransferStats> threadStats;
for (auto &senderThread : senderThreads_) {
threadStats.push_back(senderThread->moveStats());
}
bool allSourcesAcked = false;
for (auto &senderThread : senderThreads_) {
auto &stats = senderThread->getTransferStats();
if (stats.getErrorCode() == OK) {
// at least one thread finished correctly
// that means all transferred sources are acked
allSourcesAcked = true;
break;
}
}
std::vector<TransferStats> transferredSourceStats;
for (auto port : transferRequest_.ports) {
auto &transferHistory =
transferHistoryController_->getTransferHistory(port);
if (allSourcesAcked) {
transferHistory.markAllAcknowledged();
} else {
transferHistory.returnUnackedSourcesToQueue();
}
if (options_.full_reporting) {
std::vector<TransferStats> stats = transferHistory.popAckedSourceStats();
transferredSourceStats.insert(transferredSourceStats.end(),
std::make_move_iterator(stats.begin()),
std::make_move_iterator(stats.end()));
}
}
if (options_.full_reporting) {
validateTransferStats(transferredSourceStats,
dirQueue_->getFailedSourceStats());
}
int64_t totalFileSize = dirQueue_->getTotalSize();
double totalTime = durationSeconds(endTime_ - startTime_);
std::unique_ptr<TransferReport> transferReport =
std::make_unique<TransferReport>(
transferredSourceStats, dirQueue_->getFailedSourceStats(),
threadStats, dirQueue_->getFailedDirectories(), totalTime,
totalFileSize, dirQueue_->getCount(),
dirQueue_->getPreviouslySentBytes(),
dirQueue_->fileDiscoveryFinished());
if (progressReportEnabled) {
progressReporter_->end(transferReport);
}
logPerfStats();
double directoryTime;
directoryTime = dirQueue_->getDirectoryTime();
WLOG(INFO) << "Total sender time = " << totalTime << " seconds ("
<< directoryTime << " dirTime)"
<< ". Transfer summary : " << *transferReport << "\n"
<< WDT_LOG_PREFIX << "Total sender throughput = "
<< transferReport->getThroughputMBps() << " Mbytes/sec ("
<< transferReport->getSummary().getEffectiveTotalBytes() /
(totalTime - directoryTime) / kMbToB
<< " Mbytes/sec pure transfer rate)";
return transferReport;
}