cpp/velox/benchmarks/ParquetWriteBenchmark.cc
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <benchmark/benchmark.h>
#include <cstdlib>
#include <cstring>

#include "benchmarks/common/BenchmarkUtils.h"
#include "compute/Runtime.h"
#include "compute/VeloxBackend.h"
#include "memory/VeloxMemoryManager.h"
#include "operators/reader/ParquetReaderIterator.h"
#include "operators/writer/VeloxParquetDataSource.h"
#include "utils/VeloxArrowUtils.h"
namespace gluten {
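
// Batch buffer size handed to ParquetBufferedReaderIterator when scanning the input Parquet file.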
const int kBatchBufferSize = 32768;
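
// Benchmarks writing a Parquet file through VeloxParquetDataSource: each iteration scans the input
// file with ParquetBufferedReaderIterator and streams its batches into a freshly created writer.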
class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark {
 public:
  GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark(const std::string& fileName, const std::string& outputPath)
      : fileName_(fileName), outputPath_(outputPath) {}

  void operator()(benchmark::State& state) {
    // Pin the benchmark thread to a CPU: use the thread index unless an explicit --cpu was given.
    if (state.range(0) == 0xffffffff) {
      setCpu(state.thread_index());
    } else {
      setCpu(state.range(0));
    }
    std::shared_ptr<arrow::RecordBatch> recordBatch;
    int64_t elapseRead = 0;
    int64_t numBatches = 0;
    int64_t numRows = 0;
    int64_t numColumns = 0;
    int64_t initTime = 0;
    int64_t writeTime = 0;
    // Reusing the ParquetWriteConverter across batches caused system CPU usage to increase significantly.
    auto memoryManager = getDefaultMemoryManager();
    auto runtime = Runtime::create(kVeloxBackendKind, memoryManager);
    auto veloxPool = memoryManager->getAggregateMemoryPool();
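
    // Each benchmark iteration: open the reader, create a fresh writer for the output file,
    // stream all batches through it, then close the file.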
    for (auto _ : state) {
      const auto output = "velox_parquet_write.parquet";
      // Open the buffered Parquet reader; only its construction is attributed to parquet_parse.
      auto reader = [&] {
        ScopedTimer timer(&elapseRead);
        return std::make_unique<ParquetBufferedReaderIterator>(fileName_, kBatchBufferSize, veloxPool);
      }();
      const auto localSchema = toArrowSchema(reader->getRowType(), veloxPool.get());
      numColumns = localSchema->num_fields();
      // Init VeloxParquetDataSource and time its initialization separately.
      auto veloxParquetDataSource = std::make_unique<gluten::VeloxParquetDataSource>(
          outputPath_ + "/" + output,
          veloxPool->addAggregateChild("writer_benchmark"),
          veloxPool->addLeafChild("sink_pool"),
          localSchema);
      {
        ScopedTimer timer(&initTime);
        veloxParquetDataSource->init(runtime->getConfMap());
      }
      while (auto batch = reader->next()) {
        ScopedTimer timer(&writeTime);
        numBatches++;
        numRows += batch->numRows();
        veloxParquetDataSource->write(batch);
      }
      veloxParquetDataSource->close();
    }
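
    // Export the collected metrics as benchmark counters.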
    state.counters["columns"] =
        benchmark::Counter(numColumns, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["batches"] =
        benchmark::Counter(numBatches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["num_rows"] =
        benchmark::Counter(numRows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["batch_buffer_size"] =
        benchmark::Counter(kBatchBufferSize, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1024);
    state.counters["parquet_parse"] =
        benchmark::Counter(elapseRead, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["init_time"] =
        benchmark::Counter(initTime, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["write_time"] =
        benchmark::Counter(writeTime, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);

    Runtime::release(runtime);
  }

 private:
  std::string fileName_;
  std::string outputPath_;
};

} // namespace gluten

// GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark usage
// ./parquet_write_benchmark --threads=1 --file /mnt/DP_disk1/int.parquet --output file:/tmp/parquet-write
// GoogleBenchmarkArrowParquetWriteCacheScanBenchmark usage
// ./parquet_write_benchmark --threads=1 --file /mnt/DP_disk1/int.parquet --output /tmp/parquet-write
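// Flags recognized by this benchmark: --iterations, --threads, --file, --cpu, --output.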
int main(int argc, char** argv) {
  gluten::initVeloxBackend();
  uint32_t iterations = 1;
  uint32_t threads = 1;
  std::string datafile;
  uint32_t cpu = 0xffffffff;
  std::string output;

  // Stop at argc - 1 so each flag can safely read its value from argv[i + 1].
  for (int i = 1; i < argc - 1; i++) {
    if (strcmp(argv[i], "--iterations") == 0) {
      iterations = atol(argv[i + 1]);
    } else if (strcmp(argv[i], "--threads") == 0) {
      threads = atol(argv[i + 1]);
    } else if (strcmp(argv[i], "--file") == 0) {
      datafile = argv[i + 1];
    } else if (strcmp(argv[i], "--cpu") == 0) {
      cpu = atol(argv[i + 1]);
    } else if (strcmp(argv[i], "--output") == 0) {
      output = argv[i + 1];
    }
  }
LOG(INFO) << "iterations = " << iterations;
LOG(INFO) << "threads = " << threads;
LOG(INFO) << "datafile = " << datafile;
LOG(INFO) << "cpu = " << cpu;
LOG(INFO) << "output = " << output;
gluten::GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark bck(datafile, output);
benchmark::RegisterBenchmark("GoogleBenchmarkParquetWrite::CacheScan", bck)
->Args({
cpu,
})
->Iterations(iterations)
->Threads(threads)
->ReportAggregatesOnly(false)
->MeasureProcessCPUTime()
->Unit(benchmark::kSecond);
benchmark::Initialize(&argc, argv);
benchmark::RunSpecifiedBenchmarks();
benchmark::Shutdown();
}