arrow::Status write_ipc_metadata()

in dissociated-ipc/cudf-flight-server.cc [189:225]


  /// \brief Send the metadata portion of an IPC payload as a two-part IOV message.
  ///
  /// The message consists of a 5 byte prefix (one byte holding
  /// MetadataMsgType::METADATA followed by the sequence number written via
  /// utils::Uint32ToBytesLE) and then the bytes of `payload.metadata`.
  ///
  /// \param cnxn connection the message is sent on (must not be null)
  /// \param payload IPC payload whose `metadata` buffer is sent
  /// \param sequence_num sequence number encoded into the prefix
  /// \return the Status returned by `cnxn->SendAMIov`, Status::Invalid if the
  ///         payload has no metadata buffer, or Status::OutOfMemory if the
  ///         prefix buffer could not be allocated
  arrow::Status write_ipc_metadata(utils::Connection* cnxn,
                                   const ipc::IpcPayload& payload,
                                   const uint32_t sequence_num) {
    // prefix layout: 1 byte message type + 4 byte sequence number
    constexpr size_t kPrefixLength = sizeof(uint8_t) + sizeof(uint32_t);

    // fail cleanly instead of dereferencing a null metadata buffer below
    if (payload.metadata == nullptr) {
      return arrow::Status::Invalid("IPC payload has no metadata buffer to send");
    }

    // our metadata messages are always CPU host memory
    const ucs_memory_type_t mem_type = UCS_MEMORY_TYPE_HOST;

    // construct our 5 byte prefix, the message type followed by the sequence number
    auto pending = std::make_unique<PendingIOV>();
    pending->iovs.resize(2);

    auto* prefix = static_cast<uint8_t*>(malloc(kPrefixLength));
    if (prefix == nullptr) {
      // `pending` is still owned by the unique_ptr here, so nothing leaks
      return arrow::Status::OutOfMemory("could not allocate ", kPrefixLength,
                                        " byte metadata message prefix");
    }
    prefix[0] = static_cast<uint8_t>(MetadataMsgType::METADATA);
    utils::Uint32ToBytesLE(sequence_num, prefix + 1);
    pending->iovs[0].buffer = prefix;
    pending->iovs[0].length = kPrefixLength;

    // after the prefix, we add the metadata we want to send
    pending->iovs[1].buffer = const_cast<void*>(payload.metadata->data_as<void>());
    pending->iovs[1].length = payload.metadata->size();
    // hold a reference to the metadata buffer inside the pending record,
    // presumably so it stays alive until the completion callback destroys it
    pending->body_buffers.emplace_back(payload.metadata);

    // Grab the raw pointer before releasing ownership: the iov array is read
    // through it in the same call that receives the released pointer as
    // user_data. From here on the completion callback owns the record.
    // NOTE(review): if SendAMIov can fail without ever invoking the callback,
    // the record and the prefix buffer would leak -- confirm against
    // utils::Connection::SendAMIov.
    auto* pending_iov = pending.get();
    void* user_data = pending.release();
    return cnxn->SendAMIov(
        0, pending_iov->iovs.data(), pending_iov->iovs.size(), user_data,
        [](void* request, ucs_status_t status, void* user_data) {
          // re-take ownership so the record is destroyed when the callback returns
          auto pending_iov =
              std::unique_ptr<PendingIOV>(static_cast<PendingIOV*>(user_data));
          if (request) ucp_request_free(request);
          if (status != UCS_OK) {
            ARROW_LOG(ERROR)
                << utils::FromUcsStatus("ucp_am_send_nbx_cb", status).ToString();
          }
          // the prefix was malloc'd above and is not owned by any Buffer
          free(pending_iov->iovs[0].buffer);
        },
        mem_type);
  }