in dissociated-ipc/cudf-flight-server.cc [189:225]
arrow::Status write_ipc_metadata(utils::Connection* cnxn,
const ipc::IpcPayload& payload,
const uint32_t sequence_num) {
// our metadata messages are always CPU host memory
ucs_memory_type_t mem_type = UCS_MEMORY_TYPE_HOST;
// construct our 5 byte prefix, the message type followed by the sequence number
auto pending = std::make_unique<PendingIOV>();
pending->iovs.resize(2);
pending->iovs[0].buffer = malloc(5);
pending->iovs[0].length = 5;
reinterpret_cast<uint8_t*>(pending->iovs[0].buffer)[0] =
static_cast<uint8_t>(MetadataMsgType::METADATA);
utils::Uint32ToBytesLE(sequence_num,
reinterpret_cast<uint8_t*>(pending->iovs[0].buffer) + 1);
// after the prefix, we add the metadata we want to send
pending->iovs[1].buffer = const_cast<void*>(payload.metadata->data_as<void>());
pending->iovs[1].length = payload.metadata->size();
pending->body_buffers.emplace_back(payload.metadata);
auto* pending_iov = pending.get();
void* user_data = pending.release();
return cnxn->SendAMIov(
0, pending_iov->iovs.data(), pending_iov->iovs.size(), user_data,
[](void* request, ucs_status_t status, void* user_data) {
auto pending_iov =
std::unique_ptr<PendingIOV>(reinterpret_cast<PendingIOV*>(user_data));
if (request) ucp_request_free(request);
if (status != UCS_OK) {
ARROW_LOG(ERROR)
<< utils::FromUcsStatus("ucp_am_send_nbx_cb", status).ToString();
}
free(pending_iov->iovs[0].buffer);
},
mem_type);
}