in dissociated-ipc/cudf-flight-client.cc [110:159]
void run_data_loop() {
if (arrow::cuda::IsCudaMemoryManager(*mm_)) {
// since we're in a new thread, we need to make sure to push the cuda context
// so that ucx uses the same cuda context as the Arrow data is using, otherwise
// the device pointers aren't valid
auto ctx = *(*arrow::cuda::AsCudaMemoryManager(mm_))->cuda_device()->GetContext();
cuCtxPushCurrent(reinterpret_cast<CUcontext>(ctx->handle()));
}
while (true) {
// progress the connection until an event happens
while (data_cnxn_->Progress()) {
}
{
// check if we have received any metadata which indicate we need to poll
// for a corresponding tagged data message
std::unique_lock<std::mutex> guard(polling_mutex_);
for (auto it = polling_map_.begin(); it != polling_map_.end();) {
auto maybe_tag =
data_cnxn_->ProbeForTag(ucp_tag_t(it->first), 0x00000000FFFFFFFF, 1);
if (!maybe_tag.ok()) {
ARROW_LOG(ERROR) << maybe_tag.status().ToString();
return;
}
auto tag_pair = maybe_tag.MoveValueUnsafe();
if (tag_pair.second != nullptr) {
// got one!
auto st = RecvTag(tag_pair.second, tag_pair.first, std::move(it->second));
if (!st.ok()) {
ARROW_LOG(ERROR) << st.ToString();
return;
}
it = polling_map_.erase(it);
} else {
++it;
}
}
}
// if the metadata stream has ended...
if (finished_metadata_.load()) {
// we are done if there's nothing left to poll for and nothing outstanding
std::lock_guard<std::mutex> guard(polling_mutex_);
if (polling_map_.empty() && outstanding_tags_.load() == 0) {
break;
}
}
}
}