bool connect()

in benchmarks/storage_bench/StorageBench.h [142:308]


  bool connect() {
    XLOGF(INFO, "Start to connect...");

    if (!setupIBSock()) {
      return false;
    }

    mgmtdClientConfig_.set_mgmtd_server_addresses(benchOptions_.mgmtdEndpoints);
    mgmtdClientConfig_.set_enable_auto_refresh(true);
    mgmtdClientConfig_.set_enable_auto_heartbeat(false);
    mgmtdClientConfig_.set_enable_auto_extend_client_session(true);
    mgmtdClientConfig_.set_auto_refresh_interval(3_s);
    mgmtdClientConfig_.set_auto_heartbeat_interval(3_s);
    mgmtdClientConfig_.set_auto_extend_client_session_interval(3_s);
    mgmtdClientConfig_.set_accept_incomplete_routing_info_during_mgmtd_bootstrapping(false);

    if (!client_.start()) {
      XLOGF(ERR, "Failed to start net client for mgmtd client");
      return false;
    }

    XLOGF(INFO, "Creating mgmtd client...");

    auto stubFactory = std::make_unique<hf3fs::stubs::RealStubFactory<hf3fs::mgmtd::MgmtdServiceStub>>(
        stubs::ClientContextCreator{[this](net::Address addr) { return client_.serdeCtx(addr); }});
    auto mgmtdClient = std::make_unique<hf3fs::client::MgmtdClientForClient>(benchOptions_.clusterId,
                                                                             std::move(stubFactory),
                                                                             mgmtdClientConfig_);

    auto physicalHostnameRes = SysResource::hostname(/*physicalMachineName=*/true);
    if (!physicalHostnameRes) {
      XLOGF(ERR, "getHostname(true) failed: {}", physicalHostnameRes.error());
      return false;
    }

    auto containerHostnameRes = SysResource::hostname(/*physicalMachineName=*/false);
    if (!containerHostnameRes) {
      XLOGF(ERR, "getHostname(false) failed: {}", containerHostnameRes.error());
      return false;
    }

    mgmtdClient->setClientSessionPayload({clientId_.uuid.toHexString(),
                                          flat::NodeType::CLIENT,
                                          flat::ClientSessionData::create(
                                              /*universalId=*/*physicalHostnameRes,
                                              /*description=*/fmt::format("StorageBench: {}", *containerHostnameRes),
                                              /*serviceGroups=*/std::vector<flat::ServiceGroupInfo>{},
                                              flat::ReleaseVersion::fromVersionInfo()),
                                          flat::UserInfo{}});
    folly::coro::blockingWait(mgmtdClient->start(&client_.tpg().bgThreadPool().randomPick()));
    mgmtdForClient_ = std::move(mgmtdClient);

    // get routing info

    for (size_t retry = 0; retry < 15; retry++) {
      auto routingInfo = mgmtdForClient_->getRoutingInfo();

      if (routingInfo == nullptr || routingInfo->raw()->chains.empty()) {
        XLOGF(WARN, "Empty routing info, #{} retry...", retry + 1);
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
      } else {
        for (const auto &[tableId, tableVersions] : routingInfo->raw()->chainTables) {
          if (tableId == benchOptions_.chainTableId) {
            if (tableVersions.empty()) {
              XLOGF(WARN, "No version found for chain table with id {}", tableId);
              return false;
            }

            XLOGF(INFO, "Found {} version(s) of chain table {}", tableVersions.size(), benchOptions_.chainTableId);

            flat::ChainTable chainTable;

            if (benchOptions_.chainTableVersion > 0) {
              flat::ChainTableVersion tableVersion(benchOptions_.chainTableVersion);
              auto tableIter = tableVersions.find(tableVersion);

              if (tableIter == tableVersions.end()) {
                XLOGF(WARN, "Version {} not found in chain table with id {}", tableVersion, tableId);
                return false;
              }

              chainTable = tableIter->second;
              XLOGF(INFO,
                    "Found version {} of chain table {}: {}",
                    benchOptions_.chainTableVersion,
                    benchOptions_.chainTableId,
                    chainTable.chainTableVersion);
            } else {
              const auto iter = --tableVersions.cend();
              const auto &latestTable = iter->second;
              chainTable = latestTable;
              XLOGF(INFO,
                    "Found latest version of chain table {}: {}",
                    benchOptions_.chainTableId,
                    chainTable.chainTableVersion);
            }

            XLOGF(WARN,
                  "Selected chain table: {}@{} [{}] {} chains",
                  chainTable.chainTableId,
                  chainTable.chainTableVersion,
                  chainTable.desc,
                  chainTable.chains.size());

            if (!benchOptions_.storageNodeIds.empty()) {
              for (const auto &chainId : chainTable.chains) {
                const auto chainInfo = routingInfo->raw()->getChain(chainId);
                for (const auto &target : chainInfo->targets) {
                  const auto targetInfo = routingInfo->raw()->getTarget(target.targetId);
                  auto nodeIter = std::find(benchOptions_.storageNodeIds.begin(),
                                            benchOptions_.storageNodeIds.end(),
                                            *targetInfo->nodeId);
                  if (nodeIter != benchOptions_.storageNodeIds.end()) {
                    chainIds_.push_back(chainId);
                    break;
                  }
                }
              }
            } else if (!benchOptions_.chainIds.empty()) {
              for (const auto &chainId : chainTable.chains) {
                auto chainIter = std::find(benchOptions_.chainIds.begin(), benchOptions_.chainIds.end(), chainId);
                if (chainIter != benchOptions_.chainIds.end()) {
                  chainIds_.push_back(chainId);
                }
              }
            } else {
              chainIds_ = chainTable.chains;
            }

            break;
          }
        }

        if (!chainIds_.empty()) break;
      }
    }

    if (chainIds_.empty()) {
      XLOGF(ERR, "Failed to get chain table with id {}", benchOptions_.chainTableId);
      return false;
    } else {
      XLOGF(WARN, "Selected {} replication chains for benchmark", chainIds_.size());
    }

    // create storage client

    if (setupConfig_.client_config().empty()) {
      XLOGF(ERR, "Storage client config not specified");
      return false;
    }

    auto configRes = clientConfig_.atomicallyUpdate(setupConfig_.client_config(), false /*isHotUpdate*/);
    if (!configRes) {
      XLOGF(ERR, "Cannot load client config from {}, error: {}", setupConfig_.client_config(), configRes.error());
      return false;
    }

    totalNumChunks_ = chainIds_.size() * benchOptions_.numCoroutines * benchOptions_.numChunks;
    totalChunkGiB_ = (double)totalNumChunks_ * setupConfig_.chunk_size() / 1_GB;
    clientConfig_.retry().set_max_retry_time(Duration(std::chrono::milliseconds(benchOptions_.clientTimeoutMS)));
    clientConfig_.net_client().io_worker().ibsocket().set_sl(setupConfig_.service_level());

    XLOGF(INFO, "Creating storage client...");
    storageClient_ = client::StorageClient::create(clientId_, clientConfig_, *mgmtdForClient_);

    return true;
  }