in cpp/code/datasets.cc [97:177]
/// Walks through three dataset recipes against the partitioned dataset rooted
/// at `airquality_basedir`: listing its files, creating a dataset from them,
/// and scanning that dataset into a table. Recipe output is written to `rout`.
///
/// \param airquality_basedir base directory of the partitioned dataset
/// \return OK on success, otherwise the first error status raised by a
///         filesystem, dataset factory, or scanner call
arrow::Status DatasetRead(const std::string& airquality_basedir) {
  StartRecipe("ListPartitionedDataset");
  // The files are accessed through the local filesystem.
  auto fs = std::make_shared<arrow::fs::LocalFileSystem>();

  // The selector names the files that belong to the dataset. Partitioned
  // datasets usually live in nested directories, so the base directory is
  // searched recursively. A dataset could also be built from an explicit
  // list of one or more paths.
  arrow::fs::FileSelector selector;
  selector.recursive = true;
  selector.base_dir = airquality_basedir;

  // Print some of the files to show how the data is partitioned. Listing the
  // files is purely illustrative; reading a dataset does not require it.
  ARROW_ASSIGN_OR_RAISE(std::vector<arrow::fs::FileInfo> file_infos,
                        fs->GetFileInfo(selector));
  constexpr int kMaxListedFiles = 10;
  const auto base_dir_length = airquality_basedir.size();
  int listed_files = 0;
  for (const arrow::fs::FileInfo& info : file_infos) {
    if (!info.IsFile()) {
      continue;
    }
    // Drop the base directory prefix so only the partitioned part is shown.
    rout << info.path().substr(base_dir_length) << std::endl;
    ++listed_files;
    if (listed_files == kMaxListedFiles) {
      rout << "..." << std::endl;
      break;
    }
  }
  EndRecipe("ListPartitionedDataset");

  StartRecipe("CreatingADataset");
  // The file format tells the dataset how to read each file. These files are
  // parquet. Another format (Arrow-IPC, CSV, ...) could be used instead, or
  // the parquet format could be customized with extra reading & parsing
  // options.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  // The dataset factory uses a partitioning factory to infer the partitioning
  // schema from the file names; only the flavor ("hive" here) has to be
  // chosen.
  //
  // A partitioning scheme could instead be built by hand from a schema, but
  // with hive partitioning inference works well so that is rarely needed.
  arrow::dataset::FileSystemFactoryOptions options;
  std::shared_ptr<arrow::dataset::PartitioningFactory> partitioning_factory =
      arrow::dataset::HivePartitioning::MakeFactory();
  options.partitioning = partitioning_factory;

  // Build the dataset factory from the filesystem, selector, format and options.
  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::dataset::DatasetFactory> dataset_factory,
      arrow::dataset::FileSystemDatasetFactory::Make(fs, selector, format, options));

  // Finishing the factory scans the dataset directory to find all the files
  // and may read some file metadata to determine the dataset schema.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset,
                        dataset_factory->Finish());

  rout << "We discovered the following schema for the dataset:" << std::endl;
  rout << std::endl;
  rout << dataset->schema()->ToString() << std::endl;
  EndRecipe("CreatingADataset");

  StartRecipe("ScanningADataset");
  // Configure and build a scanner over the dataset.
  arrow::dataset::ScannerBuilder scanner_builder(dataset);
  ARROW_RETURN_NOT_OK(scanner_builder.UseThreads(true));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Scanner> scanner,
                        scanner_builder.Finish());

  // Read the whole dataset into a single table. The scanner offers a variety
  // of other methods as well.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, scanner->ToTable());
  const auto row_count = table->num_rows();
  const auto column_count = table->num_columns();
  rout << "Read in a table with " << row_count << " rows and " << column_count
       << " columns";
  EndRecipe("ScanningADataset");

  return arrow::Status::OK();
}