cpp/velox/benchmarks/GenericBenchmark.cc (639 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <chrono>
#include <thread>
#include <arrow/c/bridge.h>
#include <arrow/util/range.h>
#include <benchmark/benchmark.h>
#include <gflags/gflags.h>
#include "benchmarks/common/BenchmarkUtils.h"
#include "compute/VeloxBackend.h"
#include "compute/VeloxRuntime.h"
#include "config/GlutenConfig.h"
#include "config/VeloxConfig.h"
#include "operators/reader/FileReaderIterator.h"
#include "operators/writer/VeloxColumnarBatchWriter.h"
#include "shuffle/LocalPartitionWriter.h"
#include "shuffle/VeloxShuffleWriter.h"
#include "shuffle/rss/RssPartitionWriter.h"
#include "utils/Exception.h"
#include "utils/LocalRssClient.h"
#include "utils/StringUtil.h"
#include "utils/TestAllocationListener.h"
#include "utils/Timer.h"
#include "utils/VeloxArrowUtils.h"
#include "velox/exec/PlanNodeStats.h"
using namespace gluten;
namespace {
// Command-line flags defined by this benchmark binary.

// Output handling.
DEFINE_bool(print_result, true, "Print result for execution");
DEFINE_string(save_output, "", "Path to parquet file for saving the task output iterator");

// Shuffle configuration.
DEFINE_bool(with_shuffle, false, "Add shuffle split at end.");
DEFINE_bool(run_shuffle, false, "Only run shuffle write.");
DEFINE_bool(run_shuffle_read, false, "Whether to run shuffle read when run_shuffle is true.");
// "rss_sort" is also accepted by createShuffleWriter(), so it is listed in the help text.
DEFINE_string(shuffle_writer, "hash", "Shuffle writer type. Can be hash, sort or rss_sort");
DEFINE_string(
    partitioning,
    "rr",
    "Short partitioning name. Valid options are rr, hash, range, single, random (only for test purpose)");
DEFINE_bool(rss, false, "Mocking rss.");
DEFINE_string(
    compression,
    "lz4",
    "Specify the compression codec. Valid options are lz4, zstd, qat_gzip, qat_zstd, iaa_gzip");
DEFINE_int32(shuffle_partitions, 200, "Number of shuffle split (reducer) partitions");

// Benchmark inputs.
DEFINE_string(plan, "", "Path to input json file of the substrait plan.");
DEFINE_string(
    split,
    "",
    "Path to input json file of the splits. Only valid for simulating the first stage. Use comma-separated list for multiple splits.");
DEFINE_string(data, "", "Path to input data files in parquet format. Use comma-separated list for multiple files.");
DEFINE_string(conf, "", "Path to the configuration file.");
DEFINE_string(write_path, "/tmp", "Path to save the output from write tasks.");
DEFINE_int64(memory_limit, std::numeric_limits<int64_t>::max(), "Memory limit used to trigger spill.");
// Adjacent string literals are concatenated verbatim, so each sentence ends with an explicit space.
DEFINE_string(
    scan_mode,
    "stream",
    "Scan mode for reading parquet data. "
    "'stream' mode: Input file scan happens inside of the pipeline. "
    "'buffered' mode: First read all data into memory and feed the pipeline with it.");

// Debugging and query tracing.
DEFINE_bool(debug_mode, false, "Whether to enable debug mode. Same as setting `spark.gluten.sql.debug`");
DEFINE_bool(query_trace_enabled, false, "Whether to enable query trace.");
DEFINE_string(query_trace_dir, "", "Base dir of a query to store tracing data.");
DEFINE_string(
    query_trace_node_ids,
    "",
    "A comma-separated list of plan node ids whose input data will be traced. Empty string if only want to trace the query metadata.");
DEFINE_int64(query_trace_max_bytes, 0, "The max trace bytes limit. Tracing is disabled if zero.");
DEFINE_string(
    query_trace_task_reg_exp,
    "",
    "The regexp of traced task id. We only enable trace on a task if its id matches.");
// Shuffle-writer measurements collected by populateWriterMetrics() and
// reported as benchmark counters. Every field starts at zero.
struct WriterMetrics {
  int64_t splitTime = 0;
  int64_t evictTime = 0;
  int64_t writeTime = 0;
  int64_t compressTime = 0;
  int64_t dataSize = 0;
  int64_t bytesSpilled = 0;
  int64_t bytesWritten = 0;
};
// Shuffle-reader measurements filled in by runShuffle() from the reader's
// decompress/deserialize timers. Every field starts at zero.
struct ReaderMetrics {
  int64_t decompressTime = 0;
  int64_t deserializeTime = 0;
};
// Applies the thread and iteration settings from the command line to `bm`.
//
// With a positive FLAGS_threads the benchmark runs with exactly that many
// threads; otherwise it sweeps a thread range from 1 up to the hardware
// concurrency. A positive FLAGS_iterations fixes the iteration count.
void setUpBenchmark(::benchmark::internal::Benchmark* bm) {
  if (FLAGS_threads > 0) {
    bm->Threads(FLAGS_threads);
  } else {
    // hardware_concurrency() may return 0 when the value is not computable.
    // Fall back to a single thread so the range never has max < min.
    const unsigned int hardwareThreads = std::thread::hardware_concurrency();
    const int maxThreads = hardwareThreads > 0 ? static_cast<int>(hardwareThreads) : 1;
    bm->ThreadRange(1, maxThreads);
  }
  if (FLAGS_iterations > 0) {
    bm->Iterations(FLAGS_iterations);
  }
}
std::string generateUniqueSubdir(const std::string& parent, const std::string& prefix = "") {
auto path = std::filesystem::path(parent) / (prefix + generateUuid());
std::error_code ec{};
while (!std::filesystem::create_directories(path, ec)) {
if (ec) {
LOG(ERROR) << fmt::format("Failed to created spill directory: {}, error code: {}", path, ec.message());
std::exit(EXIT_FAILURE);
}
path = std::filesystem::path(parent) / (prefix + generateUuid());
}
return path;
}
// Creates the per-run working directories used by the benchmark.
//
// When the environment variable named by gluten::kGlutenSparkLocalDirs is set
// to a non-empty value, one unique subdirectory is created under each listed
// path. Otherwise a single unique subdirectory is created under the system
// temp directory.
std::vector<std::string> createLocalDirs() {
  static const std::string kBenchmarkDirPrefix = "generic-benchmark-";

  const char* envValue = std::getenv(gluten::kGlutenSparkLocalDirs.c_str());
  const bool hasEnvDirs = envValue != nullptr && envValue[0] != '\0';

  std::vector<std::string> result;
  if (!hasEnvDirs) {
    // Nothing configured through the environment: fall back to one temp dir.
    result.push_back(generateUniqueSubdir(std::filesystem::temp_directory_path(), kBenchmarkDirPrefix));
    return result;
  }

  auto joinedDirs = std::string(envValue);
  auto parentDirs = gluten::splitPaths(joinedDirs);
  for (const auto& parentDir : parentDirs) {
    result.push_back(generateUniqueSubdir(parentDir, kBenchmarkDirPrefix));
  }
  return result;
}
void cleanupLocalDirs(const std::vector<std::string>& localDirs) {
for (const auto& localDir : localDirs) {
std::error_code ec;
std::filesystem::remove_all(localDir, ec);
if (ec) {
LOG(WARNING) << fmt::format("Failed to remove directory: {}, error message: {}", localDir, ec.message());
} else {
LOG(INFO) << "Removed local dir: " << localDir;
}
}
}
// Builds the PartitionWriterOptions used by the shuffle benchmarks.
//
// Merging is disabled (mergeThreshold = 0) and the codec backend/compression
// type are derived from FLAGS_compression. An unrecognized value leaves the
// codec fields at whatever PartitionWriterOptions{} initializes them to.
PartitionWriterOptions createPartitionWriterOptions() {
  PartitionWriterOptions options{};
  // Disable writer's merge.
  options.mergeThreshold = 0;

  // Sets both codec-related fields in one step.
  const auto useCodec = [&options](CodecBackend backend, auto compression) {
    options.codecBackend = backend;
    options.compressionType = compression;
  };

  const std::string& codec = FLAGS_compression;
  if (codec == "lz4") {
    useCodec(CodecBackend::NONE, arrow::Compression::LZ4_FRAME);
  } else if (codec == "zstd") {
    useCodec(CodecBackend::NONE, arrow::Compression::ZSTD);
  } else if (codec == "qat_gzip") {
    useCodec(CodecBackend::QAT, arrow::Compression::GZIP);
  } else if (codec == "qat_zstd") {
    useCodec(CodecBackend::QAT, arrow::Compression::ZSTD);
  } else if (codec == "iaa_gzip") {
    useCodec(CodecBackend::IAA, arrow::Compression::GZIP);
  }
  return options;
}
// Creates the partition writer that backs the shuffle writer.
//
// With --rss an RssPartitionWriter over a LocalRssClient writing to `dataFile`
// is returned; otherwise a LocalPartitionWriter that uses `dataFile` and
// `localDirs`. Both use FLAGS_shuffle_partitions partitions and the runtime's
// Arrow memory pool.
std::unique_ptr<PartitionWriter> createPartitionWriter(
    Runtime* runtime,
    PartitionWriterOptions options,
    const std::string& dataFile,
    const std::vector<std::string>& localDirs) {
  if (!FLAGS_rss) {
    return std::make_unique<LocalPartitionWriter>(
        FLAGS_shuffle_partitions,
        std::move(options),
        runtime->memoryManager()->getArrowMemoryPool(),
        dataFile,
        localDirs);
  }

  auto client = std::make_unique<LocalRssClient>(dataFile);
  return std::make_unique<RssPartitionWriter>(
      FLAGS_shuffle_partitions,
      std::move(options),
      runtime->memoryManager()->getArrowMemoryPool(),
      std::move(client));
}
// Creates a shuffle writer through `runtime`, taking ownership of `partitionWriter`.
//
// Partitioning comes from FLAGS_partitioning. The writer type is RSS-sort when
// --rss is set or --shuffle_writer=rss_sort, sort when --shuffle_writer=sort,
// and otherwise the ShuffleWriterOptions default.
std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(
    Runtime* runtime,
    std::unique_ptr<PartitionWriter> partitionWriter) {
  auto options = ShuffleWriterOptions{};
  options.partitioning = gluten::toPartitioning(FLAGS_partitioning);
  if (FLAGS_rss || FLAGS_shuffle_writer == "rss_sort") {
    options.shuffleWriterType = gluten::ShuffleWriterType::kRssSortShuffle;
  } else if (FLAGS_shuffle_writer == "sort") {
    options.shuffleWriterType = gluten::ShuffleWriterType::kSortShuffle;
  }
  auto shuffleWriter =
      runtime->createShuffleWriter(FLAGS_shuffle_partitions, std::move(partitionWriter), std::move(options));
  // Downcast within the class hierarchy. static_pointer_cast applies the
  // base-to-derived pointer adjustment that reinterpret_pointer_cast skips.
  // NOTE(review): assumes VeloxShuffleWriter derives non-virtually from the
  // type returned by Runtime::createShuffleWriter — confirm.
  return std::static_pointer_cast<VeloxShuffleWriter>(shuffleWriter);
}
// Adds the statistics of one finished shuffle write to `metrics`.
//
// `totalTime` is the wall time measured around this single write. Split time
// is the part of `totalTime` not covered by this writer's compress, evict and
// write times, and is only added when positive. `metrics` accumulates across
// calls, so the per-run values are read once and used for the split-time
// computation instead of the accumulated totals.
void populateWriterMetrics(
    const std::shared_ptr<VeloxShuffleWriter>& shuffleWriter,
    int64_t totalTime,
    WriterMetrics& metrics) {
  const int64_t compressTime = shuffleWriter->totalCompressTime();
  const int64_t evictTime = shuffleWriter->totalEvictTime();
  const int64_t writeTime = shuffleWriter->totalWriteTime();

  metrics.compressTime += compressTime;
  metrics.evictTime += evictTime;
  metrics.writeTime += writeTime;

  const int64_t splitTime = totalTime - compressTime - evictTime - writeTime;
  if (splitTime > 0) {
    metrics.splitTime += splitTime;
  }

  // Bind once so begin() and end() always refer to the same container.
  const auto& partitionLengths = shuffleWriter->rawPartitionLengths();
  metrics.dataSize += std::accumulate(partitionLengths.begin(), partitionLengths.end(), 0LL);
  metrics.bytesWritten += shuffleWriter->totalBytesWritten();
  metrics.bytesSpilled += shuffleWriter->totalBytesEvicted();
}
// Pins the calling benchmark thread to a CPU: thread i is bound to CPU i, or
// to CPU (--cpu + i) when --cpu is given (i.e. not -1).
void setCpu(::benchmark::State& state) {
  const auto threadIndex = state.thread_index();
  const auto targetCpu = (FLAGS_cpu != -1) ? threadIndex + FLAGS_cpu : threadIndex;
  LOG(WARNING) << "Setting CPU for thread " << threadIndex << " to " << targetCpu;
  gluten::setCpu(targetCpu);
}
// Drains `resultIter` through a freshly created shuffle writer and optionally
// reads the written data back.
//
// - A temporary shuffle data file is created under `dataFileDir` and removed
//   again before returning.
// - Writer statistics are added to `writerMetrics`.
// - When `readAfterWrite` is true and at least one batch was written, the data
//   file is read back with a shuffle reader and the reader's decompress and
//   deserialize times are added to `readerMetrics`.
void runShuffle(
    Runtime* runtime,
    TestAllocationListener* listener,
    const std::shared_ptr<gluten::ResultIterator>& resultIter,
    WriterMetrics& writerMetrics,
    ReaderMetrics& readerMetrics,
    bool readAfterWrite,
    const std::vector<std::string>& localDirs,
    const std::string& dataFileDir) {
  GLUTEN_ASSIGN_OR_THROW(auto dataFile, gluten::createTempShuffleFile(dataFileDir));

  auto partitionWriterOptions = createPartitionWriterOptions();
  auto partitionWriter = createPartitionWriter(runtime, partitionWriterOptions, dataFile, localDirs);
  auto shuffleWriter = createShuffleWriter(runtime, std::move(partitionWriter));
  listener->setShuffleWriter(shuffleWriter.get());

  int64_t totalTime = 0;
  // Schema of the first batch; stays null when the input produced no batches.
  std::shared_ptr<ArrowSchema> cSchema;
  {
    gluten::ScopedTimer timer(&totalTime);
    while (resultIter->hasNext()) {
      auto cb = resultIter->next();
      if (!cSchema) {
        cSchema = cb->exportArrowSchema();
      }
      GLUTEN_THROW_NOT_OK(shuffleWriter->write(cb, ShuffleWriter::kMaxMemLimit - shuffleWriter->cachedPayloadSize()));
    }
    GLUTEN_THROW_NOT_OK(shuffleWriter->stop());
  }

  populateWriterMetrics(shuffleWriter, totalTime, writerMetrics);

  if (readAfterWrite && cSchema) {
    // The reader must use the same writer type and codec as the writer.
    auto readerOptions = ShuffleReaderOptions{};
    readerOptions.shuffleWriterType = shuffleWriter->options().shuffleWriterType;
    readerOptions.compressionType = partitionWriterOptions.compressionType;
    readerOptions.codecBackend = partitionWriterOptions.codecBackend;

    std::shared_ptr<arrow::Schema> schema =
        gluten::arrowGetOrThrow(arrow::ImportSchema(reinterpret_cast<struct ArrowSchema*>(cSchema.get())));
    auto reader = runtime->createShuffleReader(schema, readerOptions);

    GLUTEN_ASSIGN_OR_THROW(auto in, arrow::io::ReadableFile::Open(dataFile));
    // Read all partitions.
    auto iter = reader->readStream(in);
    while (iter->hasNext()) {
      // Read and discard.
      auto cb = iter->next();
    }
    // Call the dtor to collect the metrics.
    iter.reset();

    // Accumulate like the writer metrics do: the counters are reported as
    // per-iteration averages, so overwriting would keep only the last run.
    readerMetrics.decompressTime += reader->getDecompressTime();
    readerMetrics.deserializeTime += reader->getDeserializeTime();
  }

  if (std::filesystem::remove(dataFile)) {
    LOG(INFO) << "Removed shuffle data file: " << dataFile;
  } else {
    LOG(WARNING) << "Failed to remove shuffle data file. File does not exist: " << dataFile;
  }
}
// Publishes the collected timings and sizes as Google Benchmark counters.
//
// All counters are averaged over iterations. Time counters use 1000-based
// unit scaling and size counters 1024-based. Shuffle counters are only
// published when --run_shuffle or --with_shuffle is set. In "stream" scan
// mode the input read time is subtracted from the reported split time.
void updateBenchmarkMetrics(
    ::benchmark::State& state,
    const int64_t& elapsedTime,
    const int64_t& readInputTime,
    const WriterMetrics& writerMetrics,
    const ReaderMetrics& readerMetrics) {
  const auto kTimeUnit = benchmark::Counter::OneK::kIs1000;
  const auto kSizeUnit = benchmark::Counter::OneK::kIs1024;
  const auto publish = [&state](const char* name, int64_t value, benchmark::Counter::OneK oneK) {
    state.counters[name] = benchmark::Counter(value, benchmark::Counter::kAvgIterations, oneK);
  };

  publish("read_input_time", readInputTime, kTimeUnit);
  publish("elapsed_time", elapsedTime, kTimeUnit);

  if (!(FLAGS_run_shuffle || FLAGS_with_shuffle)) {
    return;
  }

  publish("shuffle_write_time", writerMetrics.writeTime, kTimeUnit);
  publish("shuffle_spill_time", writerMetrics.evictTime, kTimeUnit);
  publish("shuffle_compress_time", writerMetrics.compressTime, kTimeUnit);
  publish("shuffle_decompress_time", readerMetrics.decompressTime, kTimeUnit);
  publish("shuffle_deserialize_time", readerMetrics.deserializeTime, kTimeUnit);

  const int64_t reportedSplitTime =
      FLAGS_scan_mode == "stream" ? writerMetrics.splitTime - readInputTime : writerMetrics.splitTime;
  publish("shuffle_split_time", reportedSplitTime, kTimeUnit);

  publish("shuffle_data_size", writerMetrics.dataSize, kSizeUnit);
  publish("shuffle_spilled_bytes", writerMetrics.bytesSpilled, kSizeUnit);
  publish("shuffle_write_bytes", writerMetrics.bytesWritten, kSizeUnit);
}
// Copies the query-trace command-line flags into `configs`.
//
// Does nothing unless --query_trace_enabled is set. String flags are written
// only when non-empty and the max-bytes flag only when non-zero.
void setQueryTraceConfig(std::unordered_map<std::string, std::string>& configs) {
  if (!FLAGS_query_trace_enabled) {
    return;
  }

  // Stores `value` under `key` unless the flag was left empty.
  const auto setIfNotEmpty = [&configs](const auto& key, const std::string& value) {
    if (!value.empty()) {
      configs[key] = value;
    }
  };

  configs[kQueryTraceEnabled] = "true";
  setIfNotEmpty(kQueryTraceDir, FLAGS_query_trace_dir);
  if (FLAGS_query_trace_max_bytes != 0) {
    configs[kQueryTraceMaxBytes] = std::to_string(FLAGS_query_trace_max_bytes);
  }
  setIfNotEmpty(kQueryTraceNodeIds, FLAGS_query_trace_node_ids);
  setIfNotEmpty(kQueryTraceTaskRegExp, FLAGS_query_trace_task_reg_exp);
}
} // namespace
// Callable that creates a VeloxRuntime on top of the given MemoryManager. The
// benchmark bodies below invoke it to obtain the runtime they execute with.
using RuntimeFactory = std::function<VeloxRuntime*(MemoryManager* memoryManager)>;
// Benchmark body that executes a substrait plan end to end.
//
// Arguments:
//   planFile       - JSON file with the substrait plan.
//   splitFiles     - JSON files with the split infos (may be empty).
//   dataFiles      - input data files read through FileReaderIterator (may be empty).
//   localDirs      - working directories used for spill and shuffle files.
//   runtimeFactory - creates the runtime for this benchmark thread.
//   readerType     - how `dataFiles` are read.
//
// Each iteration parses the plan and splits, creates the result iterator and
// either feeds it into a shuffle writer (--with_shuffle) or drains it,
// optionally saving and/or printing the batches. Saving and printing happen
// with benchmark timing paused.
auto BM_Generic = [](::benchmark::State& state,
                     const std::string& planFile,
                     const std::vector<std::string>& splitFiles,
                     const std::vector<std::string>& dataFiles,
                     const std::vector<std::string>& localDirs,
                     RuntimeFactory runtimeFactory,
                     FileReaderType readerType) {
  setCpu(state);

  auto listener = std::make_unique<TestAllocationListener>();
  listener->updateLimit(FLAGS_memory_limit);
  // Keep a raw pointer: ownership of the listener moves into the memory manager.
  auto* listenerPtr = listener.get();
  auto* memoryManager = MemoryManager::create(kVeloxBackendKind, std::move(listener));
  auto runtime = runtimeFactory(memoryManager);

  auto plan = getPlanFromFile("Plan", planFile);
  std::vector<std::string> splits{};
  for (const auto& splitFile : splitFiles) {
    splits.push_back(getPlanFromFile("ReadRel.LocalFiles", splitFile));
  }

  // Pick the spill directory from the thread id.
  const auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id());
  const auto spillDirIndex = tid % localDirs.size();
  const auto veloxSpillDir = generateUniqueSubdir(std::filesystem::path(localDirs[spillDirIndex]) / "gluten-spill");

  std::vector<std::string> shuffleSpillDirs;
  std::transform(localDirs.begin(), localDirs.end(), std::back_inserter(shuffleSpillDirs), [](const auto& dir) {
    auto path = std::filesystem::path(dir) / "shuffle-write";
    return path;
  });
  // Use a different directory for data file.
  const auto dataFileDir = gluten::getShuffleSpillDir(
      shuffleSpillDirs[(spillDirIndex + 1) % localDirs.size()], state.thread_index() % gluten::kDefaultNumSubDirs);

  WriterMetrics writerMetrics{};
  ReaderMetrics readerMetrics{};
  int64_t readInputTime = 0;
  int64_t elapsedTime = 0;

  {
    ScopedTimer timer(&elapsedTime);
    for (auto _ : state) {
      std::vector<std::shared_ptr<gluten::ResultIterator>> inputIters;
      // Raw views of the file readers, kept so their read time can be queried
      // after `inputIters` has been moved into the result iterator.
      std::vector<FileReaderIterator*> inputItersRaw;
      if (!dataFiles.empty()) {
        for (const auto& input : dataFiles) {
          inputIters.push_back(FileReaderIterator::getInputIteratorFromFileReader(
              readerType, input, FLAGS_batch_size, runtime->memoryManager()->getLeafMemoryPool()));
        }
        std::transform(
            inputIters.begin(),
            inputIters.end(),
            std::back_inserter(inputItersRaw),
            [](const std::shared_ptr<gluten::ResultIterator>& iter) {
              return static_cast<FileReaderIterator*>(iter->getInputIter());
            });
      }

      *Runtime::localWriteFilesTempPath() = FLAGS_write_path;
      runtime->parsePlan(reinterpret_cast<uint8_t*>(plan.data()), plan.size());
      for (int i = 0; i < static_cast<int>(splits.size()); ++i) {
        auto& split = splits[i];
        runtime->parseSplitInfo(reinterpret_cast<uint8_t*>(split.data()), split.size(), i);
      }

      auto resultIter = runtime->createResultIterator(veloxSpillDir, std::move(inputIters), runtime->getConfMap());
      listenerPtr->setIterator(resultIter.get());

      if (FLAGS_with_shuffle) {
        runShuffle(
            runtime, listenerPtr, resultIter, writerMetrics, readerMetrics, false, shuffleSpillDirs, dataFileDir);
      } else {
        // May write the output into file.
        std::shared_ptr<VeloxColumnarBatchWriter> writer{nullptr};
        while (resultIter->hasNext()) {
          auto cb = resultIter->next();

          state.PauseTiming();
          if (!FLAGS_save_output.empty()) {
            if (writer == nullptr) {
              writer = std::make_shared<VeloxColumnarBatchWriter>(
                  FLAGS_save_output, FLAGS_batch_size, runtime->memoryManager()->getAggregateMemoryPool());
            }
            GLUTEN_THROW_NOT_OK(writer->write(cb));
          }
          if (FLAGS_print_result) {
            auto rowVector =
                VeloxColumnarBatch::from(runtime->memoryManager()->getLeafMemoryPool().get(), cb)->getRowVector();
            LOG(WARNING) << rowVector->toString(0, 20);
          }
          state.ResumeTiming();
        }

        state.PauseTiming();
        // The writer is created lazily on the first batch, so it is still null
        // when the query produced no output; only close it if it exists.
        if (writer != nullptr) {
          GLUTEN_THROW_NOT_OK(writer->close());
        }
        state.ResumeTiming();
      }

      // Seed with int64_t: std::accumulate's accumulator has the type of the
      // init value, so an `int` seed would truncate every partial sum.
      readInputTime += std::accumulate(
          inputItersRaw.begin(), inputItersRaw.end(), int64_t{0}, [](int64_t sum, FileReaderIterator* iter) {
            return sum + iter->getCollectBatchTime();
          });

      // Log the task and the plan annotated with runtime statistics.
      auto* rawIter = static_cast<gluten::WholeStageResultIterator*>(resultIter->getInputIter());
      const auto* task = rawIter->task();
      LOG(WARNING) << task->toString();
      const auto* planNode = rawIter->veloxPlan();
      auto statsStr = facebook::velox::exec::printPlanWithStats(*planNode, task->taskStats(), true);
      LOG(WARNING) << statsStr;
    }
  }

  updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics, readerMetrics);
  Runtime::release(runtime);
  MemoryManager::release(memoryManager);
};
auto BM_ShuffleWriteRead = [](::benchmark::State& state,
const std::string& inputFile,
const std::vector<std::string>& localDirs,
RuntimeFactory runtimeFactory,
FileReaderType readerType) {
setCpu(state);
auto listener = std::make_unique<TestAllocationListener>();
listener->updateLimit(FLAGS_memory_limit);
auto* listenerPtr = listener.get();
auto* memoryManager = MemoryManager::create(kVeloxBackendKind, std::move(listener));
auto runtime = runtimeFactory(memoryManager);
const size_t dirIndex = std::hash<std::thread::id>{}(std::this_thread::get_id()) % localDirs.size();
const auto dataFileDir =
gluten::getShuffleSpillDir(localDirs[dirIndex], state.thread_index() % gluten::kDefaultNumSubDirs);
WriterMetrics writerMetrics{};
ReaderMetrics readerMetrics{};
int64_t readInputTime = 0;
int64_t elapsedTime = 0;
{
ScopedTimer timer(&elapsedTime);
for (auto _ : state) {
auto resultIter = FileReaderIterator::getInputIteratorFromFileReader(
readerType, inputFile, FLAGS_batch_size, runtime->memoryManager()->getLeafMemoryPool());
runShuffle(
runtime,
listenerPtr,
resultIter,
writerMetrics,
readerMetrics,
FLAGS_run_shuffle_read,
localDirs,
dataFileDir);
auto reader = static_cast<FileReaderIterator*>(resultIter->getInputIter());
readInputTime += reader->getCollectBatchTime();
}
}
updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics, readerMetrics);
Runtime::release(runtime);
MemoryManager::release(memoryManager);
};
// Entry point of the generic benchmark.
//
// Steps:
//   1. Parse gflags and log the flags defined in this source file.
//   2. Build the backend/session configuration (optionally from --conf) and
//      initialize the Velox backend.
//   3. Validate --plan/--split/--data according to the selected mode.
//   4. Register either the generic benchmark or the shuffle write/read
//      benchmark, run it, tear down the backend and remove the local dirs.
//
// Invalid usage is logged and terminates the process with EXIT_FAILURE.
int main(int argc, char** argv) {
  gflags::ParseCommandLineFlags(&argc, &argv, true);

  std::ostringstream ss;
  ss << "Setting flags from command line args: " << std::endl;
  std::vector<google::CommandLineFlagInfo> flags;
  google::GetAllFlags(&flags);
  // Only list flags whose defining file has the same name as this source file.
  auto filename = std::filesystem::path(__FILE__).filename();
  for (const auto& flag : flags) {
    if (std::filesystem::path(flag.filename).filename() == filename) {
      ss << " FLAGS_" << flag.name << ": default = " << flag.default_value << ", current = " << flag.current_value
         << std::endl;
    }
  }
  LOG(WARNING) << ss.str();

  ::benchmark::Initialize(&argc, argv);

  // Init Velox backend.
  std::unordered_map<std::string, std::string> backendConf{};
  std::unordered_map<std::string, std::string> sessionConf{};
  backendConf.insert({gluten::kDebugModeEnabled, std::to_string(FLAGS_debug_mode)});
  backendConf.insert({gluten::kGlogVerboseLevel, std::to_string(FLAGS_v)});
  backendConf.insert({gluten::kGlogSeverityLevel, std::to_string(FLAGS_minloglevel)});
  if (!FLAGS_conf.empty()) {
    abortIfFileNotExists(FLAGS_conf);
    std::ifstream file(FLAGS_conf);

    if (!file.is_open()) {
      LOG(ERROR) << "Unable to open configuration file.";
      ::benchmark::Shutdown();
      std::exit(EXIT_FAILURE);
    }

    // Parse the ini file.
    // Load all key-values under [Backend Conf] to backendConf, under [Session Conf] to sessionConf.
    // If no [Session Conf] section specified, all key-values are loaded for both backendConf and sessionConf.
    bool isBackendConf = true;
    std::string line;
    while (std::getline(file, line)) {
      // Skip empty lines and ';' comments.
      if (line.empty() || line[0] == ';') {
        continue;
      }
      if (line[0] == '[') {
        if (line == "[Backend Conf]") {
          isBackendConf = true;
        } else if (line == "[Session Conf]") {
          isBackendConf = false;
        } else {
          LOG(ERROR) << "Invalid section: " << line;
          ::benchmark::Shutdown();
          std::exit(EXIT_FAILURE);
        }
        continue;
      }
      // A line is "<key><whitespace><value>"; the value is the rest of the line.
      std::istringstream iss(line);
      std::string key, value;
      iss >> key;
      // A whitespace-only line yields no key; skip it rather than storing an
      // empty configuration key.
      if (key.empty()) {
        continue;
      }
      // std::ws is used to consume any leading whitespace.
      std::getline(iss >> std::ws, value);
      if (isBackendConf) {
        backendConf[key] = value;
      } else {
        sessionConf[key] = value;
      }
    }
  }
  if (sessionConf.empty()) {
    sessionConf = backendConf;
  }
  setQueryTraceConfig(sessionConf);
  setQueryTraceConfig(backendConf);

  initVeloxBackend(backendConf);
  memory::MemoryManager::testingSetInstance({});

  // Parse substrait plan, split file and data files.
  std::string substraitJsonFile = FLAGS_plan;
  std::vector<std::string> splitFiles{};
  std::vector<std::string> dataFiles{};

  if (FLAGS_run_shuffle) {
    std::string errorMsg{};
    if (FLAGS_data.empty()) {
      // Shuffle-only mode reads its input exclusively from --data.
      errorMsg = "Missing '--data' option.";
    } else if (FLAGS_partitioning != "rr" && FLAGS_partitioning != "random") {
      errorMsg = "--run-shuffle only support round-robin partitioning and random partitioning.";
    }

    if (errorMsg.empty()) {
      try {
        dataFiles = gluten::splitPaths(FLAGS_data, true);
        if (dataFiles.size() > 1) {
          errorMsg = "Only one data file is allowed for shuffle write.";
        }
      } catch (const std::exception& e) {
        errorMsg = e.what();
      }
    }
    if (!errorMsg.empty()) {
      LOG(ERROR) << "Incorrect usage: " << errorMsg << std::endl;
      ::benchmark::Shutdown();
      std::exit(EXIT_FAILURE);
    }
  } else {
    // Validate input args.
    std::string errorMsg{};
    if (substraitJsonFile.empty()) {
      errorMsg = "Missing '--plan' option.";
    } else if (!checkPathExists(substraitJsonFile)) {
      errorMsg = "File path does not exist: " + substraitJsonFile;
    } else if (FLAGS_split.empty() && FLAGS_data.empty()) {
      errorMsg = "Missing '--split' or '--data' option.";
    }

    if (errorMsg.empty()) {
      try {
        if (!FLAGS_data.empty()) {
          dataFiles = gluten::splitPaths(FLAGS_data, true);
        }
        if (!FLAGS_split.empty()) {
          splitFiles = gluten::splitPaths(FLAGS_split, true);
        }
      } catch (const std::exception& e) {
        errorMsg = e.what();
      }
    }

    if (!errorMsg.empty()) {
      LOG(ERROR) << "Incorrect usage: " << errorMsg << std::endl
                 << "*** Please check docs/developers/MicroBenchmarks.md for the full usage. ***";
      ::benchmark::Shutdown();
      std::exit(EXIT_FAILURE);
    }
  }

  LOG(WARNING) << "Using substrait json file: " << std::endl << substraitJsonFile;
  if (!splitFiles.empty()) {
    LOG(WARNING) << "Using " << splitFiles.size() << " input split file(s): ";
    for (const auto& splitFile : splitFiles) {
      LOG(WARNING) << splitFile;
    }
  }
  if (!dataFiles.empty()) {
    LOG(WARNING) << "Using " << dataFiles.size() << " input data file(s): ";
    for (const auto& dataFile : dataFiles) {
      LOG(WARNING) << dataFile;
    }
  }

  // Captures sessionConf by copy, so the factory does not depend on the
  // lifetime of the local map.
  RuntimeFactory runtimeFactory = [=](MemoryManager* memoryManager) {
    return dynamic_cast<VeloxRuntime*>(Runtime::create(kVeloxBackendKind, memoryManager, sessionConf));
  };

  const auto localDirs = createLocalDirs();

// Registers BM_Generic with the parsed inputs and applies thread/iteration settings.
#define GENERIC_BENCHMARK(READER_TYPE)          \
  do {                                          \
    auto* bm = ::benchmark::RegisterBenchmark(  \
                   "GenericBenchmark",          \
                   BM_Generic,                  \
                   substraitJsonFile,           \
                   splitFiles,                  \
                   dataFiles,                   \
                   localDirs,                   \
                   runtimeFactory,              \
                   READER_TYPE)                 \
                   ->MeasureProcessCPUTime()    \
                   ->UseRealTime();             \
    setUpBenchmark(bm);                         \
  } while (0)

// Registers BM_ShuffleWriteRead on the first data file and applies thread/iteration settings.
#define SHUFFLE_WRITE_READ_BENCHMARK(READER_TYPE)                                                          \
  do {                                                                                                     \
    auto* bm = ::benchmark::RegisterBenchmark(                                                             \
                   "ShuffleWriteRead", BM_ShuffleWriteRead, dataFiles[0], localDirs, runtimeFactory, READER_TYPE) \
                   ->MeasureProcessCPUTime()                                                               \
                   ->UseRealTime();                                                                        \
    setUpBenchmark(bm);                                                                                    \
  } while (0)

  if (dataFiles.empty()) {
    GENERIC_BENCHMARK(FileReaderType::kNone);
  } else {
    FileReaderType readerType;
    if (FLAGS_scan_mode == "buffered") {
      readerType = FileReaderType::kBuffered;
      LOG(WARNING) << "Using buffered mode for reading parquet data.";
    } else {
      readerType = FileReaderType::kStream;
      LOG(WARNING) << "Using stream mode for reading parquet data.";
    }
    if (FLAGS_run_shuffle) {
      SHUFFLE_WRITE_READ_BENCHMARK(readerType);
    } else {
      GENERIC_BENCHMARK(readerType);
    }
  }

  ::benchmark::RunSpecifiedBenchmarks();
  ::benchmark::Shutdown();

  gluten::VeloxBackend::get()->tearDown();

  cleanupLocalDirs(localDirs);

  return 0;
}