void Receiver::progressTracker()

in Receiver.cpp [381:439]


// Periodically builds a TransferReport from the stats of all receiver
// threads and hands it to progressReporter_, until transferStatus_ becomes
// THREADS_JOINED.
//
// Returns immediately (no reporting at all) when the configured progress
// report interval is not positive, the throughput update interval is
// negative, or the receiver is not joinable.
//
// The current throughput attached to each report is recomputed only once
// every (throughput_update_interval_millis / progress_report_interval_millis)
// reports; in between, the last computed value is re-sent.
void Receiver::progressTracker() {
  // Progress tracker will check for progress after the time specified
  // in milliseconds.
  const int progressReportIntervalMillis =
      options_.progress_report_interval_millis;
  const int throughputUpdateIntervalMillis =
      options_.throughput_update_interval_millis;
  if (progressReportIntervalMillis <= 0 || throughputUpdateIntervalMillis < 0 ||
      !isJoinable_) {
    return;
  }
  // Number of progress reports between two throughput recomputations.
  // Integer division: a throughput interval shorter than the report interval
  // yields 0, which means "recompute on every report".
  const int throughputUpdateInterval =
      throughputUpdateIntervalMillis / progressReportIntervalMillis;

  int64_t lastEffectiveBytes = 0;
  std::chrono::time_point<Clock> lastUpdateTime = Clock::now();
  int intervalsSinceLastUpdate = 0;
  double currentThroughput = 0;
  WLOG(INFO) << "Progress reporter updating every "
             << progressReportIntervalMillis << " ms";
  const auto waitingTime =
      std::chrono::milliseconds(progressReportIntervalMillis);
  while (true) {
    {
      // Sleep for one report interval. The wait has no predicate, so a
      // notification on conditionFinished_ (or a spurious wakeup) ends it
      // early; in that case we simply report sooner than scheduled.
      std::unique_lock<std::mutex> lock(mutex_);
      conditionFinished_.wait_for(lock, waitingTime);
      if (transferStatus_ == THREADS_JOINED) {
        break;
      }
    }
    const double totalTime =
        durationSeconds(Clock::now() - startTime_.load());
    TransferStats globalStats;
    for (const auto &receiverThread : receiverThreads_) {
      globalStats += receiverThread->getTransferStats();
    }
    // Read before globalStats is moved into the report below.
    // Note: totalSenderBytes may not be valid yet if sender has not
    // completed file discovery.  But that's ok, report whatever progress
    // we can.
    const int64_t totalSenderBytes = globalStats.getTotalSenderBytes();
    auto transferReport = std::make_unique<TransferReport>(
        std::move(globalStats), totalTime, totalSenderBytes, 0, true);
    intervalsSinceLastUpdate++;
    if (intervalsSinceLastUpdate >= throughputUpdateInterval) {
      const auto curTime = Clock::now();
      const double elapsedSeconds = durationSeconds(curTime - lastUpdateTime);
      // Guard the division: after an early wakeup, or with a clock that is
      // not monotonic, the elapsed time can be zero or negative, which would
      // produce an inf/NaN/negative throughput. In that case keep the
      // previous throughput and baseline and retry on the next report.
      if (elapsedSeconds > 0) {
        const int64_t curEffectiveBytes =
            transferReport->getSummary().getEffectiveDataBytes();
        currentThroughput =
            (curEffectiveBytes - lastEffectiveBytes) / elapsedSeconds;
        lastEffectiveBytes = curEffectiveBytes;
        lastUpdateTime = curTime;
        intervalsSinceLastUpdate = 0;
      }
    }
    transferReport->setCurrentThroughput(currentThroughput);

    progressReporter_->progress(transferReport);
    // Dump perf stats when a report has been requested via reportPerfSignal_.
    if (reportPerfSignal_.notified()) {
      logPerfStats();
    }
  }
}