in dissociated-ipc/ucx_server.cc [140:165]
// Stop the server and release its UCX resources.
//
// Sequence: clear the listening flag, signal the connection worker, call
// Wait(), reject every connection request still queued in
// pending_connections_, destroy the listener, then reset the connection
// worker and finally the UCX context.
//
// Calling this while the server is not listening is a no-op that returns OK.
// The listening flag is claimed with a single atomic exchange, so when several
// threads call Shutdown() at the same time only one of them performs the
// teardown; the others return OK immediately.
//
// Returns the error from ucp_worker_signal right away if that call fails;
// otherwise returns the status combined (via Status::operator&=) from Wait()
// and from each ucp_listener_reject call.
arrow::Status UcxServer::Shutdown() {
  // exchange() both tests and clears the flag in one atomic step. A separate
  // load() followed by store(false) would let two concurrent callers both pass
  // the check and then both destroy the listener.
  if (!listening_.exchange(false)) return arrow::Status::OK();

  arrow::Status status;
  // wait for current running things to finish
  //
  // NOTE(review): if the signal fails we return here with listening_ already
  // false and nothing torn down, so a later Shutdown() call is a no-op. This is
  // kept as in the original because Wait() presumably relies on the worker
  // having been woken up -- confirm before changing it to continue the teardown.
  RETURN_NOT_OK(
      utils::FromUcsStatus("ucp_worker_signal", ucp_worker_signal(worker_conn_->get())));
  status &= Wait();

  {
    // reject all pending connections
    // The listener is destroyed and the worker released while the
    // pending-connections mutex is still held.
    std::lock_guard<std::mutex> guard(pending_connections_mutex_);
    while (!pending_connections_.empty()) {
      status &= utils::FromUcsStatus(
          "ucp_listener_reject",
          ucp_listener_reject(listener_, pending_connections_.front()));
      pending_connections_.pop();
    }
    ucp_listener_destroy(listener_);
    worker_conn_.reset();
  }

  // The context is released last, after the listener and the worker.
  ucp_context_.reset();
  return status;
}