void WritePartitionedAirQuality()

in cpp/code/datasets.cc [44:76]


  // Writes `airquality` to `fs` as a Hive-partitioned Parquet dataset rooted at
  // airquality_partitioned_dir_, using the int32 fields "Month" and "Day" as the
  // partitioning schema.
  //
  // airquality: table to write; it is read through a TableBatchReader.
  // fs:         destination filesystem; taken by value and moved into the
  //             write options.
  //
  // Every Arrow call that returns a status/result is checked with ASSERT_OK /
  // ASSERT_OK_AND_ASSIGN.
  void WritePartitionedAirQuality(const std::shared_ptr<arrow::Table>& airquality,
                                  std::shared_ptr<arrow::fs::FileSystem> fs) {
    // Wrap the table in a record batch reader and build a scanner over it,
    // with threading turned on.
    auto builder = arrow::dataset::ScannerBuilder::FromRecordBatchReader(
        std::make_shared<arrow::TableBatchReader>(*airquality));
    ASSERT_OK(builder->UseThreads(true));
    ASSERT_OK_AND_ASSIGN(auto scanner, builder->Finish());

    // Hive partitioning resolved against a (Month, Day) schema.
    const auto partition_fields = arrow::schema(
        {arrow::field("Month", arrow::int32()), arrow::field("Day", arrow::int32())});
    ASSERT_OK_AND_ASSIGN(
        auto hive_partitioning,
        arrow::dataset::HivePartitioning::MakeFactory()->Finish(partition_fields));

    // The Parquet format object supplies the default per-file write options.
    const auto parquet = std::make_shared<arrow::dataset::ParquetFileFormat>();

    // Where to write ...
    arrow::dataset::FileSystemDatasetWriteOptions options;
    options.filesystem = std::move(fs);
    options.base_dir = airquality_partitioned_dir_;
    options.basename_template = "chunk-{i}.parquet";
    // ... and how.
    options.partitioning = std::move(hive_partitioning);
    options.file_write_options = parquet->DefaultWriteOptions();
    options.existing_data_behavior =
        arrow::dataset::ExistingDataBehavior::kDeleteMatchingPartitions;

    ASSERT_OK(arrow::dataset::FileSystemDataset::Write(options, std::move(scanner)));
  }