cpp/code/datasets.cc (117 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include <arrow/api.h> #include <arrow/dataset/api.h> #include <arrow/filesystem/api.h> #include <gtest/gtest.h> #include <parquet/arrow/reader.h> #include <filesystem> #include <memory> #include "common.h" class DatasetReadingTest : public ::testing::Test { public: void SetUp() override { airquality_partitioned_dir_ = std::filesystem::temp_directory_path() / "cookbook_cpp_airquality"; std::shared_ptr<arrow::fs::FileSystem> fs = std::make_shared<arrow::fs::LocalFileSystem>(); ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Table> airquality, ReadInAirQuality(fs.get())); WritePartitionedAirQuality(airquality, std::move(fs)); } const std::string& airquality_basedir() { return airquality_partitioned_dir_; } private: void WritePartitionedAirQuality(const std::shared_ptr<arrow::Table>& airquality, std::shared_ptr<arrow::fs::FileSystem> fs) { std::shared_ptr<arrow::RecordBatchReader> table_reader = std::make_shared<arrow::TableBatchReader>(*airquality); std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder = arrow::dataset::ScannerBuilder::FromRecordBatchReader(std::move(table_reader)); ASSERT_OK(scanner_builder->UseThreads(true)); ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::dataset::Scanner> scanner, scanner_builder->Finish()); std::shared_ptr<arrow::Schema> partitioning_schema = arrow::schema( {arrow::field("Month", arrow::int32()), arrow::field("Day", arrow::int32())}); std::shared_ptr<arrow::dataset::PartitioningFactory> partitioning_factory = arrow::dataset::HivePartitioning::MakeFactory(); ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::dataset::Partitioning> partitioning, partitioning_factory->Finish(partitioning_schema)); std::shared_ptr<arrow::dataset::ParquetFileFormat> parquet_format = std::make_shared<arrow::dataset::ParquetFileFormat>(); arrow::dataset::FileSystemDatasetWriteOptions write_options; write_options.existing_data_behavior = arrow::dataset::ExistingDataBehavior::kDeleteMatchingPartitions; write_options.filesystem = std::move(fs); write_options.partitioning = std::move(partitioning); write_options.base_dir = airquality_partitioned_dir_; write_options.basename_template = "chunk-{i}.parquet"; write_options.file_write_options = parquet_format->DefaultWriteOptions(); ASSERT_OK( arrow::dataset::FileSystemDataset::Write(write_options, std::move(scanner))); } static arrow::Result<std::shared_ptr<arrow::Table>> ReadInAirQuality( arrow::fs::FileSystem* fs) { ARROW_ASSIGN_OR_RAISE(std::string airquality_path, FindTestDataFile("airquality.parquet")); ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile> file, fs->OpenInputFile(airquality_path)); std::unique_ptr<parquet::ParquetFileReader> parquet_reader = parquet::ParquetFileReader::Open(file); std::unique_ptr<parquet::arrow::FileReader> reader; ARROW_RETURN_NOT_OK(parquet::arrow::FileReader::Make( arrow::default_memory_pool(), std::move(parquet_reader), &reader)); std::shared_ptr<arrow::Table> table; ARROW_RETURN_NOT_OK(reader->ReadTable(&table)); return table; } std::string airquality_partitioned_dir_; }; arrow::Status DatasetRead(const std::string& airquality_basedir) { StartRecipe("ListPartitionedDataset"); const std::string& directory_base = airquality_basedir; // Create a filesystem std::shared_ptr<arrow::fs::LocalFileSystem> fs = std::make_shared<arrow::fs::LocalFileSystem>(); // Create a file selector which describes which files are part of // the dataset. This selector performs a recursive search of a base // directory which is typical with partitioned datasets. You can also // create a dataset from a list of one or more paths. arrow::fs::FileSelector selector; selector.base_dir = directory_base; selector.recursive = true; // List out the files so we can see how our data is partitioned. // This step is not necessary for reading a dataset ARROW_ASSIGN_OR_RAISE(std::vector<arrow::fs::FileInfo> file_infos, fs->GetFileInfo(selector)); int num_printed = 0; for (const auto& path : file_infos) { if (path.IsFile()) { rout << path.path().substr(directory_base.size()) << std::endl; if (++num_printed == 10) { rout << "..." << std::endl; break; } } } EndRecipe("ListPartitionedDataset"); StartRecipe("CreatingADataset"); // Create a file format which describes the format of the files. // Here we specify we are reading parquet files. We could pick a different format // such as Arrow-IPC files or CSV files or we could customize the parquet format with // additional reading & parsing options. std::shared_ptr<arrow::dataset::ParquetFileFormat> format = std::make_shared<arrow::dataset::ParquetFileFormat>(); // Create a partitioning factory. A partitioning factory will be used by a dataset // factory to infer the partitioning schema from the filenames. All we need to // specify is the flavor of partitioning which, in our case, is "hive". // // Alternatively, we could manually create a partitioning scheme from a schema. This // is typically not necessary for hive partitioning as inference works well. std::shared_ptr<arrow::dataset::PartitioningFactory> partitioning_factory = arrow::dataset::HivePartitioning::MakeFactory(); arrow::dataset::FileSystemFactoryOptions options; options.partitioning = partitioning_factory; // Create a dataset factory ARROW_ASSIGN_OR_RAISE( std::shared_ptr<arrow::dataset::DatasetFactory> dataset_factory, arrow::dataset::FileSystemDatasetFactory::Make(fs, selector, format, options)); // Create the dataset, this will scan the dataset directory to find all the files // and may scan some file metadata in order to determine the dataset schema. ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, dataset_factory->Finish()); rout << "We discovered the following schema for the dataset:" << std::endl << std::endl << dataset->schema()->ToString() << std::endl; EndRecipe("CreatingADataset"); StartRecipe("ScanningADataset"); // Create a scanner arrow::dataset::ScannerBuilder scanner_builder(dataset); ARROW_RETURN_NOT_OK(scanner_builder.UseThreads(true)); ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Scanner> scanner, scanner_builder.Finish()); // Scan the dataset. There are a variety of other methods available on the scanner as // well ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, scanner->ToTable()); rout << "Read in a table with " << table->num_rows() << " rows and " << table->num_columns() << " columns"; EndRecipe("ScanningADataset"); return arrow::Status::OK(); } TEST_F(DatasetReadingTest, TestDatasetRead) { ASSERT_OK(DatasetRead(airquality_basedir())); }