in cpp/code/flight.cc [191:308]
arrow::Status TestPutGetDelete() {
StartRecipe("ParquetStorageService::StartServer");
auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
ARROW_RETURN_NOT_OK(fs->CreateDir("./flight_datasets/"));
ARROW_RETURN_NOT_OK(fs->DeleteDirContents("./flight_datasets/"));
auto root = std::make_shared<arrow::fs::SubTreeFileSystem>("./flight_datasets/", fs);
arrow::flight::Location server_location;
ARROW_ASSIGN_OR_RAISE(server_location,
arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0));
arrow::flight::FlightServerOptions options(server_location);
auto server = std::unique_ptr<arrow::flight::FlightServerBase>(
new ParquetStorageService(std::move(root)));
ARROW_RETURN_NOT_OK(server->Init(options));
rout << "Listening on port " << server->port() << std::endl;
EndRecipe("ParquetStorageService::StartServer");
StartRecipe("ParquetStorageService::Connect");
arrow::flight::Location location;
ARROW_ASSIGN_OR_RAISE(location,
arrow::flight::Location::ForGrpcTcp("localhost", server->port()));
std::unique_ptr<arrow::flight::FlightClient> client;
ARROW_ASSIGN_OR_RAISE(client, arrow::flight::FlightClient::Connect(location));
rout << "Connected to " << location.ToString() << std::endl;
EndRecipe("ParquetStorageService::Connect");
StartRecipe("ParquetStorageService::DoPut");
// Open example data file to upload
ARROW_ASSIGN_OR_RAISE(std::string airquality_path,
FindTestDataFile("airquality.parquet"));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile> input,
fs->OpenInputFile(airquality_path));
ARROW_ASSIGN_OR_RAISE(auto reader, parquet::arrow::OpenFile(
std::move(input), arrow::default_memory_pool()));
auto descriptor = arrow::flight::FlightDescriptor::Path({"airquality.parquet"});
std::shared_ptr<arrow::Schema> schema;
ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));
// Start the RPC call
std::unique_ptr<arrow::flight::FlightStreamWriter> writer;
std::unique_ptr<arrow::flight::FlightMetadataReader> metadata_reader;
ARROW_ASSIGN_OR_RAISE(auto put_stream, client->DoPut(descriptor, schema));
writer = std::move(put_stream.writer);
metadata_reader = std::move(put_stream.reader);
// Upload data
std::shared_ptr<arrow::RecordBatchReader> batch_reader;
std::vector<int> row_groups(reader->num_row_groups());
std::iota(row_groups.begin(), row_groups.end(), 0);
ARROW_RETURN_NOT_OK(reader->GetRecordBatchReader(row_groups, &batch_reader));
int64_t batches = 0;
while (true) {
ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->Next());
if (!batch) break;
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
batches++;
}
ARROW_RETURN_NOT_OK(writer->Close());
rout << "Wrote " << batches << " batches" << std::endl;
EndRecipe("ParquetStorageService::DoPut");
StartRecipe("ParquetStorageService::GetFlightInfo");
std::unique_ptr<arrow::flight::FlightInfo> flight_info;
ARROW_ASSIGN_OR_RAISE(flight_info, client->GetFlightInfo(descriptor));
rout << flight_info->descriptor().ToString() << std::endl;
rout << "=== Schema ===" << std::endl;
std::shared_ptr<arrow::Schema> info_schema;
arrow::ipc::DictionaryMemo dictionary_memo;
ARROW_ASSIGN_OR_RAISE(info_schema, flight_info->GetSchema(&dictionary_memo));
rout << info_schema->ToString() << std::endl;
rout << "==============" << std::endl;
EndRecipe("ParquetStorageService::GetFlightInfo");
StartRecipe("ParquetStorageService::DoGet");
std::unique_ptr<arrow::flight::FlightStreamReader> stream;
ARROW_ASSIGN_OR_RAISE(stream, client->DoGet(flight_info->endpoints()[0].ticket));
std::shared_ptr<arrow::Table> table;
ARROW_ASSIGN_OR_RAISE(table, stream->ToTable());
arrow::PrettyPrintOptions print_options(/*indent=*/0, /*window=*/2);
ARROW_RETURN_NOT_OK(arrow::PrettyPrint(*table, print_options, &rout));
EndRecipe("ParquetStorageService::DoGet");
StartRecipe("ParquetStorageService::DoAction");
arrow::flight::Action action{"drop_dataset",
arrow::Buffer::FromString("airquality.parquet")};
std::unique_ptr<arrow::flight::ResultStream> results;
ARROW_ASSIGN_OR_RAISE(results, client->DoAction(action));
rout << "Deleted dataset" << std::endl;
EndRecipe("ParquetStorageService::DoAction");
StartRecipe("ParquetStorageService::ListFlights");
std::unique_ptr<arrow::flight::FlightListing> listing;
ARROW_ASSIGN_OR_RAISE(listing, client->ListFlights());
while (true) {
std::unique_ptr<arrow::flight::FlightInfo> flight_info;
ARROW_ASSIGN_OR_RAISE(flight_info, listing->Next());
if (!flight_info) break;
rout << flight_info->descriptor().ToString() << std::endl;
rout << "=== Schema ===" << std::endl;
std::shared_ptr<arrow::Schema> info_schema;
arrow::ipc::DictionaryMemo dictionary_memo;
ARROW_ASSIGN_OR_RAISE(info_schema, flight_info->GetSchema(&dictionary_memo));
rout << info_schema->ToString() << std::endl;
rout << "==============" << std::endl;
}
rout << "End of listing" << std::endl;
EndRecipe("ParquetStorageService::ListFlights");
StartRecipe("ParquetStorageService::StopServer");
ARROW_RETURN_NOT_OK(server->Shutdown());
rout << "Server shut down successfully" << std::endl;
EndRecipe("ParquetStorageService::StopServer");
return arrow::Status::OK();
}