in Receiver.cpp [381:439]
void Receiver::progressTracker() {
// Progress tracker will check for progress after the time specified
// in milliseconds.
int progressReportIntervalMillis = options_.progress_report_interval_millis;
int throughputUpdateIntervalMillis =
options_.throughput_update_interval_millis;
if (progressReportIntervalMillis <= 0 || throughputUpdateIntervalMillis < 0 ||
!isJoinable_) {
return;
}
int throughputUpdateInterval =
throughputUpdateIntervalMillis / progressReportIntervalMillis;
int64_t lastEffectiveBytes = 0;
std::chrono::time_point<Clock> lastUpdateTime = Clock::now();
int intervalsSinceLastUpdate = 0;
double currentThroughput = 0;
WLOG(INFO) << "Progress reporter updating every "
<< progressReportIntervalMillis << " ms";
auto waitingTime = std::chrono::milliseconds(progressReportIntervalMillis);
int64_t totalSenderBytes = -1;
while (true) {
{
std::unique_lock<std::mutex> lock(mutex_);
conditionFinished_.wait_for(lock, waitingTime);
if (transferStatus_ == THREADS_JOINED) {
break;
}
}
double totalTime = durationSeconds(Clock::now() - startTime_.load());
TransferStats globalStats;
for (const auto &receiverThread : receiverThreads_) {
globalStats += receiverThread->getTransferStats();
}
totalSenderBytes = globalStats.getTotalSenderBytes();
// Note: totalSenderBytes may not be valid yet if sender has not
// completed file discovery. But that's ok, report whatever progress
// we can.
auto transferReport = std::make_unique<TransferReport>(
std::move(globalStats), totalTime, totalSenderBytes, 0, true);
intervalsSinceLastUpdate++;
if (intervalsSinceLastUpdate >= throughputUpdateInterval) {
auto curTime = Clock::now();
int64_t curEffectiveBytes =
transferReport->getSummary().getEffectiveDataBytes();
double time = durationSeconds(curTime - lastUpdateTime);
currentThroughput = (curEffectiveBytes - lastEffectiveBytes) / time;
lastEffectiveBytes = curEffectiveBytes;
lastUpdateTime = curTime;
intervalsSinceLastUpdate = 0;
}
transferReport->setCurrentThroughput(currentThroughput);
progressReporter_->progress(transferReport);
if (reportPerfSignal_.notified()) {
logPerfStats();
}
}
}