CoTryTask IBSocket::connect()

in src/common/net/ib/IBConnect.cc [338:451]


// Client-side connection setup for this IBSocket.
//
// Steps, in order:
//   1. Query the peer through IBConnect<>::query and pick a device pair with
//      selectDevice(), driven by a per-address counter kept in config_.roundRobin_.
//   2. Open the selected local port, derive connectConfig_, then create and
//      initialize the QP.
//   3. Switch state_ from INIT to CONNECTING and send our connect info through
//      IBConnect<>::connect.
//   4. Record the peer info, call qpReadyToRecv() and qpReadyToSend(), and
//      finally post an empty signaled send.
//
// `ctx` must belong to a TCP net client: an RDMA address is a FATAL error.
// `timeout` is passed to both requests via UserRequestOptions and is also
// included in the failure logs.
//
// Returns Void on success and an error otherwise. Every exit path other than
// the final success path leaves `failedGuard` armed, so connectFailed is
// sampled exactly when an error is returned.
CoTryTask<void> IBSocket::connect(serde::ClientContext &ctx, Duration timeout) {
  auto begin = SteadyClock::now();
  connecting.addSample(1);
  SCOPE_EXIT {
    connecting.addSample(-1);
    connectLatency.addSample(SteadyClock::now() - begin);
  };
  // Dismissed only right before the successful return at the bottom.
  auto failedGuard = folly::makeGuard([]() { connectFailed.addSample(1); });

  if (!IBManager::initialized()) {
    XLOGF(CRITICAL, "IBDevice not initialized!");
    co_return makeError(RPCCode::kIBDeviceNotInitialized);
  }

  net::UserRequestOptions opts;
  opts.timeout = timeout;
  auto addr = ctx.addr();
  {
    // Update the description first so that every later log line names the peer.
    auto des = describe_.wlock();
    *des = fmt::format("[connect RDMA://{}]", addr.toFollyAddress().describe());
  }
  if (addr.isRDMA()) {
    XLOGF(FATAL, "IBSocket::connect should call with a TCP net client");
  }

  IBQueryReq query;
  auto queryRsp = co_await IBConnect<>::query(ctx, query, &opts);
  if (queryRsp.hasError()) {
    // Rate-limit this log to one line per address every 10 seconds.
    auto now = RelativeTime::now();
    if (now - lastLogErrorTime[addr] >= 10_s) {
      XLOGF(ERR, "IBSocket {} failed to query, error {}, timeout {}", describe(), queryRsp.error(), timeout);
      lastLogErrorTime.insert_or_assign(addr, now);
    }
    co_return makeError(std::move(queryRsp.error()));
  }

  // Per-address counter handed to selectDevice(): the first connection to an
  // address starts from a random value, later ones increment the stored value.
  // NOTE(review): presumably this spreads connections across devices/ports —
  // confirm against selectDevice().
  uint64_t cnt = 0;
  {
    auto guard = config_.roundRobin_.wlock();
    if (auto iter = guard->find(addr); iter != guard->end()) {
      cnt = ++iter->second;
    } else {
      cnt = folly::Random::rand64();
      guard->insert({addr, cnt});
    }
  }
  auto match = queryRsp->selectDevice(describe(), cnt);
  CO_RETURN_ON_ERROR(match);
  XLOGF(INFO, "IBSocket {} connect with dev {}", describe(), match->describe());

  auto port = match->localDev->openPort(match->localPort);
  CO_RETURN_ON_ERROR(port);
  port_ = std::move(*port);
  CO_RETURN_ON_ERROR(checkPort());

  connectConfig_ = config_.clone().toIBConnectConfig(port_.isRoCE());
  if (checkConfig() != 0) {
    co_return makeError(StatusCode::kInvalidConfig, "IBSocket invalid configuration");
  }
  if (qpCreate() != 0) {
    co_return makeError(RPCCode::kConnectFailed, "IBSocket failed to create QP.");
  }
  if (qpInit() != 0) {
    co_return makeError(RPCCode::kConnectFailed, "IBSocket failed to init QP.");
  }
  // set state to CONNECTING; connect() must only run on a socket still in INIT.
  auto old = state_.exchange(State::CONNECTING);
  XLOGF_IF(FATAL, old != State::INIT, "IBSocket {} old state {} != INIT", describe(), magic_enum::enum_name(old));

  if (UNLIKELY(closed_)) {
    XLOGF(WARN, "IBSocket {} closed before connected!", describe());
    co_return makeError(RPCCode::kSocketClosed);
  }

  auto info = getConnectInfo();
  CO_RETURN_ON_ERROR(info);

  IBConnectReq connect(*info, match->remoteDev, match->remoteDevName, match->remotePort, connectConfig_);
  auto connectRsp = co_await IBConnect<>::connect(ctx, connect, &opts);
  if (connectRsp.hasError()) {
    XLOGF(ERR, "IBSocket {} failed to connect, error {}, timeout {}", describe(), connectRsp.error(), timeout);
    co_return makeError(std::move(connectRsp.error()));
  }

  INJECT_CONNECTION_LOST();

  setPeerInfo(folly::IPAddressV4::fromLong(ctx.addr().ip), *connectRsp);
  // For both QP transitions below, a failure on an already-closed socket is
  // reported as kSocketClosed rather than kConnectFailed.
  if (qpReadyToRecv() != 0) {
    if (closed_) {
      co_return makeError(RPCCode::kSocketClosed, "IBSocket closed before connected");
    }
    co_return makeError(RPCCode::kConnectFailed);
  }
  if (qpReadyToSend() != 0) {
    if (closed_) {
      co_return makeError(RPCCode::kSocketClosed, "IBSocket closed before connected");
    }
    co_return makeError(RPCCode::kConnectFailed);
  }

  // post an empty msg to generate an event and notify the remote side,
  // after this msg is sent successfully, state will change to State::READY
  auto bufIdx = sendBufs_.front().first;
  sendBufs_.pop();
  // Keep the raw return value of postSend in `ret` (not the result of the
  // comparison) so that the log below reports the real error code.
  if (auto ret = postSend(bufIdx, 0, IBV_SEND_SIGNALED); ret != 0) {
    XLOGF(CRITICAL, "IBSocket {} failed to send empty msg, errno {}", describe(), ret);
    // Without this message the socket never reaches READY, so report the
    // failure to the caller instead of returning success.
    co_return makeError(RPCCode::kConnectFailed, "IBSocket failed to send empty msg.");
  }
  XLOGF(INFO, "IBSocket {} connected", describe());

  failedGuard.dismiss();

  co_return Void{};
}