cpp/velox/benchmarks/ParquetWriteBenchmark.cc (109 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <benchmark/benchmark.h> #include "benchmarks/common/BenchmarkUtils.h" #include "compute/Runtime.h" #include "compute/VeloxBackend.h" #include "memory/VeloxMemoryManager.h" #include "operators/reader/ParquetReaderIterator.h" #include "operators/writer/VeloxParquetDataSource.h" #include "utils/VeloxArrowUtils.h" namespace gluten { const int kBatchBufferSize = 32768; class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark { public: GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark(const std::string& fileName, const std::string& outputPath) : fileName_(fileName), outputPath_(outputPath) {} void operator()(benchmark::State& state) { if (state.range(0) == 0xffffffff) { setCpu(state.thread_index()); } else { setCpu(state.range(0)); } std::shared_ptr<arrow::RecordBatch> recordBatch; int64_t elapseRead = 0; int64_t numBatches = 0; int64_t numRows = 0; int64_t numColumns = 0; int64_t initTime = 0; int64_t writeTime = 0; // reuse the ParquetWriteConverter for batches caused system % increase a lot auto memoryManager = getDefaultMemoryManager(); auto runtime = Runtime::create(kVeloxBackendKind, memoryManager); auto veloxPool = memoryManager->getAggregateMemoryPool(); for (auto _ : state) { const auto output = "velox_parquet_write.parquet"; // Init VeloxParquetDataSource auto reader = [&] { ScopedTimer timer(&elapseRead); return std::make_unique<ParquetBufferedReaderIterator>(fileName_, kBatchBufferSize, veloxPool); }(); const auto localSchema = toArrowSchema(reader->getRowType(), veloxPool.get()); auto veloxParquetDataSource = std::make_unique<gluten::VeloxParquetDataSource>( outputPath_ + "/" + output, veloxPool->addAggregateChild("writer_benchmark"), veloxPool->addLeafChild("sink_pool"), localSchema); veloxParquetDataSource->init(runtime->getConfMap()); while (auto batch = reader->next()) { ScopedTimer timer(&elapseRead); veloxParquetDataSource->write(batch); } veloxParquetDataSource->close(); } state.counters["columns"] = benchmark::Counter(numColumns, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); state.counters["batches"] = benchmark::Counter(numBatches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); state.counters["num_rows"] = benchmark::Counter(numRows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); state.counters["batch_buffer_size"] = benchmark::Counter(kBatchBufferSize, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1024); state.counters["parquet_parse"] = benchmark::Counter(elapseRead, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); state.counters["init_time"] = benchmark::Counter(initTime, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); state.counters["write_time"] = benchmark::Counter(writeTime, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); Runtime::release(runtime); } private: std::string fileName_; std::string outputPath_; }; } // namespace gluten // GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark usage // ./parquet_write_benchmark --threads=1 --file /mnt/DP_disk1/int.parquet --output file:/tmp/parquet-write // GoogleBenchmarkArrowParquetWriteCacheScanBenchmark usage // ./parquet_write_benchmark --threads=1 --file /mnt/DP_disk1/int.parquet --output /tmp/parquet-write int main(int argc, char** argv) { gluten::initVeloxBackend(); uint32_t iterations = 1; uint32_t threads = 1; std::string datafile; uint32_t cpu = 0xffffffff; std::string output; for (int i = 0; i < argc; i++) { if (strcmp(argv[i], "--iterations") == 0) { iterations = atol(argv[i + 1]); } else if (strcmp(argv[i], "--threads") == 0) { threads = atol(argv[i + 1]); } else if (strcmp(argv[i], "--file") == 0) { datafile = argv[i + 1]; } else if (strcmp(argv[i], "--cpu") == 0) { cpu = atol(argv[i + 1]); } else if (strcmp(argv[i], "--output") == 0) { output = (argv[i + 1]); } } LOG(INFO) << "iterations = " << iterations; LOG(INFO) << "threads = " << threads; LOG(INFO) << "datafile = " << datafile; LOG(INFO) << "cpu = " << cpu; LOG(INFO) << "output = " << output; gluten::GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark bck(datafile, output); benchmark::RegisterBenchmark("GoogleBenchmarkParquetWrite::CacheScan", bck) ->Args({ cpu, }) ->Iterations(iterations) ->Threads(threads) ->ReportAggregatesOnly(false) ->MeasureProcessCPUTime() ->Unit(benchmark::kSecond); benchmark::Initialize(&argc, argv); benchmark::RunSpecifiedBenchmarks(); benchmark::Shutdown(); }