arrow::Status write_ipc_body()

in dissociated-ipc/cudf-flight-server.cc [260:325]


  /// Send the body of an IPC payload over `cnxn` as one scatter-gather tagged message.
  ///
  /// Null and zero-length body buffers are skipped. Each remaining buffer gets its own
  /// iov entry, followed by a padding entry whenever its size is not a multiple of 8.
  /// The memory type handed to the send is UCS_MEMORY_TYPE_HOST if any transmitted
  /// buffer reports is_cpu(), otherwise UCS_MEMORY_TYPE_CUDA.
  ///
  /// The iov array and a copy of the payload's body buffer list live in a
  /// heap-allocated PendingIOV that is passed as the send's user data; the completion
  /// callback takes ownership of it again and destroys it.
  ///
  /// \param cnxn connection on which SendTagIov is invoked
  /// \param payload IPC payload whose body_buffers are transmitted
  /// \param sequence_num value OR-ed into the message tag after ToLittleEndian
  /// \return whatever status SendTagIov returns
  arrow::Status write_ipc_body(utils::Connection* cnxn, const ipc::IpcPayload& payload,
                               const uint32_t sequence_num) {
    // a buffer takes part in the send only if it exists and holds at least one byte
    const auto has_bytes = [](const auto& buf) { return buf && buf->size() != 0; };
    // arrow ipc requires aligning buffers to 8 byte boundary; this yields the number
    // of filler bytes needed after `buf` (0 when it is already aligned)
    const auto pad_length = [](const auto& buf) {
      return static_cast<int>(arrow::bit_util::RoundUpToMultipleOf8(buf->size()) -
                              buf->size());
    };

    // first pass: count the iov entries and find out whether any buffer is on the host
    bool saw_host_buffer = false;
    int32_t entry_count = 0;
    for (const auto& buf : payload.body_buffers) {
      if (!has_bytes(buf)) continue;
      if (buf->is_cpu()) {
        saw_host_buffer = true;
      }
      entry_count += (pad_length(buf) != 0) ? 2 : 1;
    }
    const ucs_memory_type_t mem_type =
        saw_host_buffer ? UCS_MEMORY_TYPE_HOST : UCS_MEMORY_TYPE_CUDA;

    // state handed to the send: the iov array plus a copy of the buffer list
    // (presumably so the buffers stay referenced until completion - confirm against
    // PendingIOV's definition)
    auto state = std::make_unique<PendingIOV>();
    state->body_buffers = payload.body_buffers;
    state->iovs.resize(entry_count);

    // the filler bytes come from the member matching the selected memory type
    void* pad_source = nullptr;
    if (mem_type == UCS_MEMORY_TYPE_CUDA) {
      pad_source = cuda_padding_bytes_.data();
    } else {
      pad_source = const_cast<void*>(reinterpret_cast<const void*>(padding_bytes_.data()));
    }

    // second pass: fill the scatter-gather list so no buffer has to be copied
    ucp_dt_iov_t* const entries = state->iovs.data();
    size_t slot = 0;
    for (const auto& buf : payload.body_buffers) {
      if (!has_bytes(buf)) continue;

      // for cuda memory, buf->address() will return a device pointer
      entries[slot].buffer =
          const_cast<void*>(reinterpret_cast<const void*>(buf->address()));
      entries[slot].length = buf->size();
      ++slot;

      const auto pad = pad_length(buf);
      if (pad) {
        entries[slot].buffer = pad_source;
        entries[slot].length = pad;
        ++slot;
      }
    }

    // completion handler: reclaim the PendingIOV (destroyed when the handler returns),
    // log a failed send and free the request object if one was supplied
    const auto on_send_complete = [](void* request, ucs_status_t status,
                                     void* user_data) {
      const std::unique_ptr<PendingIOV> reclaimed(static_cast<PendingIOV*>(user_data));
      if (status != UCS_OK) {
        ARROW_LOG(ERROR)
            << utils::FromUcsStatus("ucp_tag_send_nbx_cb", status).ToString();
      }
      if (request != nullptr) {
        ucp_request_free(request);
      }
    };

    // indicate that we're sending the full data body, not a pointer list and add
    // the sequence number to the tag
    const ucp_tag_t body_type_bits = uint64_t(0) << kShiftBodyType;
    const ucp_tag_t tag = body_type_bits | arrow::bit_util::ToLittleEndian(sequence_num);

    // give up ownership before the call; from here on the callback owns the state
    PendingIOV* const in_flight = state.release();
    return cnxn->SendTagIov(tag, in_flight->iovs.data(), in_flight->iovs.size(),
                            in_flight, on_send_complete, mem_type);
  }