ErrorCode Sender::start()

in Sender.cpp [290:375]


// Starts the sending side of a transfer.
//
// Steps, in order:
//   1. Under mutex_, move transferStatus_ from NOT_STARTED to ONGOING. Any
//      other starting status is reported as a duplicate call.
//   2. Create the directory source queue and configure it from options_ and
//      transferRequest_.
//   3. Create the transfer history controller, the progress reporter (if none
//      was supplied), the throttler (if none was supplied) and the threads
//      controller together with one sender thread per port.
//   4. Start building the queue on dirThread_, then start the sender threads
//      and, when enabled, the progress reporting thread.
//
// Returns ALREADY_EXISTS when transferStatus_ was not NOT_STARTED on entry,
// OK otherwise.
ErrorCode Sender::start() {
  {
    // Only the status check-and-set happens while holding mutex_.
    std::lock_guard<std::mutex> guard(mutex_);
    if (transferStatus_ != NOT_STARTED) {
      WLOG(ERROR) << "duplicate start() call detected " << transferStatus_;
      return ALREADY_EXISTS;
    }
    transferStatus_ = ONGOING;
  }

  // The number of ports is read at each use, exactly where it is needed.
  const auto portCount = [this]() { return transferRequest_.ports.size(); };

  // Build the directory queue and push the relevant settings into it.
  dirQueue_.reset(new DirectorySourceQueue(options_, transferRequest_.directory,
                                           &queueAbortChecker_));
  WVLOG(3) << "Configuring the  directory queue";
  dirQueue_->setIncludePattern(options_.include_regex);
  dirQueue_->setExcludePattern(options_.exclude_regex);
  dirQueue_->setPruneDirPattern(options_.prune_dir_regex);
  dirQueue_->setFollowSymlinks(options_.follow_symlinks);
  dirQueue_->setBlockSizeMbytes(options_.block_size_mbytes);
  dirQueue_->setNumClientThreads(portCount());
  dirQueue_->setOpenFilesDuringDiscovery(options_.open_files_during_discovery);
  dirQueue_->setDirectReads(options_.odirect_reads);

  // The request's file list is handed to the queue when the list is
  // non-empty, when a generator is present, or when directory traversal is
  // disabled. A generator, if present, is additionally wired to
  // getFilesFromFileInfoGenerator().
  const bool hasGenerator =
      static_cast<bool>(transferRequest_.fileInfoGenerator);
  const bool passFileInfo = !transferRequest_.fileInfo.empty() ||
                            hasGenerator ||
                            transferRequest_.disableDirectoryTraversal;
  if (passFileInfo) {
    dirQueue_->setFileInfo(transferRequest_.fileInfo);
  }
  if (hasGenerator) {
    dirQueue_->setFileInfoGenerator(
        [this]() { return getFilesFromFileInfoGenerator(); });
  }

  transferHistoryController_ =
      std::make_unique<TransferHistoryController>(*dirQueue_);

  checkAndUpdateBufferSize();
  // Captured here; decides below whether the queue-building thread is joined
  // before the sender threads are started.
  const bool joinDirThreadFirst = options_.two_phases;
  WLOG(INFO) << "Client (sending) to " << getDestination() << ", Using ports [ "
             << transferRequest_.ports << "]";
  startTime_ = Clock::now();
  progressReportIntervalMillis_ = options_.progress_report_interval_millis;
  downloadResumptionEnabled_ = (transferRequest_.downloadResumptionEnabled ||
                                options_.enable_download_resumption);
  const bool deleteExtraFiles = (transferRequest_.downloadResumptionEnabled ||
                                 options_.delete_extra_files);

  // Fall back to a default reporter when the caller did not install one.
  if (!progressReporter_) {
    WVLOG(1) << "No progress reporter provided, making a default one";
    progressReporter_ = std::make_unique<ProgressReporter>(transferRequest_);
  }
  const bool shouldReportProgress =
      progressReporter_ && progressReportIntervalMillis_ > 0;

  // An already-set throttler is left alone; otherwise configure one.
  if (!throttler_) {
    configureThrottler();
  } else {
    WLOG(INFO) << "Skipping throttler setup. External throttler set."
               << "Throttler details : " << *throttler_;
  }

  threadsController_ = new ThreadsController(portCount());
  threadsController_->setNumBarriers(SenderThread::NUM_BARRIERS);
  threadsController_->setNumFunnels(SenderThread::NUM_FUNNELS);
  threadsController_->setNumConditions(SenderThread::NUM_CONDITIONS);
  // TODO: fix this ! use transferRequest! (and dup from Receiver)
  senderThreads_ = threadsController_->makeThreads<Sender, SenderThread>(
      this, portCount(), transferRequest_.ports);

  // File deletion is requested only with resumption enabled, and is turned on
  // only when the protocol version is at least DELETE_CMD_VERSION.
  const bool deletionRequested = downloadResumptionEnabled_ && deleteExtraFiles;
  if (deletionRequested &&
      getProtocolVersion() >= Protocol::DELETE_CMD_VERSION) {
    dirQueue_->enableFileDeletion();
  } else if (deletionRequested) {
    WLOG(WARNING) << "Turning off extra file deletion on the receiver side "
                     "because of protocol version "
                  << getProtocolVersion();
  }

  dirThread_ = dirQueue_->buildQueueAsynchronously();
  if (joinDirThreadFirst) {
    // Wait for the queue-building thread before any sender thread starts.
    dirThread_.join();
  }
  for (auto &worker : senderThreads_) {
    worker->startThread();
  }
  if (shouldReportProgress) {
    progressReporter_->start();
    progressReporterThread_ = std::thread(&Sender::reportProgress, this);
  }
  return OK;
}