in dissociated-ipc/cudf-flight-client.cc [164:200]
arrow::Status RecvTag(ucp_tag_message_h msg, ucp_tag_recv_info_t info_tag,
PendingMsg pending) {
++outstanding_tags_;
ARROW_ASSIGN_OR_RAISE(auto buf, mm_->AllocateBuffer(info_tag.length));
PendingMsg* new_pending = new PendingMsg(std::move(pending));
new_pending->body = std::move(buf);
new_pending->rdr = this;
return data_cnxn_->RecvTagData(
msg, reinterpret_cast<void*>(new_pending->body->address()), info_tag.length,
new_pending,
[](void* request, ucs_status_t status, const ucp_tag_recv_info_t* tag_info,
void* user_data) {
auto pending =
std::unique_ptr<PendingMsg>(reinterpret_cast<PendingMsg*>(user_data));
if (status != UCS_OK) {
ARROW_LOG(ERROR)
<< utils::FromUcsStatus("ucp_tag_recv_nbx_callback", status).ToString();
pending->p.set_value(nullptr);
return;
}
if (request) ucp_request_free(request);
if (tag_info->sender_tag & kbody_mask_) {
// pointer / offset list body
// not yet implemented
} else {
// full body bytes, use the pending metadata and read our IPC message
// as usual
auto msg = *ipc::Message::Open(pending->metadata, pending->body);
pending->p.set_value(std::move(msg));
--pending->rdr->outstanding_tags_;
}
},
(new_pending->body->is_cpu()) ? UCS_MEMORY_TYPE_HOST : UCS_MEMORY_TYPE_CUDA);
}