arrow::Status RecvTag()

in dissociated-ipc/cudf-flight-client.cc [164:200]


  // Posts the receive for the body of an already-matched tagged message.
  //
  // Allocates `info_tag.length` bytes through `mm_`, stores that buffer and `this`
  // in a heap-allocated copy of `pending`, and passes the copy to
  // `data_cnxn_->RecvTagData` as the callback's user data. The completion
  // callback takes ownership of that copy, fulfils `pending.p` exactly once and
  // then decrements `outstanding_tags_`.
  //
  // The promise is fulfilled with the IPC message opened from the pending
  // metadata and the received body, or with nullptr when the transfer reports a
  // non-OK status, the body uses the (not yet implemented) pointer / offset list
  // encoding, or the message cannot be opened.
  //
  // Returns the allocation error if the body buffer cannot be allocated (in that
  // case `outstanding_tags_` is left untouched), otherwise the status returned by
  // `RecvTagData`.
  arrow::Status RecvTag(ucp_tag_message_h msg, ucp_tag_recv_info_t info_tag,
                        PendingMsg pending) {
    // Allocate first: the macro returns early on failure, and the counter must
    // only account for receives whose completion callback can actually run.
    ARROW_ASSIGN_OR_RAISE(auto buf, mm_->AllocateBuffer(info_tag.length));
    ++outstanding_tags_;

    auto owned = std::make_unique<PendingMsg>(std::move(pending));
    owned->body = std::move(buf);
    owned->rdr = this;

    // Read everything needed from `owned` before release() below; the evaluation
    // order of function arguments is unspecified.
    void* dest = reinterpret_cast<void*>(owned->body->address());
    const auto mem_type =
        owned->body->is_cpu() ? UCS_MEMORY_TYPE_HOST : UCS_MEMORY_TYPE_CUDA;

    // NOTE(review): if RecvTagData fails without ever invoking the callback, the
    // released PendingMsg is never reclaimed and outstanding_tags_ stays
    // incremented. Whether that can happen depends on RecvTagData, which is not
    // visible here -- confirm before adding cleanup at this call site.
    return data_cnxn_->RecvTagData(
        msg, dest, info_tag.length, owned.release(),
        [](void* request, ucs_status_t status, const ucp_tag_recv_info_t* tag_info,
           void* user_data) {
          // Reclaim ownership so the PendingMsg is destroyed on every path.
          std::unique_ptr<PendingMsg> pending(static_cast<PendingMsg*>(user_data));

          // Inspect the tag before releasing the request: tag_info may refer to
          // storage owned by the request (TODO confirm against the UCX docs).
          // It is only dereferenced when the receive succeeded.
          const bool is_offset_list_body =
              (status == UCS_OK) && ((tag_info->sender_tag & kbody_mask_) != 0);

          // Release the request on failure as well as on success.
          if (request) ucp_request_free(request);

          if (status != UCS_OK) {
            ARROW_LOG(ERROR)
                << utils::FromUcsStatus("ucp_tag_recv_nbx_callback", status).ToString();
            pending->p.set_value(nullptr);
          } else if (is_offset_list_body) {
            // pointer / offset list body
            // not yet implemented: report failure instead of leaving the promise
            // unfulfilled
            ARROW_LOG(ERROR) << "pointer / offset list bodies are not yet implemented";
            pending->p.set_value(nullptr);
          } else {
            // full body bytes, use the pending metadata and read our IPC message
            // as usual
            auto maybe_msg = ipc::Message::Open(pending->metadata, pending->body);
            if (maybe_msg.ok()) {
              pending->p.set_value(std::move(*maybe_msg));
            } else {
              ARROW_LOG(ERROR) << maybe_msg.status().ToString();
              pending->p.set_value(nullptr);
            }
          }

          // Every completed receive is accounted for, whatever its outcome.
          --pending->rdr->outstanding_tags_;
        },
        mem_type);
  }