in src/common/net/ib/IBConnect.cc [338:451]
/**
 * Establish an RDMA connection to the peer addressed by `ctx`, using the TCP
 * RPC channel carried by `ctx` to exchange connection parameters.
 *
 * Steps:
 *   1. query the peer's IB devices (IBConnect<>::query) and select a
 *      local/remote device pair; the selection counter is kept per peer
 *      address (config_.roundRobin_), seeded randomly the first time an
 *      address is seen and incremented on every later connect;
 *   2. open the selected local port, build the connect config, then create
 *      and initialize the QP;
 *   3. move the socket state INIT -> CONNECTING and send our connect info to
 *      the peer (IBConnect<>::connect);
 *   4. record the peer info and call qpReadyToRecv() / qpReadyToSend();
 *   5. post an empty signaled send to notify the peer. The state is expected
 *      to change to State::READY once that send completes; that transition is
 *      not done in this function.
 *
 * @param ctx     client context; must carry a TCP (non-RDMA) address, otherwise
 *                the process aborts via XLOGF(FATAL).
 * @param timeout timeout applied to each of the two RPCs (query and connect).
 * @return Void on success. Otherwise one of: kIBDeviceNotInitialized,
 *         kInvalidConfig, kConnectFailed, kSocketClosed, or the error produced
 *         by the query/connect RPCs, device selection, port opening, or
 *         port/connect-info checks.
 *
 * Metrics: `connecting` is +1 for the duration of the call, `connectLatency`
 * records the elapsed time, and `connectFailed` is incremented on every path
 * that does not reach the final successful return.
 */
CoTryTask<void> IBSocket::connect(serde::ClientContext &ctx, Duration timeout) {
  auto begin = SteadyClock::now();
  connecting.addSample(1);
  SCOPE_EXIT {
    connecting.addSample(-1);
    connectLatency.addSample(SteadyClock::now() - begin);
  };
  // Dismissed only right before the successful return, so every early return
  // (error or closed socket) is counted as a failed connect.
  auto failedGuard = folly::makeGuard([]() { connectFailed.addSample(1); });

  if (!IBManager::initialized()) {
    XLOGF(CRITICAL, "IBDevice not initialized!");
    co_return makeError(RPCCode::kIBDeviceNotInitialized);
  }

  // The same options (and therefore the same timeout) are used for both RPCs.
  net::UserRequestOptions opts;
  opts.timeout = timeout;
  auto addr = ctx.addr();
  {
    auto des = describe_.wlock();
    *des = fmt::format("[connect RDMA://{}]", addr.toFollyAddress().describe());
  }
  // Connection parameters are exchanged over a TCP RPC channel.
  if (addr.isRDMA()) {
    XLOGF(FATAL, "IBSocket::connect should call with a TCP net client");
  }

  // Step 1: ask the peer which IB devices/ports it exposes.
  IBQueryReq query;
  auto queryRsp = co_await IBConnect<>::query(ctx, query, &opts);
  if (queryRsp.hasError()) {
    // Rate-limit this log to at most once per 10s per peer address.
    auto now = RelativeTime::now();
    if (now - lastLogErrorTime[addr] >= 10_s) {
      XLOGF(ERR, "IBSocket {} failed to query, error {}, timeout {}", describe(), queryRsp.error(), timeout);
      lastLogErrorTime.insert_or_assign(addr, now);
    }
    co_return makeError(std::move(queryRsp.error()));
  }

  // Per-address round-robin counter used to spread connections across
  // devices; a random seed avoids every client starting on the same device.
  uint64_t cnt = 0;
  {
    auto guard = config_.roundRobin_.wlock();
    if (auto iter = guard->find(addr); iter != guard->end()) {
      cnt = ++iter->second;
    } else {
      cnt = folly::Random::rand64();
      guard->insert({addr, cnt});
    }
  }
  auto match = queryRsp->selectDevice(describe(), cnt);
  CO_RETURN_ON_ERROR(match);
  XLOGF(INFO, "IBSocket {} connect with dev {}", describe(), match->describe());

  // Step 2: open the local port and set up the QP.
  auto port = match->localDev->openPort(match->localPort);
  CO_RETURN_ON_ERROR(port);
  port_ = std::move(*port);
  CO_RETURN_ON_ERROR(checkPort());
  connectConfig_ = config_.clone().toIBConnectConfig(port_.isRoCE());
  if (checkConfig() != 0) {
    co_return makeError(StatusCode::kInvalidConfig, "IBSocket invalid configuration");
  }
  if (qpCreate() != 0) {
    co_return makeError(RPCCode::kConnectFailed, "IBSocket failed to create QP.");
  }
  if (qpInit() != 0) {
    co_return makeError(RPCCode::kConnectFailed, "IBSocket failed to init QP.");
  }

  // Step 3: set state to CONNECTING; any other previous state is a logic error.
  auto old = state_.exchange(State::CONNECTING);
  XLOGF_IF(FATAL, old != State::INIT, "IBSocket {} old state {} != INIT", describe(), magic_enum::enum_name(old));
  if (UNLIKELY(closed_)) {
    XLOGF(WARN, "IBSocket {} closed before connected!", describe());
    co_return makeError(RPCCode::kSocketClosed);
  }
  auto info = getConnectInfo();
  CO_RETURN_ON_ERROR(info);
  IBConnectReq connect(*info, match->remoteDev, match->remoteDevName, match->remotePort, connectConfig_);
  auto connectRsp = co_await IBConnect<>::connect(ctx, connect, &opts);
  if (connectRsp.hasError()) {
    XLOGF(ERR, "IBSocket {} failed to connect, error {}, timeout {}", describe(), connectRsp.error(), timeout);
    co_return makeError(std::move(connectRsp.error()));
  }
  INJECT_CONNECTION_LOST();

  // Step 4: apply the peer's info and bring the QP up. A concurrent close is
  // reported as kSocketClosed rather than a generic connect failure.
  setPeerInfo(folly::IPAddressV4::fromLong(ctx.addr().ip), *connectRsp);
  if (qpReadyToRecv() != 0) {
    if (closed_) {
      co_return makeError(RPCCode::kSocketClosed, "IBSocket closed before connected");
    }
    co_return makeError(RPCCode::kConnectFailed);
  }
  if (qpReadyToSend() != 0) {
    if (closed_) {
      co_return makeError(RPCCode::kSocketClosed, "IBSocket closed before connected");
    }
    co_return makeError(RPCCode::kConnectFailed);
  }

  // Step 5: post an empty msg to generate an event and notify the remote side;
  // after this msg is sent successfully, state will change to State::READY.
  auto bufIdx = sendBufs_.front().first;
  sendBufs_.pop();
  // Bind the return code first, then test it. The previous
  // `auto ret = postSend(...) != 0` stored a bool, so the log always printed 1.
  if (auto ret = postSend(bufIdx, 0, IBV_SEND_SIGNALED); ret != 0) {
    XLOGF(CRITICAL, "IBSocket {} failed to send empty msg, errno {}", describe(), ret);
    // Without this send the socket never becomes READY, so report failure
    // instead of success (failedGuard already counts this path as failed).
    co_return makeError(RPCCode::kConnectFailed, "IBSocket failed to send empty msg.");
  }
  XLOGF(INFO, "IBSocket {} connected", describe());
  failedGuard.dismiss();
  co_return Void{};
}