dissociated-ipc/ucx_utils.cc (255 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <arpa/inet.h>
#include <netdb.h>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <ucp/api/ucp.h>
#include "arrow/status.h"
#include "arrow/util/io_util.h"
#include "ucx_utils.h"
namespace utils {
// Out-of-line definition of the static constexpr type-id string. Before C++17
// this is required because the member is odr-used (Unwrap compares its decayed
// address against StatusDetail::type_id()); from C++17 on, constexpr static data
// members are implicitly inline and this redeclaration is redundant (deprecated)
// but harmless.
constexpr char UcxStatusDetail::kTypeId[];
/// Resolve `host` and fill `*addr` with the first IPv4/IPv6 address returned by
/// getaddrinfo(), with its port set to `port`.
///
/// @param host  hostname or numeric address; must be non-empty.
/// @param port  port number; must be in [0, 65535].
/// @param addr  output buffer; zeroed before resolution is attempted.
/// @return the length in bytes of the socket address written to `*addr`,
///         Invalid for a missing host or out-of-range port, or IOError if
///         resolution fails or yields no AF_INET/AF_INET6 result.
arrow::Result<size_t> to_sockaddr(const std::string& host, const int32_t port,
                                  struct sockaddr_storage* addr) {
  if (host.empty()) {
    return arrow::Status::Invalid("Must provide a host");
  } else if (port < 0) {
    return arrow::Status::Invalid("Must provide a port");
  } else if (port > std::numeric_limits<uint16_t>::max()) {
    // htons() takes a 16-bit value; larger ports would be silently truncated.
    return arrow::Status::Invalid("Port out of range: ", port);
  }
  std::memset(addr, 0, sizeof(*addr));

  struct addrinfo* info = nullptr;
  const int err =
      getaddrinfo(host.c_str(), /*service=*/nullptr, /*hints=*/nullptr, &info);
  if (err != 0) {
    if (err == EAI_SYSTEM) {
      // The real cause is reported through errno in this case.
      return arrow::internal::IOErrorFromErrno(errno, "[getaddrinfo] Failure resolving ",
                                               host);
    }
    return arrow::Status::IOError("[getaddrinfo] Failure resolving ", host, ": ",
                                  gai_strerror(err));
  }
  // Own the result list so it is released on every return path below.
  std::unique_ptr<struct addrinfo, decltype(&freeaddrinfo)> info_guard(info,
                                                                        &freeaddrinfo);

  for (const struct addrinfo* cur = info; cur != nullptr; cur = cur->ai_next) {
    const bool supported_family =
        cur->ai_family == AF_INET || cur->ai_family == AF_INET6;
    // Skip unsupported families and (defensively) anything that would not fit.
    if (!supported_family || cur->ai_addrlen > sizeof(*addr)) continue;

    std::memcpy(addr, cur->ai_addr, cur->ai_addrlen);
    const uint16_t net_port = htons(static_cast<uint16_t>(port));
    if (cur->ai_family == AF_INET) {
      reinterpret_cast<struct sockaddr_in*>(addr)->sin_port = net_port;
    } else {
      reinterpret_cast<struct sockaddr_in6*>(addr)->sin6_port = net_port;
    }
    return static_cast<size_t>(cur->ai_addrlen);
  }
  return arrow::Status::IOError("[getaddrinfo] Failure resolving ", host,
                                ": no results of a supported family returned");
}
/// Render an IPv4/IPv6 socket address as "<address>:<port>".
///
/// @param address  socket address whose ss_family selects the interpretation.
/// @return the textual form, NotImplemented for any other family, or an
///         errno-based IOError if inet_ntop() fails.
arrow::Result<std::string> SockaddrToString(const struct sockaddr_storage& address) {
  // Per-family parameters: where the binary address lives inside `address`,
  // the text capacity handed to inet_ntop, the port (network byte order) and
  // the message reported if the conversion fails.
  const void* binary_addr = nullptr;
  socklen_t text_capacity = 0;
  uint16_t port_be = 0;
  const char* failure_message = nullptr;
  switch (address.ss_family) {
    case AF_INET: {
      const auto* v4 = reinterpret_cast<const struct sockaddr_in*>(&address);
      binary_addr = &v4->sin_addr;
      text_capacity = INET_ADDRSTRLEN;
      port_be = v4->sin_port;
      failure_message = "could not convert address to a string";
      break;
    }
    case AF_INET6: {
      const auto* v6 = reinterpret_cast<const struct sockaddr_in6*>(&address);
      binary_addr = &v6->sin6_addr;
      text_capacity = INET6_ADDRSTRLEN;
      port_be = v6->sin6_port;
      failure_message = "could not convert address to string";
      break;
    }
    default:
      return arrow::Status::NotImplemented("unknown address family");
  }

  // One spare NUL beyond the capacity given to inet_ntop, so a terminator is
  // always present for the find() below.
  std::string text(text_capacity + 1, '\0');
  if (inet_ntop(address.ss_family, binary_addr, &text[0], text_capacity) == nullptr) {
    return arrow::internal::IOErrorFromErrno(errno, failure_message);
  }

  // Cut the buffer at the terminator inet_ntop wrote, then append ":<port>".
  const size_t end = text.find('\0');
  DCHECK_NE(end, std::string::npos);
  text.resize(end);
  text += ':';
  text += std::to_string(ntohs(port_be));
  return text;
}
// Human-readable form of this detail: UCX's own textual description of the
// wrapped status code, as returned by ucs_status_string().
std::string UcxStatusDetail::ToString() const {
  const char* description = ucs_status_string(status_);
  return std::string(description);
}
// Recover the raw UCX status carried by an arrow::Status (e.g. one returned by
// FromUcsStatus). Returns UCS_OK when the Status has no detail, or when its
// detail's type_id() is not this class's kTypeId.
ucs_status_t UcxStatusDetail::Unwrap(const arrow::Status& status) {
  const auto& detail = status.detail();
  const bool is_ucx_detail = detail != nullptr && detail->type_id() == kTypeId;
  if (!is_ucx_detail) {
    return UCS_OK;
  }
  // The type-id check above identifies the concrete detail type.
  const auto* ucx_detail = dynamic_cast<const UcxStatusDetail*>(detail.get());
  return ucx_detail->status_;
}
arrow::Status FromUcsStatus(const std::string& context, ucs_status_t ucs_status) {
switch (ucs_status) {
case UCS_OK:
return arrow::Status::OK();
case UCS_INPROGRESS:
return arrow::Status::IOError(context, ": UCX error ",
static_cast<int32_t>(ucs_status), ": ",
"UCS_INPROGRESS ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_NO_MESSAGE:
return arrow::Status::IOError(context, ": UCX error ",
static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_NO_MESSAGE ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_NO_RESOURCE:
return arrow::Status::IOError(context, ": UCX error ",
static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_NO_RESOURCE ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_IO_ERROR:
return arrow::Status::IOError(context, ": UCX error ",
static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_IO_ERROR ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_NO_MEMORY:
return arrow::Status::OutOfMemory(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_NO_MEMORY ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_INVALID_PARAM:
return arrow::Status::Invalid(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_INVALID_PARAM ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_UNREACHABLE:
return arrow::Status::IOError(context, ": UCX error ",
static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_UNREACHABLE ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_INVALID_ADDR:
return arrow::Status::Invalid(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_INVALID_ADDR ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_NOT_IMPLEMENTED:
return arrow::Status::NotImplemented(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_NOT_IMPLEMENTED ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_MESSAGE_TRUNCATED:
return arrow::Status::IOError(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_MESSAGE_TRUNCATED ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_NO_PROGRESS:
return arrow::Status::IOError(context, ": UCX error ",
static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_NO_PROGRESS ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_BUFFER_TOO_SMALL:
return arrow::Status::Invalid(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_BUFFER_TOO_SMALL ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_NO_ELEM:
return arrow::Status::IOError(context, ": UCX error ",
static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_NO_ELEM ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_SOME_CONNECTS_FAILED:
return arrow::Status::IOError(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_SOME_CONNECTS_FAILED ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_NO_DEVICE:
return arrow::Status::IOError(context, ": UCX error ",
static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_NO_DEVICE ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_BUSY:
return arrow::Status::IOError(context, ": UCX error ",
static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_BUSY ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_CANCELED:
return arrow::Status::Cancelled(context, ": UCX error ",
static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_CANCELED ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_SHMEM_SEGMENT:
return arrow::Status::IOError(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_SHMEM_SEGMENT ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_ALREADY_EXISTS:
return arrow::Status::AlreadyExists(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_ALREADY_EXISTS ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_OUT_OF_RANGE:
return arrow::Status::IOError(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_OUT_OF_RANGE ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_TIMED_OUT:
return arrow::Status::Cancelled(context, ": UCX error ",
static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_TIMED_OUT ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_EXCEEDS_LIMIT:
return arrow::Status::IOError(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_EXCEEDS_LIMIT ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_UNSUPPORTED:
return arrow::Status::NotImplemented(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_UNSUPPORTED ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_REJECTED:
return arrow::Status::IOError(context, ": UCX error ",
static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_REJECTED ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_NOT_CONNECTED:
return arrow::Status::IOError(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_NOT_CONNECTED ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_CONNECTION_RESET:
return arrow::Status::IOError(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_CONNECTION_RESET ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_FIRST_LINK_FAILURE:
return arrow::Status::IOError(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_FIRST_LINK_FAILURE ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_LAST_LINK_FAILURE:
return arrow::Status::IOError(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_LAST_LINK_FAILURE ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_FIRST_ENDPOINT_FAILURE:
return arrow::Status::IOError(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_FIRST_ENDPOINT_FAILURE ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_LAST_ENDPOINT_FAILURE:
return arrow::Status::IOError(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_LAST_ENDPOINT_FAILURE ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_ENDPOINT_TIMEOUT:
return arrow::Status::IOError(
context, ": UCX error ", static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_ENDPOINT_TIMEOUT ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
case UCS_ERR_LAST:
return arrow::Status::IOError(context, ": UCX error ",
static_cast<int32_t>(ucs_status), ": ",
"UCS_ERR_LAST ", ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
default:
return arrow::Status::UnknownError(
context, ": Unknown UCX error: ", static_cast<int32_t>(ucs_status), " ",
ucs_status_string(ucs_status))
.WithDetail(std::make_shared<UcxStatusDetail>(ucs_status));
}
}
} // namespace utils