in cpp/velox/benchmarks/GenericBenchmark.cc [552:761]
// Entry point of the generic benchmark executable.
//
// Steps, in order:
//   1. Parse gflags and log the flags that are defined in this source file.
//   2. Build backendConf / sessionConf from flags and from the optional ini file named by FLAGS_conf.
//   3. Initialize the Velox backend with backendConf.
//   4. Validate the inputs: FLAGS_data (and FLAGS_partitioning) when FLAGS_run_shuffle is set,
//      otherwise FLAGS_plan plus at least one of FLAGS_split / FLAGS_data.
//   5. Register either the "GenericBenchmark" or the "ShuffleWriteRead" benchmark and run it.
//   6. Tear down the backend and clean up the local dirs.
//
// Returns 0 after the benchmarks have run. Any configuration or usage error is logged, the benchmark
// library is shut down, and the process exits with EXIT_FAILURE.
int main(int argc, char** argv) {
  gflags::ParseCommandLineFlags(&argc, &argv, true);

  // Log default/current values, restricted to the flags declared in this source file.
  std::ostringstream ss;
  ss << "Setting flags from command line args: " << '\n';
  std::vector<google::CommandLineFlagInfo> flags;
  google::GetAllFlags(&flags);
  auto filename = std::filesystem::path(__FILE__).filename();
  for (const auto& flag : flags) {
    if (std::filesystem::path(flag.filename).filename() == filename) {
      ss << "    FLAGS_" << flag.name << ": default = " << flag.default_value << ", current = " << flag.current_value
         << '\n';
    }
  }
  LOG(WARNING) << ss.str();

  ::benchmark::Initialize(&argc, argv);

  // Init Velox backend.
  std::unordered_map<std::string, std::string> backendConf{};
  std::unordered_map<std::string, std::string> sessionConf{};
  backendConf.insert({gluten::kDebugModeEnabled, std::to_string(FLAGS_debug_mode)});
  backendConf.insert({gluten::kGlogVerboseLevel, std::to_string(FLAGS_v)});
  backendConf.insert({gluten::kGlogSeverityLevel, std::to_string(FLAGS_minloglevel)});
  if (!FLAGS_conf.empty()) {
    abortIfFileNotExists(FLAGS_conf);
    std::ifstream file(FLAGS_conf);

    if (!file.is_open()) {
      LOG(ERROR) << "Unable to open configuration file.";
      ::benchmark::Shutdown();
      std::exit(EXIT_FAILURE);
    }

    // Parse the ini file.
    // Load all key-values under [Backend Conf] to backendConf, under [Session Conf] to sessionConf.
    // Entries that appear before any section header are treated as backend conf.
    // If sessionConf ends up empty (e.g. no [Session Conf] section), it is later copied from backendConf.
    bool isBackendConf = true;
    std::string line;
    while (std::getline(file, line)) {
      // std::getline only strips '\n'. Drop the '\r' of CRLF files so that section headers still match
      // and values do not carry a stray carriage return.
      if (!line.empty() && line.back() == '\r') {
        line.pop_back();
      }
      // Skip blank lines and ';' comments.
      if (line.empty() || line[0] == ';') {
        continue;
      }
      if (line[0] == '[') {
        if (line == "[Backend Conf]") {
          isBackendConf = true;
        } else if (line == "[Session Conf]") {
          isBackendConf = false;
        } else {
          LOG(ERROR) << "Invalid section: " << line;
          ::benchmark::Shutdown();
          std::exit(EXIT_FAILURE);
        }
        continue;
      }
      // A config line is "<key><whitespace><value>"; the value is the rest of the line.
      std::istringstream iss(line);
      std::string key, value;
      iss >> key;
      if (key.empty()) {
        // Whitespace-only line: there is no key to store.
        continue;
      }
      // std::ws is used to consume any leading whitespace.
      std::getline(iss >> std::ws, value);
      if (isBackendConf) {
        backendConf[key] = value;
      } else {
        sessionConf[key] = value;
      }
    }
  }
  if (sessionConf.empty()) {
    sessionConf = backendConf;
  }
  setQueryTraceConfig(sessionConf);
  setQueryTraceConfig(backendConf);

  initVeloxBackend(backendConf);
  memory::MemoryManager::testingSetInstance({});

  // Parse substrait plan, split file and data files.
  std::string substraitJsonFile = FLAGS_plan;
  std::vector<std::string> splitFiles{};
  std::vector<std::string> dataFiles{};

  if (FLAGS_run_shuffle) {
    // Shuffle mode only consumes FLAGS_data; FLAGS_plan and FLAGS_split are not validated here.
    std::string errorMsg{};
    if (FLAGS_data.empty()) {
      errorMsg = "Missing '--data' option.";
    } else if (FLAGS_partitioning != "rr" && FLAGS_partitioning != "random") {
      errorMsg = "--run-shuffle only support round-robin partitioning and random partitioning.";
    }

    if (errorMsg.empty()) {
      try {
        dataFiles = gluten::splitPaths(FLAGS_data, true);
        // The shuffle benchmark is registered with dataFiles[0], so exactly one file is required.
        // An empty list would otherwise fall through to the generic benchmark below.
        if (dataFiles.empty()) {
          errorMsg = "No data file resolved from '--data' option.";
        } else if (dataFiles.size() > 1) {
          errorMsg = "Only one data file is allowed for shuffle write.";
        }
      } catch (const std::exception& e) {
        errorMsg = e.what();
      }
    }
    if (!errorMsg.empty()) {
      LOG(ERROR) << "Incorrect usage: " << errorMsg << '\n';
      ::benchmark::Shutdown();
      std::exit(EXIT_FAILURE);
    }
  } else {
    // Validate input args.
    std::string errorMsg{};
    if (substraitJsonFile.empty()) {
      errorMsg = "Missing '--plan' option.";
    } else if (!checkPathExists(substraitJsonFile)) {
      errorMsg = "File path does not exist: " + substraitJsonFile;
    } else if (FLAGS_split.empty() && FLAGS_data.empty()) {
      errorMsg = "Missing '--split' or '--data' option.";
    }

    if (errorMsg.empty()) {
      try {
        if (!FLAGS_data.empty()) {
          dataFiles = gluten::splitPaths(FLAGS_data, true);
        }
        if (!FLAGS_split.empty()) {
          splitFiles = gluten::splitPaths(FLAGS_split, true);
        }
      } catch (const std::exception& e) {
        errorMsg = e.what();
      }
    }

    if (!errorMsg.empty()) {
      LOG(ERROR) << "Incorrect usage: " << errorMsg << '\n'
                 << "*** Please check docs/developers/MicroBenchmarks.md for the full usage. ***";
      ::benchmark::Shutdown();
      std::exit(EXIT_FAILURE);
    }
  }

  LOG(WARNING) << "Using substrait json file: " << '\n' << substraitJsonFile;
  if (!splitFiles.empty()) {
    LOG(WARNING) << "Using " << splitFiles.size() << " input split file(s): ";
    for (const auto& splitFile : splitFiles) {
      LOG(WARNING) << splitFile;
    }
  }
  if (!dataFiles.empty()) {
    LOG(WARNING) << "Using " << dataFiles.size() << " input data file(s): ";
    for (const auto& dataFile : dataFiles) {
      LOG(WARNING) << dataFile;
    }
  }

  // Captures sessionConf by copy, so the factory does not depend on the local map's lifetime.
  RuntimeFactory runtimeFactory = [=](MemoryManager* memoryManager) {
    return dynamic_cast<VeloxRuntime*>(Runtime::create(kVeloxBackendKind, memoryManager, sessionConf));
  };

  const auto localDirs = createLocalDirs();

// Registers BM_Generic over the plan, split files and data files with the given reader type.
#define GENERIC_BENCHMARK(READER_TYPE)             \
  do {                                             \
    auto* bm = ::benchmark::RegisterBenchmark(     \
                   "GenericBenchmark",             \
                   BM_Generic,                     \
                   substraitJsonFile,              \
                   splitFiles,                     \
                   dataFiles,                      \
                   localDirs,                      \
                   runtimeFactory,                 \
                   READER_TYPE)                    \
                   ->MeasureProcessCPUTime()       \
                   ->UseRealTime();                \
    setUpBenchmark(bm);                            \
  } while (0)

// Registers BM_ShuffleWriteRead over the first data file with the given reader type.
#define SHUFFLE_WRITE_READ_BENCHMARK(READER_TYPE)                                                      \
  do {                                                                                                 \
    auto* bm = ::benchmark::RegisterBenchmark(                                                         \
                   "ShuffleWriteRead", BM_ShuffleWriteRead, dataFiles[0], localDirs, runtimeFactory, READER_TYPE) \
                   ->MeasureProcessCPUTime()                                                           \
                   ->UseRealTime();                                                                    \
    setUpBenchmark(bm);                                                                                \
  } while (0)

  if (dataFiles.empty()) {
    // No data files were given, so no file reader is used.
    GENERIC_BENCHMARK(FileReaderType::kNone);
  } else {
    // Any FLAGS_scan_mode value other than "buffered" selects stream mode.
    FileReaderType readerType;
    if (FLAGS_scan_mode == "buffered") {
      readerType = FileReaderType::kBuffered;
      LOG(WARNING) << "Using buffered mode for reading parquet data.";
    } else {
      readerType = FileReaderType::kStream;
      LOG(WARNING) << "Using stream mode for reading parquet data.";
    }
    if (FLAGS_run_shuffle) {
      SHUFFLE_WRITE_READ_BENCHMARK(readerType);
    } else {
      GENERIC_BENCHMARK(readerType);
    }
  }

  ::benchmark::RunSpecifiedBenchmarks();
  ::benchmark::Shutdown();

  gluten::VeloxBackend::get()->tearDown();

  cleanupLocalDirs(localDirs);

  return 0;
}