in src/cluster/replication.cc [758:844]
// Fetches the given files from the master into `dir`, using several
// connections in parallel when there are many files.
//
// The file list is split round-robin between the workers: worker `tid` handles
// the indexes tid, tid + concurrency, tid + 2 * concurrency, ... Each worker
// opens its own connection to host_:port_, authenticates with sendAuth(), skips
// every file for which ReplDataManager::FileExists() reports true and fetches
// the remaining ones through fetchFiles().
//
// Parameters:
//   dir   - passed through unchanged to FileExists() and fetchFiles().
//   files - (file name, crc32) pairs describing the files to fetch.
//
// Returns OK when every worker succeeded, otherwise the status of the first
// failed worker in worker-index order. A worker reports NotOK as soon as it
// observes stop_flag_ before connecting or while scanning its share of files.
Status ReplicationThread::parallelFetchFile(const std::string &dir,
                                            const std::vector<std::pair<std::string, uint32_t>> &files) {
  const size_t total_files = files.size();
  // Use 4 threads to download files in parallel once there are more than 20 files
  const size_t concurrency = total_files > 20 ? 4 : 1;

  // Progress counters shared by all workers; only used for logging.
  std::atomic<uint32_t> fetch_cnt = {0};
  std::atomic<uint32_t> skip_cnt = {0};

  std::vector<std::future<Status>> results;
  results.reserve(concurrency);
  for (size_t tid = 0; tid < concurrency; ++tid) {
    results.push_back(std::async(
        std::launch::async, [this, dir, &files, tid, concurrency, total_files, &fetch_cnt, &skip_cnt]() -> Status {
          if (this->stop_flag_) {
            return {Status::NotOK, "replication thread was stopped"};
          }

          ssl_st *ssl = nullptr;
#ifdef ENABLE_OPENSSL
          if (this->srv_->GetConfig()->tls_replication) {
            ssl = SSL_new(this->srv_->ssl_ctx.get());
          }
          // Frees the SSL object if we leave the lambda before the guard is disabled
          // (presumably GET_OR_RET returns early when the connect fails).
          auto exit = MakeScopeExit([ssl] { SSL_free(ssl); });
#endif
          int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_, ssl,
                                                     this->srv_->GetConfig()->replication_connect_timeout_ms,
                                                     this->srv_->GetConfig()->replication_recv_timeout_ms)
                                       .Prefixed("connect the server err"));
#ifdef ENABLE_OPENSSL
          // NOTE(review): after this point nothing visible in this function frees `ssl`;
          // confirm that ownership is taken over elsewhere, otherwise it leaks once per worker.
          exit.Disable();
#endif
          UniqueFD unique_fd{sock_fd};

          auto s = this->sendAuth(sock_fd, ssl);
          if (!s.IsOK()) {
            return s.Prefixed("send the auth command err");
          }

          // Collect this worker's share of the files that still have to be fetched.
          std::vector<std::string> fetch_files;
          std::vector<uint32_t> crcs;
          for (auto f_idx = tid; f_idx < files.size(); f_idx += concurrency) {
            if (this->stop_flag_) {
              return {Status::NotOK, "replication thread was stopped"};
            }
            const auto &f_name = files[f_idx].first;
            const auto &f_crc = files[f_idx].second;
            // Don't fetch existing files
            if (engine::Storage::ReplDataManager::FileExists(this->storage_, dir, f_name, f_crc)) {
              // Take the counter value produced by our own increment, so that concurrent
              // workers do not log the same skip count for different files.
              uint32_t cur_skip_cnt = skip_cnt.fetch_add(1) + 1;
              uint32_t cur_fetch_cnt = fetch_cnt.load();
              info("[skip] {} {}, skip count: {}, fetch count: {}, progress: {} / {}", f_name, f_crc, cur_skip_cnt,
                   cur_fetch_cnt, (cur_skip_cnt + cur_fetch_cnt), total_files);
              continue;
            }
            fetch_files.push_back(f_name);
            crcs.push_back(f_crc);
          }

          // Passed to fetchFiles() together with the file list; logs the overall progress
          // each time it is called with a file name and crc.
          FetchFileCallback fn = [&fetch_cnt, &skip_cnt, total_files](const std::string &fetch_file,
                                                                      uint32_t fetch_crc) {
            uint32_t cur_fetch_cnt = fetch_cnt.fetch_add(1) + 1;
            uint32_t cur_skip_cnt = skip_cnt.load();
            info("[fetch] Fetched {}, crc32 {}, skip count: {}, fetch count: {}, progress: {} / {}", fetch_file,
                 fetch_crc, cur_skip_cnt, cur_fetch_cnt, cur_skip_cnt + cur_fetch_cnt, total_files);
          };

          // For master using old version, it only supports to fetch a single file by one
          // command, so we need to fetch all files by multiple command interactions.
          if (srv_->GetConfig()->master_use_repl_port) {
            for (size_t i = 0; i < fetch_files.size(); i++) {
              s = this->fetchFiles(sock_fd, dir, {fetch_files[i]}, {crcs[i]}, fn, ssl);
              if (!s.IsOK()) break;
            }
          } else {
            if (!fetch_files.empty()) {
              s = this->fetchFiles(sock_fd, dir, fetch_files, crcs, fn, ssl);
            }
          }
          return s;
        }));
  }

  // Wait until finished. On an early return the remaining futures are destroyed with
  // `results`; futures obtained from std::async(std::launch::async, ...) block in their
  // destructor until the task has completed, so the by-reference captures above
  // (`files`, `fetch_cnt`, `skip_cnt`) stay valid for as long as any worker runs.
  for (auto &f : results) {
    Status s = f.get();
    if (!s.IsOK()) return s;
  }
  return Status::OK();
}