in dissociated-ipc/ucx_server.cc [195:260]
// Serves a single incoming UCX connection request from accept to disconnect.
//
// Steps, in order:
//   1. Derive a `peer` label for log lines (client address if it can be
//      queried, otherwise "unknown:<counter>").
//   2. Create a worker; if that fails, reject the request on the listener.
//   3. Create the endpoint/connection for the request on that worker.
//   4. If a CUDA context is configured, push it onto the calling thread.
//   5. Run do_work(), then drive the connection until it reports closed.
//   6. Close the connection, release the worker resources and pop the CUDA
//      context pushed in step 4.
//
// All failures are logged and swallowed; nothing is reported to the caller.
//
// \param request the pending connection request handed over by the listener
void UcxServer::HandleConnection(ucp_conn_request_h request) {
  using arrow::internal::ToChars;

  // Fallback label; replaced below when the client address can be resolved.
  std::string peer = "unknown:" + ToChars(counter_++);
  {
    ucp_conn_request_attr_t request_attr;
    std::memset(&request_attr, 0, sizeof(request_attr));
    request_attr.field_mask = UCP_CONN_REQUEST_ATTR_FIELD_CLIENT_ADDR;
    if (ucp_conn_request_query(request, &request_attr) == UCS_OK) {
      // Best effort only: the conversion status is deliberately ignored.
      ARROW_UNUSED(utils::SockaddrToString(request_attr.client_address).Value(&peer));
    }
  }
  ARROW_LOG(DEBUG) << peer << ": Received connection request";

  auto maybe_worker = CreateWorker();
  if (!maybe_worker.ok()) {
    ARROW_LOG(ERROR) << peer << ": failed to create worker: "
                     << maybe_worker.status().ToString();
    // Without a worker the request cannot be served, so tell the client.
    auto reject_status = ucp_listener_reject(listener_, request);
    if (reject_status != UCS_OK) {
      ARROW_LOG(ERROR) << peer << ": "
                       << utils::FromUcsStatus("ucp_listener_reject", reject_status)
                              .ToString();
    }
    return;
  }

  auto worker = maybe_worker.MoveValueUnsafe();
  worker->conn_ = std::make_unique<utils::Connection>(worker->worker_);
  auto status = worker->conn_->CreateEndpoint(request);
  if (!status.ok()) {
    ARROW_LOG(ERROR) << peer << ": failed to create endpoint and connection: "
                     << status.ToString();
    return;
  }

  // Tracks whether this call pushed a CUDA context that must be popped again.
  bool cuda_context_pushed = false;
  if (cuda_context_) {
    auto result = cuCtxPushCurrent(reinterpret_cast<CUcontext>(cuda_context_->handle()));
    if (result != CUDA_SUCCESS) {
      // The lookup functions set the output pointer to NULL for unrecognised
      // error codes; never stream a null C string into the log.
      const char* err_name = nullptr;
      const char* err_string = nullptr;
      if (cuGetErrorName(result, &err_name) != CUDA_SUCCESS || err_name == nullptr) {
        err_name = "unknown";
      }
      if (cuGetErrorString(result, &err_string) != CUDA_SUCCESS ||
          err_string == nullptr) {
        err_string = "unknown";
      }
      ARROW_LOG(ERROR) << peer << ": failed pushing cuda context on thread: " << err_name
                       << " - " << err_string;
      return;
    }
    cuda_context_pushed = true;
  }

  auto st = do_work(worker.get());
  if (!st.ok()) {
    ARROW_LOG(ERROR) << peer << ": error from do_work: " << st.ToString();
  }

  // Keep progressing the connection until it reports itself closed.
  while (!worker->conn_->is_closed()) {
    worker->conn_->Progress();
  }

  // clean up
  status = worker->conn_->Close();
  if (!status.ok()) {
    ARROW_LOG(ERROR) << peer
                     << ": failed to close worker connection: " << status.ToString();
  }
  worker->worker_.reset();
  worker->conn_.reset();

  // Balance the earlier push so the calling thread's context stack does not
  // grow with every handled connection. Done after the resets above so the
  // context is still current while the worker and connection are torn down.
  if (cuda_context_pushed) {
    CUcontext popped = nullptr;
    auto pop_result = cuCtxPopCurrent(&popped);
    if (pop_result != CUDA_SUCCESS) {
      ARROW_LOG(ERROR) << peer << ": failed popping cuda context from thread";
    }
  }
  ARROW_LOG(DEBUG) << peer << ": disconnected";
}