in Sender.cpp [290:375]
ErrorCode Sender::start() {
{
std::lock_guard<std::mutex> lock(mutex_);
if (transferStatus_ != NOT_STARTED) {
WLOG(ERROR) << "duplicate start() call detected " << transferStatus_;
return ALREADY_EXISTS;
}
transferStatus_ = ONGOING;
}
// set up directory queue
dirQueue_.reset(new DirectorySourceQueue(options_, transferRequest_.directory,
&queueAbortChecker_));
WVLOG(3) << "Configuring the directory queue";
dirQueue_->setIncludePattern(options_.include_regex);
dirQueue_->setExcludePattern(options_.exclude_regex);
dirQueue_->setPruneDirPattern(options_.prune_dir_regex);
dirQueue_->setFollowSymlinks(options_.follow_symlinks);
dirQueue_->setBlockSizeMbytes(options_.block_size_mbytes);
dirQueue_->setNumClientThreads(transferRequest_.ports.size());
dirQueue_->setOpenFilesDuringDiscovery(options_.open_files_during_discovery);
dirQueue_->setDirectReads(options_.odirect_reads);
if (!transferRequest_.fileInfo.empty() ||
transferRequest_.fileInfoGenerator ||
transferRequest_.disableDirectoryTraversal) {
dirQueue_->setFileInfo(transferRequest_.fileInfo);
if (transferRequest_.fileInfoGenerator) {
dirQueue_->setFileInfoGenerator(
[this]() { return getFilesFromFileInfoGenerator(); });
}
}
transferHistoryController_ =
std::make_unique<TransferHistoryController>(*dirQueue_);
checkAndUpdateBufferSize();
const bool twoPhases = options_.two_phases;
WLOG(INFO) << "Client (sending) to " << getDestination() << ", Using ports [ "
<< transferRequest_.ports << "]";
startTime_ = Clock::now();
progressReportIntervalMillis_ = options_.progress_report_interval_millis;
downloadResumptionEnabled_ = (transferRequest_.downloadResumptionEnabled ||
options_.enable_download_resumption);
bool deleteExtraFiles = (transferRequest_.downloadResumptionEnabled ||
options_.delete_extra_files);
if (!progressReporter_) {
WVLOG(1) << "No progress reporter provided, making a default one";
progressReporter_ = std::make_unique<ProgressReporter>(transferRequest_);
}
bool progressReportEnabled =
progressReporter_ && progressReportIntervalMillis_ > 0;
if (throttler_) {
WLOG(INFO) << "Skipping throttler setup. External throttler set."
<< "Throttler details : " << *throttler_;
} else {
configureThrottler();
}
threadsController_ = new ThreadsController(transferRequest_.ports.size());
threadsController_->setNumBarriers(SenderThread::NUM_BARRIERS);
threadsController_->setNumFunnels(SenderThread::NUM_FUNNELS);
threadsController_->setNumConditions(SenderThread::NUM_CONDITIONS);
// TODO: fix this ! use transferRequest! (and dup from Receiver)
senderThreads_ = threadsController_->makeThreads<Sender, SenderThread>(
this, transferRequest_.ports.size(), transferRequest_.ports);
if (downloadResumptionEnabled_ && deleteExtraFiles) {
if (getProtocolVersion() >= Protocol::DELETE_CMD_VERSION) {
dirQueue_->enableFileDeletion();
} else {
WLOG(WARNING) << "Turning off extra file deletion on the receiver side "
"because of protocol version "
<< getProtocolVersion();
}
}
dirThread_ = dirQueue_->buildQueueAsynchronously();
if (twoPhases) {
dirThread_.join();
}
for (auto &senderThread : senderThreads_) {
senderThread->startThread();
}
if (progressReportEnabled) {
progressReporter_->start();
std::thread reporterThread(&Sender::reportProgress, this);
progressReporterThread_ = std::move(reporterThread);
}
return OK;
}