dissociated-ipc/ucx_client.cc (42 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "ucx_client.h"
#include "ucx_utils.h"
#include <memory>
#include <string>
/// Prepare the client for connecting: read the UCX configuration, convert
/// `host`/`port` into a socket address, and create the UCP context.
///
/// On success `connect_addr_`, `addrlen_` and `ucp_context_` are populated.
///
/// \param host host passed to utils::to_sockaddr
/// \param port port passed to utils::to_sockaddr
/// \return OK on success, otherwise the first error reported by
///         utils::to_sockaddr or by a UCX call (via utils::FromUcsStatus)
arrow::Status UcxClient::Init(const std::string& host, const int32_t port) {
  ucp_config_t* raw_config = nullptr;
  ARROW_RETURN_NOT_OK(utils::FromUcsStatus(
      "ucp_config_read", ucp_config_read(nullptr, nullptr, &raw_config)));
  // Own the config handle so that every early return below releases it;
  // previously a failure in to_sockaddr or ucp_config_modify leaked it.
  std::unique_ptr<ucp_config_t, decltype(&ucp_config_release)> ucp_config(
      raw_config, &ucp_config_release);

  // if location is IPv6 must adjust UCX config
  // we assume locations always resolve to IPv6 or IPv4
  // but that's not necessarily true
  ARROW_ASSIGN_OR_RAISE(addrlen_, utils::to_sockaddr(host, port, &connect_addr_));
  if (connect_addr_.ss_family == AF_INET6) {
    ARROW_RETURN_NOT_OK(utils::FromUcsStatus(
        "ucp_config_modify", ucp_config_modify(ucp_config.get(), "AF_PRIO", "inet6")));
  }

  // Zero the struct; only the fields named in field_mask are set explicitly.
  ucp_params_t ucp_params;
  std::memset(&ucp_params, 0, sizeof(ucp_params));
  ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES;
  ucp_params.features = UCP_FEATURE_WAKEUP | UCP_FEATURE_AM | UCP_FEATURE_RMA |
                        UCP_FEATURE_STREAM | UCP_FEATURE_TAG;

  ucp_context_h ucp_context;
  const ucs_status_t status = ucp_init(&ucp_params, ucp_config.get(), &ucp_context);
  // The config is not used after ucp_init, whether or not it succeeded.
  ucp_config.reset();
  ARROW_RETURN_NOT_OK(utils::FromUcsStatus("ucp_init", status));

  ucp_context_.reset(new utils::UcpContext(ucp_context));
  return arrow::Status::OK();
}
/// Create a new UCP worker and a Connection wrapping it, then create an
/// endpoint on that connection using the address stored in
/// `connect_addr_` / `addrlen_`.
///
/// \return the new connection, or an error if Init() has not completed
///         successfully, if ucp_worker_create fails, or if
///         Connection::CreateEndpoint fails
arrow::Result<std::unique_ptr<utils::Connection>> UcxClient::CreateConn() {
  // ucp_context_ is only set at the end of a successful Init(); without this
  // check the dereference below would be a null-pointer access.
  if (!ucp_context_) {
    return arrow::Status::Invalid(
        "UcxClient::CreateConn called before a successful UcxClient::Init");
  }

  // Zero the struct; only the fields named in field_mask are set explicitly.
  ucp_worker_params_t worker_params;
  std::memset(&worker_params, 0, sizeof(worker_params));
  worker_params.field_mask =
      UCP_WORKER_PARAM_FIELD_THREAD_MODE | UCP_WORKER_PARAM_FIELD_FLAGS;
  worker_params.thread_mode = UCS_THREAD_MODE_MULTI;
  worker_params.flags = UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK;

  ucp_worker_h ucp_worker;
  const ucs_status_t status =
      ucp_worker_create(ucp_context_->get(), &worker_params, &ucp_worker);
  ARROW_RETURN_NOT_OK(utils::FromUcsStatus("ucp_worker_create", status));

  // The worker handle is handed to utils::UcpWorker together with the context.
  auto cnxn = std::make_unique<utils::Connection>(
      std::make_shared<utils::UcpWorker>(ucp_context_, ucp_worker));
  ARROW_RETURN_NOT_OK(cnxn->CreateEndpoint(connect_addr_, addrlen_));
  return cnxn;
}