in bench/wdtGenFiles.cpp [324:415]
// Entry point of the benchmark file generator.
//
// Reads bigram statistics (as produced by wdt_gen_stats) from
// --stats_source ("-" means stdin), changes into --directory and writes
// --gen_size_mb MiB of generated text into --filename (created or truncated).
// The output is split into --num_threads equal contiguous regions; each region
// is filled by its own thread with pwrite() in chunks of at most
// --gen_block_size bytes. Invalid flags and I/O errors are reported through
// glog FATAL; a failing close() of the output is logged and yields exit code 1.
int main(int argc, char **argv) {
  FLAGS_logtostderr = true;
  // gflags api is nicely inconsistent here
  GFLAGS_NAMESPACE::SetArgv(argc, const_cast<const char **>(argv));
  GFLAGS_NAMESPACE::SetVersionString(WDT_VERSION_STR);
  string usage("Generates test files for wdt transfer benchmark. v");
  usage.append(GFLAGS_NAMESPACE::VersionString());
  usage.append(". Sample usage:\n\t");
  usage.append(GFLAGS_NAMESPACE::ProgramInvocationShortName());
  usage.append(" [flags] < Bigrams > generated");
  GFLAGS_NAMESPACE::SetUsageMessage(usage);
  GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
  google::InitGoogleLogging(argv[0]);

  // Load the bigram statistics the sentence generator is built from.
  std::vector<PairBigramCount> statsData;
  if (FLAGS_stats_source == "-") {
    LOG(INFO) << "Reading stdin for Bigram data... (produced by wdt_gen_stats)";
    deserialize(statsData, std::cin);
  } else {
    LOG(INFO) << "Reading " << FLAGS_stats_source << " for Bigram data... "
              << "(produced by wdt_gen_stats)";
    std::ifstream statsin(FLAGS_stats_source);
    if (!statsin.good()) {
      PLOG(FATAL) << "Unable to read bigrams from " << FLAGS_stats_source;
    }
    deserialize(statsData, statsin);
  }
  LOG(INFO) << "Found " << statsData.size() << " entries.";
  LOG(INFO) << "Will generate in directory=" << FLAGS_directory;
  if (chdir(FLAGS_directory.c_str())) {
    PLOG(FATAL) << "Error changing directory to " << FLAGS_directory;
  }

  // Validate sizing flags before doing arithmetic with them: num_threads == 0
  // would divide by zero below, and a negative gen_size_mb would wrap around
  // to an enormous unsigned size.
  if (FLAGS_num_threads < 1) {
    LOG(FATAL) << "Invalid num_threads " << FLAGS_num_threads;
  }
  if (FLAGS_gen_size_mb < 1) {
    LOG(FATAL) << "Invalid gen_size_mb " << FLAGS_gen_size_mb;
  }
  // Widen to size_t before multiplying so the product cannot overflow the
  // flag's own (possibly 32-bit) integer type.
  const size_t totalTargetSz =
      static_cast<size_t>(FLAGS_gen_size_mb) * 1024 * 1024;
  const size_t numThreads = static_cast<size_t>(FLAGS_num_threads);
  const size_t targetSzPerThread = totalTargetSz / numThreads;
  if (targetSzPerThread < 1) {
    LOG(FATAL) << "Invalid gen_size_mb and num_threads combo " << totalTargetSz;
  }
  // Any remainder of the division is dropped: only whole per-thread regions
  // are generated.
  const size_t totalSz = targetSzPerThread * numThreads;
  if (FLAGS_gen_block_size < 1) {
    LOG(FATAL) << "Invalid gen_block_size " << FLAGS_gen_block_size;
  }
  const size_t blockSz =
      std::min(targetSzPerThread, static_cast<size_t>(FLAGS_gen_block_size));
  LOG(INFO) << "Requested " << totalSz << " (" << targetSzPerThread
            << " data * " << numThreads << " thread), " << blockSz
            << " at a time";

  LOG(INFO) << "Writing to " << FLAGS_filename;
  // O_TRUNC: without it, a pre-existing larger file would keep stale bytes
  // past totalSz and the result would not be the requested size.
  const int fd =
      open(FLAGS_filename.c_str(), O_CREAT | O_WRONLY | O_TRUNC, 0644);
  if (fd < 0) {
    PLOG(FATAL) << "Unable to open " << FLAGS_filename;
  }

  std::vector<std::thread> threads;
  threads.reserve(numThreads);
  SentenceGen sg(statsData);
  for (size_t t = 0; t < numThreads; ++t) {
    threads.emplace_back([targetSzPerThread, blockSz, fd, t, &sg] {
      // Per-thread generator, created from the thread index.
      std::shared_ptr<RndEngine> rndEngine =
          createRandomGenerator(static_cast<int>(t));
      size_t remaining = targetSzPerThread;
      size_t toWrite = blockSz;
      // Thread t owns the byte range
      // [t * targetSzPerThread, (t + 1) * targetSzPerThread).
      off_t offset = static_cast<off_t>(t * targetSzPerThread);
      string res;
      res.reserve(toWrite);
      Bigram previous = sg.generateInitial(*rndEngine, res, toWrite);
      while (true) {
        CHECK_EQ(toWrite, res.size());
        // pwrite() may legitimately write fewer bytes than requested; keep
        // writing until the whole chunk is on disk.
        size_t written = 0;
        while (written < res.size()) {
          const ssize_t w =
              pwrite(fd, res.data() + written, res.size() - written,
                     offset + static_cast<off_t>(written));
          if (w < 0) {
            PLOG(FATAL) << "Expected to write " << res.size() << " got " << w;
          }
          if (w == 0) {
            LOG(FATAL) << "Expected to write " << res.size()
                       << " but pwrite made no progress after " << written;
          }
          written += static_cast<size_t>(w);
        }
        res.clear();
        remaining -= toWrite;
        offset += static_cast<off_t>(toWrite);
        if (remaining == 0) {
          break;
        }
        // Final chunk of this region may be shorter than blockSz.
        if (toWrite > remaining) {
          toWrite = remaining;
        }
        sg.generate(*rndEngine, res, toWrite, previous);
      }
    });
  }
  for (std::thread &worker : threads) {
    worker.join();
  }
  // close() can report deferred write errors (e.g. on network filesystems).
  if (close(fd) != 0) {
    PLOG(ERROR) << "Error closing " << FLAGS_filename;
    return 1;
  }
  // TODO: calculate and print throughput
  return 0;
}