in Sender.cpp [420:464]
void Sender::reportProgress() {
WDT_CHECK(progressReportIntervalMillis_ > 0);
int throughputUpdateIntervalMillis =
options_.throughput_update_interval_millis;
WDT_CHECK(throughputUpdateIntervalMillis >= 0);
int throughputUpdateInterval =
throughputUpdateIntervalMillis / progressReportIntervalMillis_;
int64_t lastEffectiveBytes = 0;
std::chrono::time_point<Clock> lastUpdateTime = Clock::now();
int intervalsSinceLastUpdate = 0;
double currentThroughput = 0;
auto waitingTime = std::chrono::milliseconds(progressReportIntervalMillis_);
WLOG(INFO) << "Progress reporter tracking every "
<< progressReportIntervalMillis_ << " ms";
while (true) {
{
std::unique_lock<std::mutex> lock(mutex_);
conditionFinished_.wait_for(lock, waitingTime);
if (transferStatus_ == THREADS_JOINED) {
break;
}
}
std::unique_ptr<TransferReport> transferReport = getTransferReport();
intervalsSinceLastUpdate++;
if (intervalsSinceLastUpdate >= throughputUpdateInterval) {
auto curTime = Clock::now();
int64_t curEffectiveBytes =
transferReport->getSummary().getEffectiveDataBytes();
double time = durationSeconds(curTime - lastUpdateTime);
currentThroughput = (curEffectiveBytes - lastEffectiveBytes) / time;
lastEffectiveBytes = curEffectiveBytes;
lastUpdateTime = curTime;
intervalsSinceLastUpdate = 0;
}
transferReport->setCurrentThroughput(currentThroughput);
progressReporter_->progress(transferReport);
if (reportPerfSignal_.notified()) {
logPerfStats();
}
}
}