arrow::Status run_client()

in dissociated-ipc/cudf-flight-client.cc [327:384]


arrow::Status run_client(const std::string& addr, const int port) {
  ARROW_ASSIGN_OR_RAISE(auto location, flight::Location::ForGrpcTcp(addr, port));
  ARROW_ASSIGN_OR_RAISE(auto client, flight::FlightClient::Connect(location));

  ARROW_ASSIGN_OR_RAISE(
      auto info,
      client->GetFlightInfo(flight::FlightDescriptor::Command("train.parquet")));
  ARROW_LOG(DEBUG) << info->endpoints()[0].locations[0].ToString();
  ARROW_LOG(DEBUG) << info->endpoints()[0].locations[1].ToString();

  ARROW_ASSIGN_OR_RAISE(auto ctrl_uri, arrow::util::Uri::FromString(
                                           info->endpoints()[0].locations[0].ToString()));
  ARROW_ASSIGN_OR_RAISE(auto data_uri, arrow::util::Uri::FromString(
                                           info->endpoints()[0].locations[1].ToString()));

  ARROW_ASSIGN_OR_RAISE(ucp_tag_t ctrl_tag, get_want_data_tag(ctrl_uri));
  ARROW_ASSIGN_OR_RAISE(ucp_tag_t data_tag, get_want_data_tag(data_uri));
  const std::string& ident = info->endpoints()[0].ticket.ticket;

  ARROW_ASSIGN_OR_RAISE(auto cuda_mgr, arrow::cuda::CudaDeviceManager::Instance());
  ARROW_ASSIGN_OR_RAISE(auto device, cuda_mgr->GetDevice(0));
  ARROW_ASSIGN_OR_RAISE(auto cuda_device, arrow::cuda::AsCudaDevice(device));
  ARROW_ASSIGN_OR_RAISE(auto ctx, cuda_device->GetContext());
  cuCtxPushCurrent(reinterpret_cast<CUcontext>(ctx->handle()));

  ARROW_LOG(DEBUG) << device->ToString();

  UcxClient ctrl_client, data_client;
  ARROW_RETURN_NOT_OK(ctrl_client.Init(ctrl_uri.host(), ctrl_uri.port()));
  ARROW_RETURN_NOT_OK(data_client.Init(data_uri.host(), data_uri.port()));

  ARROW_ASSIGN_OR_RAISE(auto ctrl_cnxn, ctrl_client.CreateConn());
  ARROW_ASSIGN_OR_RAISE(auto data_cnxn, data_client.CreateConn());

  StreamReader rdr(ctrl_cnxn.get(), data_cnxn.get());
  rdr.set_data_mem_manager(ctx->memory_manager());

  ARROW_RETURN_NOT_OK(rdr.Start(ctrl_tag, data_tag, ident));

  ARROW_ASSIGN_OR_RAISE(auto s, rdr.Schema());
  std::cout << s->ToString() << std::endl;
  while (true) {
    ARROW_ASSIGN_OR_RAISE(auto batch, rdr.Next());
    if (!batch) {
      break;
    }

    std::cout << batch->num_columns() << " " << batch->num_rows() << std::endl;
    std::cout << batch->column(0)->data()->buffers[1]->device()->ToString() << std::endl;
    ARROW_ASSIGN_OR_RAISE(auto cpubatch,
                          batch->CopyTo(arrow::default_cpu_memory_manager()));
    std::cout << cpubatch->ToString() << std::endl;
  }

  ARROW_CHECK_OK(ctrl_cnxn->Close());
  ARROW_CHECK_OK(data_cnxn->Close());
  return arrow::Status::OK();
}