arrow::Status run_server()

in dissociated-ipc/cudf-flight-server.cc [373:408]


arrow::Status run_server(const std::string& addr, const int port) {
  CudaUcxServer server;
  ARROW_ASSIGN_OR_RAISE(auto mgr, arrow::cuda::CudaDeviceManager::Instance());
  ARROW_ASSIGN_OR_RAISE(auto ctx, mgr->GetContext(0));
  server.set_cuda_context(ctx);
  ARROW_RETURN_NOT_OK(server.initialize());

  flight::Location ctrl_server_location = server.ctrl_location();
  flight::Location data_server_location = server.data_location();

  std::shared_ptr<arrow::Schema> sc;
  {
    auto source = cudf::io::source_info("./data/taxi-data/train.parquet");
    auto options = cudf::io::parquet_reader_options::builder(source).num_rows(1);
    auto result = cudf::io::read_parquet(options);

    auto schema = cudf::to_arrow_schema(result.tbl->view(),
                                        table_metadata_to_column(result.metadata));
    ARROW_ASSIGN_OR_RAISE(sc, arrow::ImportSchema(schema.get()));
  }

  auto flight_server = std::make_shared<CudaFlightServer>(
      std::move(ctrl_server_location), std::move(data_server_location));
  flight_server->register_schema("train.parquet", std::move(sc));

  ARROW_ASSIGN_OR_RAISE(auto loc, flight::Location::ForGrpcTcp(addr, port));
  flight::FlightServerOptions options(loc);

  RETURN_NOT_OK(flight_server->Init(options));
  RETURN_NOT_OK(flight_server->SetShutdownOnSignals({SIGTERM}));

  std::cout << "Flight Server Listening on " << flight_server->location().ToString()
            << std::endl;

  return flight_server->Serve();
}