in dissociated-ipc/cudf-flight-server.cc [260:325]
// Sends the body buffers of an IPC payload over `cnxn` as a single
// scatter-gather (iov) tagged send.
//
// Null and zero-length body buffers are skipped. Every remaining buffer gets
// one iov entry, followed by a second entry of padding bytes whenever its size
// is not already a multiple of 8. The memory type passed to the send is
// UCS_MEMORY_TYPE_CUDA unless at least one non-skipped buffer reports
// is_cpu(), in which case it is UCS_MEMORY_TYPE_HOST; the padding entries
// point at cuda_padding_bytes_ or padding_bytes_ accordingly.
//
// cnxn:         connection used to issue the send
// payload:      payload whose body_buffers are sent
// sequence_num: sequence number placed (little-endian) in the message tag
//
// Returns whatever status SendTagIov returns.
arrow::Status write_ipc_body(utils::Connection* cnxn, const ipc::IpcPayload& payload,
                             const uint32_t sequence_num) {
  // buffers that contribute nothing to the message body
  const auto is_skipped = [](const auto& buf) { return !buf || buf->size() == 0; };
  // number of bytes needed to reach the 8 byte boundary arrow ipc requires
  const auto pad_len = [](const auto& buf) {
    return static_cast<int>(arrow::bit_util::RoundUpToMultipleOf8(buf->size()) -
                            buf->size());
  };

  // first pass: count the iov entries (data + padding) and pick the memory type
  ucs_memory_type_t mem_type = UCS_MEMORY_TYPE_CUDA;
  int32_t iov_count = 0;
  for (const auto& buf : payload.body_buffers) {
    if (is_skipped(buf)) continue;
    if (buf->is_cpu()) {
      mem_type = UCS_MEMORY_TYPE_HOST;
    }
    iov_count += pad_len(buf) ? 2 : 1;
  }

  // state that has to outlive this call; it is handed to the send callback below.
  // The copy of body_buffers presumably keeps the underlying memory alive until
  // the send completes -- confirm against PendingIOV's definition.
  auto io_state = std::make_unique<PendingIOV>();
  io_state->iovs.resize(iov_count);
  io_state->body_buffers = payload.body_buffers;

  // padding entries must reference the pool matching the selected memory type
  void* pad_src = nullptr;
  if (mem_type == UCS_MEMORY_TYPE_CUDA) {
    pad_src = cuda_padding_bytes_.data();
  } else {
    pad_src = const_cast<void*>(reinterpret_cast<const void*>(padding_bytes_.data()));
  }

  // second pass: fill the iov list in place, no data is copied
  ucp_dt_iov_t* next = io_state->iovs.data();
  const auto append = [&next](void* ptr, auto len) {
    next->buffer = ptr;
    next->length = len;
    ++next;
  };
  for (const auto& buf : payload.body_buffers) {
    if (is_skipped(buf)) continue;
    // for cuda memory, buf->address() will return a device pointer
    append(const_cast<void*>(reinterpret_cast<const void*>(buf->address())),
           buf->size());
    const auto pad = pad_len(buf);
    if (pad) {
      append(pad_src, pad);
    }
  }

  // Ownership moves to the completion callback, which deletes the state.
  // NOTE(review): if SendTagIov can return an error without ever invoking the
  // callback, this allocation would not be freed -- confirm in SendTagIov.
  PendingIOV* handed_off = io_state.release();

  // body-type bits of 0 mark this as the full data body rather than a pointer
  // list; the low bits of the tag carry the sequence number
  const ucp_tag_t body_type_bits = uint64_t(0) << kShiftBodyType;
  const ucp_tag_t tag = body_type_bits | arrow::bit_util::ToLittleEndian(sequence_num);

  // completion handler: reclaims the state, logs failures, frees the request
  auto on_sent = [](void* req, ucs_status_t st, void* user_data) {
    std::unique_ptr<PendingIOV> reclaimed(static_cast<PendingIOV*>(user_data));
    if (st != UCS_OK) {
      ARROW_LOG(ERROR) << utils::FromUcsStatus("ucp_tag_send_nbx_cb", st).ToString();
    }
    if (req) {
      ucp_request_free(req);
    }
  };

  return cnxn->SendTagIov(tag, handed_off->iovs.data(), handed_off->iovs.size(),
                          handed_off, on_sent, mem_type);
}