arrow::Status initialize()

in dissociated-ipc/cudf-flight-server.cc [67:105]


  // Loads the training data set and brings up the Flight server.
  //
  // Steps performed:
  //   1. Read ./data/taxi-data/train.parquet in chunks through libcudf's
  //      chunked parquet reader (constructed with a 1 MiB chunk read limit).
  //   2. Export every chunk through cudf::to_arrow_device and import it as an
  //      arrow::RecordBatch, collecting the batches under the "train.parquet"
  //      key of data_sets_.
  //   3. Call Init("127.0.0.1", 0) and derive ctrl_location_ / data_location_
  //      from location_ by appending a "?want_data=<tag>" query string.
  //
  // Returns arrow::Status::OK() on success; otherwise the first non-OK status
  // produced by a record batch import, by Init, or by flight::Location::Parse.
  arrow::Status initialize() {
    // load the parquet data directly onto the GPU as a libcudf table
    auto source = cudf::io::source_info("./data/taxi-data/train.parquet");
    auto options = cudf::io::parquet_reader_options::builder(source);
    cudf::io::chunked_parquet_reader rdr(1 * 1024 * 1024, options);

    // get arrow::RecordBatches for each chunk of the parquet data while
    // leaving the data on the GPU
    arrow::RecordBatchVector batches;
    auto chunk = rdr.read_chunk();
    auto c_schema = cudf::to_arrow_schema(chunk.tbl->view(),
                                          table_metadata_to_column(chunk.metadata));
    auto device_out = cudf::to_arrow_device(std::move(*chunk.tbl));
    ARROW_ASSIGN_OR_RAISE(
        auto data, arrow::ImportDeviceRecordBatch(device_out.get(), c_schema.get()));

    // Importing through the ArrowSchema* overload consumes the C schema struct
    // (it is marked released afterwards), so it must not be handed to a second
    // import. Hold on to the already-imported arrow::Schema and reuse that for
    // every following chunk instead.
    std::shared_ptr<arrow::Schema> batch_schema = data->schema();
    batches.push_back(std::move(data));

    while (rdr.has_next()) {
      chunk = rdr.read_chunk();
      device_out = cudf::to_arrow_device(std::move(*chunk.tbl));
      ARROW_ASSIGN_OR_RAISE(
          data, arrow::ImportDeviceRecordBatch(device_out.get(), batch_schema));
      batches.push_back(std::move(data));
    }

    data_sets_.emplace("train.parquet", std::move(batches));

    // initialize the server and let it choose its own port
    ARROW_RETURN_NOT_OK(Init("127.0.0.1", 0));

    // Both locations share the server's base location and differ only in the
    // want_data tag carried in the query string.
    const std::string base_uri = location_.ToString();
    ARROW_ASSIGN_OR_RAISE(
        ctrl_location_,
        flight::Location::Parse(base_uri + "?want_data=" + std::to_string(kWantCtrlTag)));
    ARROW_ASSIGN_OR_RAISE(
        data_location_,
        flight::Location::Parse(base_uri + "?want_data=" + std::to_string(kWantDataTag)));
    return arrow::Status::OK();
  }