in dissociated-ipc/cudf-flight-client.cc [327:384]
arrow::Status run_client(const std::string& addr, const int port) {
ARROW_ASSIGN_OR_RAISE(auto location, flight::Location::ForGrpcTcp(addr, port));
ARROW_ASSIGN_OR_RAISE(auto client, flight::FlightClient::Connect(location));
ARROW_ASSIGN_OR_RAISE(
auto info,
client->GetFlightInfo(flight::FlightDescriptor::Command("train.parquet")));
ARROW_LOG(DEBUG) << info->endpoints()[0].locations[0].ToString();
ARROW_LOG(DEBUG) << info->endpoints()[0].locations[1].ToString();
ARROW_ASSIGN_OR_RAISE(auto ctrl_uri, arrow::util::Uri::FromString(
info->endpoints()[0].locations[0].ToString()));
ARROW_ASSIGN_OR_RAISE(auto data_uri, arrow::util::Uri::FromString(
info->endpoints()[0].locations[1].ToString()));
ARROW_ASSIGN_OR_RAISE(ucp_tag_t ctrl_tag, get_want_data_tag(ctrl_uri));
ARROW_ASSIGN_OR_RAISE(ucp_tag_t data_tag, get_want_data_tag(data_uri));
const std::string& ident = info->endpoints()[0].ticket.ticket;
ARROW_ASSIGN_OR_RAISE(auto cuda_mgr, arrow::cuda::CudaDeviceManager::Instance());
ARROW_ASSIGN_OR_RAISE(auto device, cuda_mgr->GetDevice(0));
ARROW_ASSIGN_OR_RAISE(auto cuda_device, arrow::cuda::AsCudaDevice(device));
ARROW_ASSIGN_OR_RAISE(auto ctx, cuda_device->GetContext());
cuCtxPushCurrent(reinterpret_cast<CUcontext>(ctx->handle()));
ARROW_LOG(DEBUG) << device->ToString();
UcxClient ctrl_client, data_client;
ARROW_RETURN_NOT_OK(ctrl_client.Init(ctrl_uri.host(), ctrl_uri.port()));
ARROW_RETURN_NOT_OK(data_client.Init(data_uri.host(), data_uri.port()));
ARROW_ASSIGN_OR_RAISE(auto ctrl_cnxn, ctrl_client.CreateConn());
ARROW_ASSIGN_OR_RAISE(auto data_cnxn, data_client.CreateConn());
StreamReader rdr(ctrl_cnxn.get(), data_cnxn.get());
rdr.set_data_mem_manager(ctx->memory_manager());
ARROW_RETURN_NOT_OK(rdr.Start(ctrl_tag, data_tag, ident));
ARROW_ASSIGN_OR_RAISE(auto s, rdr.Schema());
std::cout << s->ToString() << std::endl;
while (true) {
ARROW_ASSIGN_OR_RAISE(auto batch, rdr.Next());
if (!batch) {
break;
}
std::cout << batch->num_columns() << " " << batch->num_rows() << std::endl;
std::cout << batch->column(0)->data()->buffers[1]->device()->ToString() << std::endl;
ARROW_ASSIGN_OR_RAISE(auto cpubatch,
batch->CopyTo(arrow::default_cpu_memory_manager()));
std::cout << cpubatch->ToString() << std::endl;
}
ARROW_CHECK_OK(ctrl_cnxn->Close());
ARROW_CHECK_OK(data_cnxn->Close());
return arrow::Status::OK();
}