arrow::Status TestPutGetDelete()

in cpp/code/flight.cc [191:308]


arrow::Status TestPutGetDelete() {
  StartRecipe("ParquetStorageService::StartServer");
  auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
  ARROW_RETURN_NOT_OK(fs->CreateDir("./flight_datasets/"));
  ARROW_RETURN_NOT_OK(fs->DeleteDirContents("./flight_datasets/"));
  auto root = std::make_shared<arrow::fs::SubTreeFileSystem>("./flight_datasets/", fs);

  arrow::flight::Location server_location;
  ARROW_ASSIGN_OR_RAISE(server_location,
      arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0));

  arrow::flight::FlightServerOptions options(server_location);
  auto server = std::unique_ptr<arrow::flight::FlightServerBase>(
      new ParquetStorageService(std::move(root)));
  ARROW_RETURN_NOT_OK(server->Init(options));
  rout << "Listening on port " << server->port() << std::endl;
  EndRecipe("ParquetStorageService::StartServer");

  StartRecipe("ParquetStorageService::Connect");
  arrow::flight::Location location;
  ARROW_ASSIGN_OR_RAISE(location,
      arrow::flight::Location::ForGrpcTcp("localhost", server->port()));

  std::unique_ptr<arrow::flight::FlightClient> client;
  ARROW_ASSIGN_OR_RAISE(client, arrow::flight::FlightClient::Connect(location));
  rout << "Connected to " << location.ToString() << std::endl;
  EndRecipe("ParquetStorageService::Connect");

  StartRecipe("ParquetStorageService::DoPut");
  // Open example data file to upload
  ARROW_ASSIGN_OR_RAISE(std::string airquality_path,
                        FindTestDataFile("airquality.parquet"));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile> input,
                        fs->OpenInputFile(airquality_path));
  ARROW_ASSIGN_OR_RAISE(auto reader, parquet::arrow::OpenFile(
                                         std::move(input), arrow::default_memory_pool()));

  auto descriptor = arrow::flight::FlightDescriptor::Path({"airquality.parquet"});
  std::shared_ptr<arrow::Schema> schema;
  ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));

  // Start the RPC call
  std::unique_ptr<arrow::flight::FlightStreamWriter> writer;
  std::unique_ptr<arrow::flight::FlightMetadataReader> metadata_reader;
  ARROW_ASSIGN_OR_RAISE(auto put_stream, client->DoPut(descriptor, schema));
  writer = std::move(put_stream.writer);
  metadata_reader = std::move(put_stream.reader);

  // Upload data
  std::shared_ptr<arrow::RecordBatchReader> batch_reader;
  std::vector<int> row_groups(reader->num_row_groups());
  std::iota(row_groups.begin(), row_groups.end(), 0);
  ARROW_RETURN_NOT_OK(reader->GetRecordBatchReader(row_groups, &batch_reader));
  int64_t batches = 0;
  while (true) {
    ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->Next());
    if (!batch) break;
    ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
    batches++;
  }

  ARROW_RETURN_NOT_OK(writer->Close());
  rout << "Wrote " << batches << " batches" << std::endl;
  EndRecipe("ParquetStorageService::DoPut");

  StartRecipe("ParquetStorageService::GetFlightInfo");
  std::unique_ptr<arrow::flight::FlightInfo> flight_info;
  ARROW_ASSIGN_OR_RAISE(flight_info, client->GetFlightInfo(descriptor));
  rout << flight_info->descriptor().ToString() << std::endl;
  rout << "=== Schema ===" << std::endl;
  std::shared_ptr<arrow::Schema> info_schema;
  arrow::ipc::DictionaryMemo dictionary_memo;
  ARROW_ASSIGN_OR_RAISE(info_schema, flight_info->GetSchema(&dictionary_memo));
  rout << info_schema->ToString() << std::endl;
  rout << "==============" << std::endl;
  EndRecipe("ParquetStorageService::GetFlightInfo");

  StartRecipe("ParquetStorageService::DoGet");
  std::unique_ptr<arrow::flight::FlightStreamReader> stream;
  ARROW_ASSIGN_OR_RAISE(stream, client->DoGet(flight_info->endpoints()[0].ticket));
  std::shared_ptr<arrow::Table> table;
  ARROW_ASSIGN_OR_RAISE(table, stream->ToTable());
  arrow::PrettyPrintOptions print_options(/*indent=*/0, /*window=*/2);
  ARROW_RETURN_NOT_OK(arrow::PrettyPrint(*table, print_options, &rout));
  EndRecipe("ParquetStorageService::DoGet");

  StartRecipe("ParquetStorageService::DoAction");
  arrow::flight::Action action{"drop_dataset",
                               arrow::Buffer::FromString("airquality.parquet")};
  std::unique_ptr<arrow::flight::ResultStream> results;
  ARROW_ASSIGN_OR_RAISE(results, client->DoAction(action));
  rout << "Deleted dataset" << std::endl;
  EndRecipe("ParquetStorageService::DoAction");

  StartRecipe("ParquetStorageService::ListFlights");
  std::unique_ptr<arrow::flight::FlightListing> listing;
  ARROW_ASSIGN_OR_RAISE(listing, client->ListFlights());
  while (true) {
    std::unique_ptr<arrow::flight::FlightInfo> flight_info;
    ARROW_ASSIGN_OR_RAISE(flight_info, listing->Next());
    if (!flight_info) break;
    rout << flight_info->descriptor().ToString() << std::endl;
    rout << "=== Schema ===" << std::endl;
    std::shared_ptr<arrow::Schema> info_schema;
    arrow::ipc::DictionaryMemo dictionary_memo;
    ARROW_ASSIGN_OR_RAISE(info_schema, flight_info->GetSchema(&dictionary_memo));
    rout << info_schema->ToString() << std::endl;
    rout << "==============" << std::endl;
  }
  rout << "End of listing" << std::endl;
  EndRecipe("ParquetStorageService::ListFlights");

  StartRecipe("ParquetStorageService::StopServer");
  ARROW_RETURN_NOT_OK(server->Shutdown());
  rout << "Server shut down successfully" << std::endl;
  EndRecipe("ParquetStorageService::StopServer");
  return arrow::Status::OK();
}