in benchmarks/storage_bench/StorageBench.h [142:308]
bool connect() {
XLOGF(INFO, "Start to connect...");
if (!setupIBSock()) {
return false;
}
mgmtdClientConfig_.set_mgmtd_server_addresses(benchOptions_.mgmtdEndpoints);
mgmtdClientConfig_.set_enable_auto_refresh(true);
mgmtdClientConfig_.set_enable_auto_heartbeat(false);
mgmtdClientConfig_.set_enable_auto_extend_client_session(true);
mgmtdClientConfig_.set_auto_refresh_interval(3_s);
mgmtdClientConfig_.set_auto_heartbeat_interval(3_s);
mgmtdClientConfig_.set_auto_extend_client_session_interval(3_s);
mgmtdClientConfig_.set_accept_incomplete_routing_info_during_mgmtd_bootstrapping(false);
if (!client_.start()) {
XLOGF(ERR, "Failed to start net client for mgmtd client");
return false;
}
XLOGF(INFO, "Creating mgmtd client...");
auto stubFactory = std::make_unique<hf3fs::stubs::RealStubFactory<hf3fs::mgmtd::MgmtdServiceStub>>(
stubs::ClientContextCreator{[this](net::Address addr) { return client_.serdeCtx(addr); }});
auto mgmtdClient = std::make_unique<hf3fs::client::MgmtdClientForClient>(benchOptions_.clusterId,
std::move(stubFactory),
mgmtdClientConfig_);
auto physicalHostnameRes = SysResource::hostname(/*physicalMachineName=*/true);
if (!physicalHostnameRes) {
XLOGF(ERR, "getHostname(true) failed: {}", physicalHostnameRes.error());
return false;
}
auto containerHostnameRes = SysResource::hostname(/*physicalMachineName=*/false);
if (!containerHostnameRes) {
XLOGF(ERR, "getHostname(false) failed: {}", containerHostnameRes.error());
return false;
}
mgmtdClient->setClientSessionPayload({clientId_.uuid.toHexString(),
flat::NodeType::CLIENT,
flat::ClientSessionData::create(
/*universalId=*/*physicalHostnameRes,
/*description=*/fmt::format("StorageBench: {}", *containerHostnameRes),
/*serviceGroups=*/std::vector<flat::ServiceGroupInfo>{},
flat::ReleaseVersion::fromVersionInfo()),
flat::UserInfo{}});
folly::coro::blockingWait(mgmtdClient->start(&client_.tpg().bgThreadPool().randomPick()));
mgmtdForClient_ = std::move(mgmtdClient);
// get routing info
for (size_t retry = 0; retry < 15; retry++) {
auto routingInfo = mgmtdForClient_->getRoutingInfo();
if (routingInfo == nullptr || routingInfo->raw()->chains.empty()) {
XLOGF(WARN, "Empty routing info, #{} retry...", retry + 1);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
} else {
for (const auto &[tableId, tableVersions] : routingInfo->raw()->chainTables) {
if (tableId == benchOptions_.chainTableId) {
if (tableVersions.empty()) {
XLOGF(WARN, "No version found for chain table with id {}", tableId);
return false;
}
XLOGF(INFO, "Found {} version(s) of chain table {}", tableVersions.size(), benchOptions_.chainTableId);
flat::ChainTable chainTable;
if (benchOptions_.chainTableVersion > 0) {
flat::ChainTableVersion tableVersion(benchOptions_.chainTableVersion);
auto tableIter = tableVersions.find(tableVersion);
if (tableIter == tableVersions.end()) {
XLOGF(WARN, "Version {} not found in chain table with id {}", tableVersion, tableId);
return false;
}
chainTable = tableIter->second;
XLOGF(INFO,
"Found version {} of chain table {}: {}",
benchOptions_.chainTableVersion,
benchOptions_.chainTableId,
chainTable.chainTableVersion);
} else {
const auto iter = --tableVersions.cend();
const auto &latestTable = iter->second;
chainTable = latestTable;
XLOGF(INFO,
"Found latest version of chain table {}: {}",
benchOptions_.chainTableId,
chainTable.chainTableVersion);
}
XLOGF(WARN,
"Selected chain table: {}@{} [{}] {} chains",
chainTable.chainTableId,
chainTable.chainTableVersion,
chainTable.desc,
chainTable.chains.size());
if (!benchOptions_.storageNodeIds.empty()) {
for (const auto &chainId : chainTable.chains) {
const auto chainInfo = routingInfo->raw()->getChain(chainId);
for (const auto &target : chainInfo->targets) {
const auto targetInfo = routingInfo->raw()->getTarget(target.targetId);
auto nodeIter = std::find(benchOptions_.storageNodeIds.begin(),
benchOptions_.storageNodeIds.end(),
*targetInfo->nodeId);
if (nodeIter != benchOptions_.storageNodeIds.end()) {
chainIds_.push_back(chainId);
break;
}
}
}
} else if (!benchOptions_.chainIds.empty()) {
for (const auto &chainId : chainTable.chains) {
auto chainIter = std::find(benchOptions_.chainIds.begin(), benchOptions_.chainIds.end(), chainId);
if (chainIter != benchOptions_.chainIds.end()) {
chainIds_.push_back(chainId);
}
}
} else {
chainIds_ = chainTable.chains;
}
break;
}
}
if (!chainIds_.empty()) break;
}
}
if (chainIds_.empty()) {
XLOGF(ERR, "Failed to get chain table with id {}", benchOptions_.chainTableId);
return false;
} else {
XLOGF(WARN, "Selected {} replication chains for benchmark", chainIds_.size());
}
// create storage client
if (setupConfig_.client_config().empty()) {
XLOGF(ERR, "Storage client config not specified");
return false;
}
auto configRes = clientConfig_.atomicallyUpdate(setupConfig_.client_config(), false /*isHotUpdate*/);
if (!configRes) {
XLOGF(ERR, "Cannot load client config from {}, error: {}", setupConfig_.client_config(), configRes.error());
return false;
}
totalNumChunks_ = chainIds_.size() * benchOptions_.numCoroutines * benchOptions_.numChunks;
totalChunkGiB_ = (double)totalNumChunks_ * setupConfig_.chunk_size() / 1_GB;
clientConfig_.retry().set_max_retry_time(Duration(std::chrono::milliseconds(benchOptions_.clientTimeoutMS)));
clientConfig_.net_client().io_worker().ibsocket().set_sl(setupConfig_.service_level());
XLOGF(INFO, "Creating storage client...");
storageClient_ = client::StorageClient::create(clientId_, clientConfig_, *mgmtdForClient_);
return true;
}