// cpp/velox/benchmarks/GenericBenchmark.cc

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <chrono> #include <thread> #include <arrow/c/bridge.h> #include <arrow/util/range.h> #include <benchmark/benchmark.h> #include <gflags/gflags.h> #include <operators/writer/ArrowWriter.h> #include "benchmarks/common/BenchmarkUtils.h" #include "benchmarks/common/FileReaderIterator.h" #include "compute/VeloxPlanConverter.h" #include "compute/VeloxRuntime.h" #include "config/GlutenConfig.h" #include "shuffle/LocalPartitionWriter.h" #include "shuffle/VeloxShuffleWriter.h" #include "utils/VeloxArrowUtils.h" #include "utils/exception.h" #include "velox/exec/PlanNodeStats.h" using namespace gluten; namespace { DEFINE_bool(skip_input, false, "Skip specifying input files."); DEFINE_bool(with_shuffle, false, "Add shuffle split at end."); DEFINE_string(partitioning, "rr", "Short partitioning name. 
Valid options are rr, hash, range, single"); DEFINE_bool(zstd, false, "Use ZSTD as shuffle compression codec"); DEFINE_bool(qat_gzip, false, "Use QAT GZIP as shuffle compression codec"); DEFINE_bool(qat_zstd, false, "Use QAT ZSTD as shuffle compression codec"); DEFINE_bool(iaa_gzip, false, "Use IAA GZIP as shuffle compression codec"); DEFINE_int32(shuffle_partitions, 200, "Number of shuffle split (reducer) partitions"); struct WriterMetrics { int64_t splitTime; int64_t evictTime; int64_t writeTime; int64_t compressTime; }; std::shared_ptr<VeloxShuffleWriter> createShuffleWriter( VeloxMemoryManager* memoryManager, const std::string& dataFile, const std::vector<std::string>& localDirs) { PartitionWriterOptions partitionWriterOptions{}; if (FLAGS_zstd) { partitionWriterOptions.codecBackend = CodecBackend::NONE; partitionWriterOptions.compressionType = arrow::Compression::ZSTD; } else if (FLAGS_qat_gzip) { partitionWriterOptions.codecBackend = CodecBackend::QAT; partitionWriterOptions.compressionType = arrow::Compression::GZIP; } else if (FLAGS_qat_zstd) { partitionWriterOptions.codecBackend = CodecBackend::QAT; partitionWriterOptions.compressionType = arrow::Compression::ZSTD; } else if (FLAGS_iaa_gzip) { partitionWriterOptions.codecBackend = CodecBackend::IAA; partitionWriterOptions.compressionType = arrow::Compression::GZIP; } std::unique_ptr<PartitionWriter> partitionWriter = std::make_unique<LocalPartitionWriter>( FLAGS_shuffle_partitions, std::move(partitionWriterOptions), memoryManager->getArrowMemoryPool(), dataFile, localDirs); auto options = ShuffleWriterOptions{}; options.partitioning = gluten::toPartitioning(FLAGS_partitioning); GLUTEN_ASSIGN_OR_THROW( auto shuffleWriter, VeloxShuffleWriter::create( FLAGS_shuffle_partitions, std::move(partitionWriter), std::move(options), memoryManager->getLeafMemoryPool(), memoryManager->getArrowMemoryPool())); return shuffleWriter; } void populateWriterMetrics( const std::shared_ptr<VeloxShuffleWriter>& shuffleWriter, 
int64_t shuffleWriteTime, WriterMetrics& metrics) { metrics.compressTime += shuffleWriter->totalCompressTime(); metrics.evictTime += shuffleWriter->totalEvictTime(); metrics.writeTime += shuffleWriter->totalWriteTime(); metrics.evictTime += (shuffleWriteTime - shuffleWriter->totalCompressTime() - shuffleWriter->totalEvictTime() - shuffleWriter->totalWriteTime()); } } // namespace auto BM_Generic = [](::benchmark::State& state, const std::string& planFile, const std::string& splitFile, const std::vector<std::string>& inputFiles, const std::unordered_map<std::string, std::string>& conf, FileReaderType readerType) { // Pin each threads to different CPU# starting from 0 or --cpu. if (FLAGS_cpu != -1) { setCpu(FLAGS_cpu + state.thread_index()); } else { setCpu(state.thread_index()); } bool emptySplit = splitFile.empty(); memory::MemoryManager::testingSetInstance({}); auto memoryManager = getDefaultMemoryManager(); auto runtime = Runtime::create(kVeloxRuntimeKind, conf); auto plan = getPlanFromFile("Plan", planFile); std::string split; if (!emptySplit) { split = getPlanFromFile("ReadRel.LocalFiles", splitFile); } auto startTime = std::chrono::steady_clock::now(); int64_t collectBatchTime = 0; WriterMetrics writerMetrics{}; for (auto _ : state) { std::vector<std::shared_ptr<gluten::ResultIterator>> inputIters; std::vector<FileReaderIterator*> inputItersRaw; if (!inputFiles.empty()) { for (const auto& input : inputFiles) { inputIters.push_back(getInputIteratorFromFileReader(input, readerType)); } std::transform( inputIters.begin(), inputIters.end(), std::back_inserter(inputItersRaw), [](std::shared_ptr<gluten::ResultIterator> iter) { return static_cast<FileReaderIterator*>(iter->getInputIter()); }); } runtime->parsePlan(reinterpret_cast<uint8_t*>(plan.data()), plan.size(), {}); if (!emptySplit) { runtime->parseSplitInfo(reinterpret_cast<uint8_t*>(split.data()), split.size()); } auto resultIter = runtime->createResultIterator(memoryManager.get(), "/tmp/test-spill", 
std::move(inputIters), conf); auto veloxPlan = dynamic_cast<gluten::VeloxRuntime*>(runtime)->getVeloxPlan(); if (FLAGS_with_shuffle) { int64_t shuffleWriteTime; TIME_NANO_START(shuffleWriteTime); std::string dataFile; std::vector<std::string> localDirs; bool isFromEnv; GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(dataFile, localDirs, isFromEnv)); const auto& shuffleWriter = createShuffleWriter(memoryManager.get(), dataFile, localDirs); while (resultIter->hasNext()) { GLUTEN_THROW_NOT_OK(shuffleWriter->split(resultIter->next(), ShuffleWriter::kMinMemLimit)); } GLUTEN_THROW_NOT_OK(shuffleWriter->stop()); TIME_NANO_END(shuffleWriteTime); populateWriterMetrics(shuffleWriter, shuffleWriteTime, writerMetrics); // Cleanup shuffle outputs cleanupShuffleOutput(dataFile, localDirs, isFromEnv); } else { // May write the output into file. ArrowSchema cSchema; toArrowSchema(veloxPlan->outputType(), memoryManager->getLeafMemoryPool().get(), &cSchema); GLUTEN_ASSIGN_OR_THROW(auto outputSchema, arrow::ImportSchema(&cSchema)); ArrowWriter writer{FLAGS_write_file}; state.PauseTiming(); if (!FLAGS_write_file.empty()) { GLUTEN_THROW_NOT_OK(writer.initWriter(*(outputSchema.get()))); } state.ResumeTiming(); while (resultIter->hasNext()) { auto array = resultIter->next()->exportArrowArray(); state.PauseTiming(); auto maybeBatch = arrow::ImportRecordBatch(array.get(), outputSchema); if (!maybeBatch.ok()) { state.SkipWithError(maybeBatch.status().message().c_str()); return; } if (FLAGS_print_result) { LOG(INFO) << maybeBatch.ValueOrDie()->ToString(); } } state.PauseTiming(); if (!FLAGS_write_file.empty()) { GLUTEN_THROW_NOT_OK(writer.closeWriter()); } state.ResumeTiming(); } collectBatchTime += std::accumulate(inputItersRaw.begin(), inputItersRaw.end(), 0, [](int64_t sum, FileReaderIterator* iter) { return sum + iter->getCollectBatchTime(); }); auto* rawIter = static_cast<gluten::WholeStageResultIterator*>(resultIter->getInputIter()); const auto* task = rawIter->task(); const auto* 
planNode = rawIter->veloxPlan(); auto statsStr = facebook::velox::exec::printPlanWithStats(*planNode, task->taskStats(), true); LOG(INFO) << statsStr; } Runtime::release(runtime); auto endTime = std::chrono::steady_clock::now(); auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(endTime - startTime).count(); state.counters["collect_batch_time"] = benchmark::Counter(collectBatchTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); state.counters["elapsed_time"] = benchmark::Counter(duration, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); state.counters["shuffle_write_time"] = benchmark::Counter( writerMetrics.writeTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); state.counters["shuffle_spill_time"] = benchmark::Counter( writerMetrics.evictTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); state.counters["shuffle_split_time"] = benchmark::Counter( writerMetrics.splitTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); state.counters["shuffle_compress_time"] = benchmark::Counter( writerMetrics.compressTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); }; int main(int argc, char** argv) { ::benchmark::Initialize(&argc, argv); gflags::ParseCommandLineFlags(&argc, &argv, true); std::string substraitJsonFile; std::string splitFile; std::vector<std::string> inputFiles; std::unordered_map<std::string, std::string> conf; conf.insert({gluten::kSparkBatchSize, std::to_string(FLAGS_batch_size)}); conf.insert({kDebugModeEnabled, "true"}); initVeloxBackend(conf); try { if (argc < 2) { LOG(INFO) << "No input args. 
Usage: " << std::endl << "./generic_benchmark /absolute-path/to/substrait_json_file /absolute-path/to/split_json_file(optional)" << " /absolute-path/to/data_file_1 /absolute-path/to/data_file_2 ..."; LOG(INFO) << "Running example..."; inputFiles.resize(2); substraitJsonFile = getGeneratedFilePath("example.json"); inputFiles[0] = getGeneratedFilePath("example_orders"); inputFiles[1] = getGeneratedFilePath("example_lineitem"); } else { substraitJsonFile = argv[1]; splitFile = argv[2]; abortIfFileNotExists(substraitJsonFile); LOG(INFO) << "Using substrait json file: " << std::endl << substraitJsonFile; LOG(INFO) << "Using " << argc - 2 << " input data file(s): "; for (auto i = 3; i < argc; ++i) { inputFiles.emplace_back(argv[i]); abortIfFileNotExists(inputFiles.back()); LOG(INFO) << inputFiles.back(); } } } catch (const std::exception& e) { LOG(INFO) << "Failed to run benchmark: " << e.what(); ::benchmark::Shutdown(); std::exit(EXIT_FAILURE); } #define GENERIC_BENCHMARK(NAME, READER_TYPE) \ do { \ auto* bm = \ ::benchmark::RegisterBenchmark(NAME, BM_Generic, substraitJsonFile, splitFile, inputFiles, conf, READER_TYPE) \ ->MeasureProcessCPUTime() \ ->UseRealTime(); \ if (FLAGS_threads > 0) { \ bm->Threads(FLAGS_threads); \ } else { \ bm->ThreadRange(1, std::thread::hardware_concurrency()); \ } \ if (FLAGS_iterations > 0) { \ bm->Iterations(FLAGS_iterations); \ } \ } while (0) DLOG(INFO) << "FLAGS_threads:" << FLAGS_threads; DLOG(INFO) << "FLAGS_iterations:" << FLAGS_iterations; DLOG(INFO) << "FLAGS_cpu:" << FLAGS_cpu; DLOG(INFO) << "FLAGS_print_result:" << FLAGS_print_result; DLOG(INFO) << "FLAGS_write_file:" << FLAGS_write_file; DLOG(INFO) << "FLAGS_batch_size:" << FLAGS_batch_size; if (FLAGS_skip_input) { GENERIC_BENCHMARK("SkipInput", FileReaderType::kNone); } else { GENERIC_BENCHMARK("InputFromBatchVector", FileReaderType::kBuffered); GENERIC_BENCHMARK("InputFromBatchStream", FileReaderType::kStream); } ::benchmark::RunSpecifiedBenchmarks(); 
::benchmark::Shutdown(); return 0; }