in src/common/net/ib/IBDevice.cc [271:372]
/// Open the verbs device `dev` and build an IBDevice describing its usable ports.
///
/// The function
///   1. opens the device context, allocates a protection domain and queries the
///      device attributes;
///   2. selects the ports that pass `config.device_filter()` (an empty filter
///      accepts everything; otherwise either the IB device name or the network
///      device name mapped to the port in `ib2net` must be listed);
///   3. switches the context's async event fd to non-blocking mode;
///   4. queries each selected port, skipping ports whose link layer is neither
///      Ethernet nor InfiniBand and, when `config.skip_inactive_ports()` is set,
///      ports that are not active, and records addresses, zones and port attributes.
///
/// @param dev     Verbs device to open.
/// @param devId   Identifier stored in the resulting IBDevice.
/// @param ib2net  Maps (IB device name, port number) to a network device name.
/// @param ifaddrs Interface addresses, forwarded to getIBPortAddrs().
/// @param config  Supplies the device filter and the inactive-port / unknown-zone policies.
/// @return The initialized device, `RPCCode::kIBInitFailed` if any verbs or fcntl
///         call fails, `StatusCode::kInvalidConfig` if a port only resolves to the
///         unknown zone while `config.allow_unknown_zone()` is false, or the error
///         propagated from queryRoCEv2GID().
Result<IBDevice::Ptr> IBDevice::open(ibv_device *dev,
                                     uint8_t devId,
                                     std::map<std::pair<std::string, uint8_t>, std::string> ib2net,
                                     std::multimap<std::string, IfAddrs::Addr> ifaddrs,
                                     const IBConfig &config) {
  IBDevice::Ptr device(new IBDevice());
  device->devId_ = devId;
  device->name_ = ibv_get_device_name(dev);

  device->context_.reset(ibv_open_device(dev));
  if (!device->context_) {
    XLOGF(ERR, "IBDevice failed to open {}, errno {}", device->name_, errno);
    return makeError(RPCCode::kIBInitFailed);
  }

  device->pd_.reset(ibv_alloc_pd(device->context_.get()));
  if (!device->pd_) {
    XLOGF(ERR, "IBDevice failed to alloc pd for {}, errno {}", device->name_, errno);
    return makeError(RPCCode::kIBInitFailed);
  }

  // ibv_query_device reports the failure reason through its return value, so log
  // that value instead of the (possibly stale) global errno.
  if (auto ret = ibv_query_device(device->context_.get(), &device->attr_); ret != 0) {
    XLOGF(ERR, "IBDevice failed to query device {}, errno {}", device->name_, ret);
    return makeError(RPCCode::kIBInitFailed);
  }

  // An empty filter accepts every name; otherwise the name must be listed.
  auto filter = [&allowed = config.device_filter()](const std::string &name) {
    return allowed.empty() || std::find(allowed.begin(), allowed.end(), name) != allowed.end();
  };
  // Whether the IB device name itself passes the filter does not depend on the port.
  const bool deviceAllowed = filter(device->name_);

  std::set<uint8_t> ports;
  // Use a counter wider than uint8_t: with phys_port_cnt == 255 a uint8_t counter
  // would wrap around to 0 and the loop would never terminate.
  for (unsigned int idx = 1; idx <= device->attr_.phys_port_cnt; ++idx) {
    const auto portNum = static_cast<uint8_t>(idx);
    auto iter = ib2net.find({device->name_, portNum});
    if (deviceAllowed || (iter != ib2net.end() && filter(iter->second))) {
      ports.emplace(portNum);
    } else {
      XLOGF(INFO, "Skip device {}, port {} because it's not in device filter.", device->name_, portNum);
    }
  }

  // Put the async event fd into non-blocking mode. F_GETFL can fail too; its -1
  // result must not be OR-ed into the flags handed to F_SETFL.
  const int asyncFd = device->context()->async_fd;
  const int flags = fcntl(asyncFd, F_GETFL);
  if (flags < 0 || fcntl(asyncFd, F_SETFL, flags | O_NONBLOCK) < 0) {
    XLOGF(ERR, "IBDevice {} failed to set async fd to NONBLOCK.", device->name());
    return makeError(RPCCode::kIBInitFailed);
  }

  // `ports` is an ordered set, so this visits the selected ports in ascending order.
  for (const uint8_t portNum : ports) {
    ibv_port_attr portAttr{};
    if (auto ret = ibv_query_port(device->context_.get(), portNum, &portAttr); ret != 0) {
      XLOGF(ERR, "IBDevice failed to query port {} of device {}, errno {}", portNum, device->name_, ret);
      return makeError(RPCCode::kIBInitFailed);
    }

    if (portAttr.link_layer != IBV_LINK_LAYER_ETHERNET && portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND) {
      XLOGF(WARN,
            "IBDevice skip port {} of device {}, linklayer {} is not RoCE or INFINIBAND.",
            portNum,
            device->name_,
            portAttr.link_layer);
      continue;
    }

    const bool inactive = (portAttr.state != IBV_PORT_ACTIVE && portAttr.state != IBV_PORT_ACTIVE_DEFER);
    XLOGF_IF(WARN,
             inactive,
             "IBDevice {} port {} is not active, state {}, skip {}",
             device->name_,
             portNum,
             magic_enum::enum_name(portAttr.state),
             config.skip_inactive_ports());
    if (inactive && config.skip_inactive_ports()) {
      continue;
    }

    Port port;
    port.addrs = getIBPortAddrs(ib2net, ifaddrs, device->name_, portNum);
    port.zones = getZonesByAddrs(port.addrs, config, device->name_, portNum);
    port.attr = portAttr;
    // Reject a port whose only zone is the unknown zone unless the config allows it.
    if (!config.allow_unknown_zone() && port.zones == std::set<std::string>{std::string(IBConfig::kUnknownZone)}) {
      XLOGF(CRITICAL, "IBDevice {}:{}'s zone is unknown!!!", device->name_, portNum);
      return makeError(StatusCode::kInvalidConfig);
    }

    // The RoCE v2 GID is only looked up for Ethernet link-layer ports.
    std::optional<ibv_gid> rocev2Gid;
    if (portAttr.link_layer == IBV_LINK_LAYER_ETHERNET) {
      auto result = queryRoCEv2GID(device->context(), portNum);
      RETURN_ON_ERROR(result);
      rocev2Gid = result->first;
    }

    XLOGF(INFO,
          "IBDevice {} add active port {}, linklayer {}, addrs {}, zones {}, RoCE v2 GID {}",
          device->name_,
          portNum,
          fmt::ibvLinklayerName(portAttr.link_layer),
          fmt::join(port.addrs.begin(), port.addrs.end(), ";"),
          fmt::join(port.zones.begin(), port.zones.end(), ";"),
          OptionalFmt(rocev2Gid));
    device->ports_[portNum] = std::move(port);
  }

  return device;
}