std::unique_ptr Sender::finish()

in Sender.cpp [171:261]


std::unique_ptr<TransferReport> Sender::finish() {
  std::unique_lock<std::mutex> instanceLock(instanceManagementMutex_);
  WVLOG(1) << "Sender::finish()";
  TransferStatus status = getTransferStatus();
  if (status == NOT_STARTED) {
    WLOG(WARNING) << "Even though transfer has not started, finish is called";
    // getTransferReport will set the error code to ERROR
    return getTransferReport();
  }
  if (status == THREADS_JOINED) {
    WVLOG(1) << "Threads have already been joined. Returning the"
             << " existing transfer report";
    return getTransferReport();
  }
  const bool twoPhases = options_.two_phases;
  bool progressReportEnabled =
      progressReporter_ && progressReportIntervalMillis_ > 0;
  for (auto &senderThread : senderThreads_) {
    senderThread->finish();
  }
  if (!twoPhases) {
    dirThread_.join();
  }
  WDT_CHECK(numActiveThreads_ == 0);
  setTransferStatus(THREADS_JOINED);
  if (progressReportEnabled) {
    progressReporterThread_.join();
  }
  std::vector<TransferStats> threadStats;
  for (auto &senderThread : senderThreads_) {
    threadStats.push_back(senderThread->moveStats());
  }

  bool allSourcesAcked = false;
  for (auto &senderThread : senderThreads_) {
    auto &stats = senderThread->getTransferStats();
    if (stats.getErrorCode() == OK) {
      // at least one thread finished correctly
      // that means all transferred sources are acked
      allSourcesAcked = true;
      break;
    }
  }

  std::vector<TransferStats> transferredSourceStats;
  for (auto port : transferRequest_.ports) {
    auto &transferHistory =
        transferHistoryController_->getTransferHistory(port);
    if (allSourcesAcked) {
      transferHistory.markAllAcknowledged();
    } else {
      transferHistory.returnUnackedSourcesToQueue();
    }
    if (options_.full_reporting) {
      std::vector<TransferStats> stats = transferHistory.popAckedSourceStats();
      transferredSourceStats.insert(transferredSourceStats.end(),
                                    std::make_move_iterator(stats.begin()),
                                    std::make_move_iterator(stats.end()));
    }
  }
  if (options_.full_reporting) {
    validateTransferStats(transferredSourceStats,
                          dirQueue_->getFailedSourceStats());
  }
  int64_t totalFileSize = dirQueue_->getTotalSize();
  double totalTime = durationSeconds(endTime_ - startTime_);
  std::unique_ptr<TransferReport> transferReport =
      std::make_unique<TransferReport>(
          transferredSourceStats, dirQueue_->getFailedSourceStats(),
          threadStats, dirQueue_->getFailedDirectories(), totalTime,
          totalFileSize, dirQueue_->getCount(),
          dirQueue_->getPreviouslySentBytes(),
          dirQueue_->fileDiscoveryFinished());

  if (progressReportEnabled) {
    progressReporter_->end(transferReport);
  }
  logPerfStats();

  double directoryTime;
  directoryTime = dirQueue_->getDirectoryTime();
  WLOG(INFO) << "Total sender time = " << totalTime << " seconds ("
             << directoryTime << " dirTime)"
             << ". Transfer summary : " << *transferReport << "\n"
             << WDT_LOG_PREFIX << "Total sender throughput = "
             << transferReport->getThroughputMBps() << " Mbytes/sec ("
             << transferReport->getSummary().getEffectiveTotalBytes() /
                    (totalTime - directoryTime) / kMbToB
             << " Mbytes/sec pure transfer rate)";
  return transferReport;
}