in dissociated-ipc/cudf-flight-server.cc [67:105]
// Loads the taxi training parquet file with libcudf, exposes every chunk as an
// arrow::RecordBatch via the Arrow C device data interface, registers the
// batches in data_sets_ under "train.parquet", and then starts the server and
// derives ctrl_location_ / data_location_ from location_.
//
// Returns arrow::Status::OK() on success; otherwise the first error produced
// by a schema/batch import, Init, or flight::Location::Parse.
arrow::Status initialize() {
  // load the parquet data directly onto the GPU as a libcudf table
  auto source = cudf::io::source_info("./data/taxi-data/train.parquet");
  auto options = cudf::io::parquet_reader_options::builder(source);
  cudf::io::chunked_parquet_reader rdr(1 * 1024 * 1024, options);

  // get arrow::RecordBatches for each chunk of the parquet data while
  // leaving the data on the GPU
  arrow::RecordBatchVector batches;
  std::shared_ptr<arrow::Schema> schema;
  do {
    auto chunk = rdr.read_chunk();
    if (!schema) {
      // Import the schema exactly once, from the first chunk. The
      // ArrowSchema* overload of ImportDeviceRecordBatch releases the C
      // schema on every call, so passing the same struct again for later
      // chunks would hand it an already-released schema. The imported
      // arrow::Schema can be shared by all batches instead.
      auto c_schema = cudf::to_arrow_schema(chunk.tbl->view(),
                                            table_metadata_to_column(chunk.metadata));
      ARROW_ASSIGN_OR_RAISE(schema, arrow::ImportSchema(c_schema.get()));
    }
    auto device_out = cudf::to_arrow_device(std::move(*chunk.tbl));
    ARROW_ASSIGN_OR_RAISE(auto batch,
                          arrow::ImportDeviceRecordBatch(device_out.get(), schema));
    batches.push_back(std::move(batch));
  } while (rdr.has_next());
  data_sets_.emplace("train.parquet", std::move(batches));

  // initialize the server and let it choose its own port
  ARROW_RETURN_NOT_OK(Init("127.0.0.1", 0));
  // NOTE(review): location_ is presumably populated by Init above — confirm.
  ARROW_ASSIGN_OR_RAISE(ctrl_location_,
                        flight::Location::Parse(location_.ToString() + "?want_data=" +
                                                std::to_string(kWantCtrlTag)));
  ARROW_ASSIGN_OR_RAISE(data_location_,
                        flight::Location::Parse(location_.ToString() + "?want_data=" +
                                                std::to_string(kWantDataTag)));
  return arrow::Status::OK();
}