Status ReplicationThread::parallelFetchFile()

in src/cluster/replication.cc [758:844]


// Downloads the files listed in `files` (pairs of file name and crc32) from the master into `dir`.
//
// - Up to 20 files are fetched over a single connection; larger lists are split across 4 worker
//   threads. Worker `tid` handles indices tid, tid + concurrency, tid + 2 * concurrency, ...
// - A file for which ReplDataManager::FileExists(storage_, dir, name, crc) returns true is not
//   fetched, only counted in the progress log.
// - Each worker opens its own connection (TLS when `tls_replication` is enabled) and sends auth.
//   It then fetches its share either with a single fetchFiles() call or, when
//   `master_use_repl_port` is set (old masters that accept one file per command), one call per file.
// - Workers abort with "replication thread was stopped" once `stop_flag_` is observed.
//
// Returns OK when every worker succeeds, otherwise the first non-OK status in worker order.
Status ReplicationThread::parallelFetchFile(const std::string &dir,
                                            const std::vector<std::pair<std::string, uint32_t>> &files) {
  // Only larger file lists are worth the extra connections.
  constexpr size_t kParallelFileThreshold = 20;
  constexpr size_t kParallelWorkers = 4;
  const size_t concurrency = files.size() > kParallelFileThreshold ? kParallelWorkers : 1;

  std::atomic<uint32_t> fetch_cnt = {0};
  std::atomic<uint32_t> skip_cnt = {0};
  std::vector<std::future<Status>> results;
  results.reserve(concurrency);
  for (size_t tid = 0; tid < concurrency; ++tid) {
    results.push_back(
        std::async(std::launch::async, [this, dir, &files, tid, concurrency, &fetch_cnt, &skip_cnt]() -> Status {
          if (this->stop_flag_) {
            return {Status::NotOK, "replication thread was stopped"};
          }
          ssl_st *ssl = nullptr;
#ifdef ENABLE_OPENSSL
          if (this->srv_->GetConfig()->tls_replication) {
            ssl = SSL_new(this->srv_->ssl_ctx.get());
            if (!ssl) {
              // Do not silently fall back to a plaintext connection when TLS was requested.
              return {Status::NotOK, "failed to create SSL object for the replication connection"};
            }
          }
          // SockConnect(), sendAuth() and fetchFiles() only receive the SSL object as a raw
          // pointer and nothing here hands it off (NOTE(review): confirm none of them take
          // ownership), so this worker must free it on every exit path, including after a
          // successful connect. The guard is declared before `unique_fd`, so the socket is closed
          // first and the SSL object is freed afterwards. SSL_free(nullptr) is a no-op when TLS is off.
          auto ssl_guard = MakeScopeExit([ssl] { SSL_free(ssl); });
#endif
          int sock_fd = GET_OR_RET(util::SockConnect(this->host_, this->port_, ssl,
                                                     this->srv_->GetConfig()->replication_connect_timeout_ms,
                                                     this->srv_->GetConfig()->replication_recv_timeout_ms)
                                       .Prefixed("connect the server err"));
          UniqueFD unique_fd{sock_fd};
          auto s = this->sendAuth(sock_fd, ssl);
          if (!s.IsOK()) {
            return s.Prefixed("send the auth command err");
          }

          // Collect this worker's strided share of the files, skipping those already present.
          std::vector<std::string> fetch_files;
          std::vector<uint32_t> crcs;
          for (size_t f_idx = tid; f_idx < files.size(); f_idx += concurrency) {
            if (this->stop_flag_) {
              return {Status::NotOK, "replication thread was stopped"};
            }
            const auto &f_name = files[f_idx].first;
            const auto &f_crc = files[f_idx].second;
            // Don't fetch existing files
            if (engine::Storage::ReplDataManager::FileExists(this->storage_, dir, f_name, f_crc)) {
              // fetch_add returns the previous value, so +1 is the count including this file.
              const uint32_t cur_skip_cnt = skip_cnt.fetch_add(1) + 1;
              const uint32_t cur_fetch_cnt = fetch_cnt.load();
              info("[skip] {} {}, skip count: {}, fetch count: {}, progress: {} / {}", f_name, f_crc, cur_skip_cnt,
                   cur_fetch_cnt, (cur_skip_cnt + cur_fetch_cnt), files.size());
              continue;
            }
            fetch_files.push_back(f_name);
            crcs.push_back(f_crc);
          }

          const size_t files_count = files.size();
          FetchFileCallback fn = [&fetch_cnt, &skip_cnt, files_count](const std::string &fetch_file,
                                                                      uint32_t fetch_crc) {
            const uint32_t cur_fetch_cnt = fetch_cnt.fetch_add(1) + 1;
            const uint32_t cur_skip_cnt = skip_cnt.load();
            info("[fetch] Fetched {}, crc32 {}, skip count: {}, fetch count: {}, progress: {} / {}", fetch_file,
                 fetch_crc, cur_skip_cnt, cur_fetch_cnt, cur_skip_cnt + cur_fetch_cnt, files_count);
          };
          // For master using old version, it only supports to fetch a single file by one
          // command, so we need to fetch all files by multiple command interactions.
          if (srv_->GetConfig()->master_use_repl_port) {
            for (size_t i = 0; i < fetch_files.size(); i++) {
              if (this->stop_flag_) {
                return {Status::NotOK, "replication thread was stopped"};
              }
              s = this->fetchFiles(sock_fd, dir, {fetch_files[i]}, {crcs[i]}, fn, ssl);
              if (!s.IsOK()) break;
            }
          } else if (!fetch_files.empty()) {
            s = this->fetchFiles(sock_fd, dir, fetch_files, crcs, fn, ssl);
          }
          return s;
        }));
  }

  // Wait til finish. On an early error return, the remaining std::async futures block in their
  // destructors until their workers complete. `results` is declared after the atomics, so it is
  // destroyed first, and the workers' by-reference captures stay valid until then.
  for (auto &f : results) {
    Status s = f.get();
    if (!s.IsOK()) return s;
  }
  return Status::OK();
}