bench/wdtGenFiles.cpp (346 lines of code) (raw):

/** * Copyright (c) 2014-present, Facebook, Inc. * All rights reserved. * * This source code is licensed under the BSD-style license found in the * LICENSE file in the root directory of this source tree. */ #include <assert.h> #include <time.h> #include <array> #include <fstream> #include <future> #include <initializer_list> #include <iostream> #include <memory> #include <random> #include <vector> #include <unistd.h> #include <gflags/gflags.h> #include <glog/logging.h> #include <wdt/WdtConfig.h> #include "wdt/bench/Bigram.h" /* Future "interface"/"features": DEFINE_int32(dir_depth, 8, "Directory depth"); DEFINE_int32(files_per_dir, 8, "Files per directory"); DEFINE_int32(ascii_percent, 30, "Percentage of ASCII files"); DEFINE_double(total_size_gb, 10.0, "Total data set size in gigabytes"); */ /// For now: DEFINE_double(gen_size_mb, 10, "Size of data to generate in MBytes"); DEFINE_int32(gen_block_size, 16384, "Size of blocks to generate (in bytes)"); DEFINE_int32(num_threads, 16, "Number of threads"); DEFINE_bool(seed_with_time, false, "New (time based) seed for randomizer"); DEFINE_string(stats_source, "-", "Where to read bigram stats from, default is - for stdin"); DEFINE_string(start, "", "Start the chain at start - empty/default = random bigram"); DEFINE_string(reset_char, "", "In case of dead-end, try to start at that character, or rnd"); DEFINE_string(directory, ".", "Directory in which to generate data"); DEFINE_string(filename, "gen.data", "Base filename to use for generated data"); template <typename T> class ProbabilityTable; // forward for operator below // needs to be before the friend below template <typename T> std::ostream &operator<<(std::ostream &os, const ProbabilityTable<T> &t) { uint32_t sz = t.size(); os << "probT " << sz << ": "; if (!sz) { os << " <empty>" << std::endl; return os; } T prev = t.data_[0]; int count = 1; for (uint32_t i = 1; i < t.size(); ++i) { Bigram cur = t.data_[i]; if (cur == prev) { ++count; continue; } // else os << prev << " x " << count << ", "; count = 1; prev = cur; } os << prev << " x " << count << std::endl; return os; } // typedef std::knuth_b RndEngine; // typedef std::mt19937 RndEngine; // typedef std::ranlux48 RndEngine; // This seems to be the fastest one: typedef std::minstd_rand RndEngine; static std::shared_ptr<RndEngine> createRandomGenerator(int offset) { std::shared_ptr<RndEngine> gen = std::make_shared<RndEngine>(); time_t t = offset; if (FLAGS_seed_with_time) { t += time(nullptr); // Should technically be LOG(INFO) so if something fails the test can be // reproduced with the same seed - but it's verbose with lots of threads // and tests VLOG(1) << "Time Initializing RndEngine with " << t; } else { VLOG(1) << "Thread Index Initializing RndEngine with " << t; } gen->seed(t); return gen; } // Only ok to copy T multiple time inside if sizeof(T) !>> log2(input.size())*8 // TODO consider a tree or inlining the first few entries (ifs/range test equiv) template <typename T> class ProbabilityTable { public: // Constant based/source version: ProbabilityTable(std::initializer_list<std::pair<T, uint32_t>> input) { init(input); } explicit ProbabilityTable(std::vector<std::pair<T, uint32_t>> input) { init(input); } void init(std::vector<std::pair<T, uint32_t>> input) { uint32_t sum = 0; for (auto const &p : input) { auto const &b = p.first; VLOG(1) << "PT " << b << " weight " << p.second; sum += p.second; } LOG(INFO) << "Creating probability distribution for " << input.size() << " entries, total weight " << sum; data_.reserve(sum); // TODO: auto scale (divide by min/3 for instance?) dist_.reset(new std::uniform_int_distribution<int>(0, sum - 1)); size_ = sum; for (auto const &p : input) { auto const &b = p.first; auto const &v = p.second; VLOG(1) << "PT " << b << " scaled weight " << v; for (uint32_t n = v; n--;) { data_.push_back(b); } } assert(data_.size() == sum); } // Incremental version ProbabilityTable() : size_(0) { } void append(const T &value, uint32_t count) { for (; count--;) { data_.push_back(value); } } void seal(const ProbabilityTable & /*parent*/) { size_ = data_.size(); if (size_ > 0) { dist_.reset(new std::uniform_int_distribution<int>(0, size_ - 1)); } } uint32_t size() const { return size_; } const T &operator[](uint32_t idx) const { return data_[idx]; } const T &random(RndEngine &gen) { if (!size_) { static const T EMPTY; LOG(ERROR) << "Called random on 0 sized PT " << this; return EMPTY; } const int rIdx = (*dist_)(gen); VLOG(2) << "Random index " << rIdx << " -> " << data_[rIdx]; return data_[rIdx]; } private: uint32_t size_; std::vector<T> data_; std::unique_ptr<std::uniform_int_distribution<int>> dist_; template <typename A> friend std::ostream &operator<<(std::ostream &os, const ProbabilityTable<A> &t); }; typedef ProbabilityTable<Bigram> PTB; class SentenceGen { public: // For compiled in/static version: SentenceGen(std::initializer_list<PairBigramCount> input) : ptb0_(input) { init(input); } explicit SentenceGen(std::vector<PairBigramCount> input) : ptb0_(input) { init(input); } void init(std::vector<PairBigramCount> input) { if (!FLAGS_reset_char.empty()) { reset_char_ = FLAGS_reset_char[0]; } for (auto const &p : input) { const Bigram &b = p.first; const char first = b[0]; const int idx = char2index(first); ptb1_[idx].append(b, p.second); } // set the distribution/count and share the random gen for (int c = 0; c <= 255; ++c) { const int idx = char2index(c); ptb1_[idx].seal(ptb0_); if (ptb1_[idx].size() <= 0) { continue; } if (std::isprint(c)) { VLOG(1) << "For " << c << " '" << (char)c << "': " << ptb1_[idx]; } else { VLOG(1) << "For " << c << " n/p: " << ptb1_[idx]; } } } const Bigram &initial(RndEngine &gen) { return ptb0_.random(gen); } // Sets result a random Bigram starting with c - or returns false bool next(RndEngine &gen, Bigram &result, char c) { const int idx = char2index(c); VLOG(3) << "looking up for '" << Bigram::toPrintableString(c) << "' (at position " << idx << ") :" << ptb1_[idx] << std::endl; if (!ptb1_[idx].size()) { VLOG(1) << "No successor for " << Bigram::toPrintableString(c) << std::endl; return false; } Bigram newB = ptb1_[idx].random(gen); VLOG(2) << "picked " << Bigram::toPrintableString(c) << " -> " << newB << std::endl; result = newB; return true; } // Implace next() logic, chain the bigrams if possible, return false otherwise bool next(RndEngine &gen, Bigram &previous) { return next(gen, previous, previous[1]); } // will generate exactly len bytes unless len is < FLAGS_start.size() Bigram generateInitial(RndEngine &gen, std::string &result, int32_t len) { Bigram previous; // Start with given string or start randomly: size_t startLen = FLAGS_start.size(); bool initDone = false; if (startLen) { result.append(FLAGS_start.substr(0, --startLen)); len -= startLen; const char lastCharOfFirst = FLAGS_start[startLen]; if (next(gen, previous, lastCharOfFirst)) { initDone = true; } else { result.push_back(lastCharOfFirst); --len; } } if (!initDone) { previous = initial(gen); } result.push_back(previous[0]); generate(gen, result, len - 1, previous); return previous; } void generate(RndEngine &gen, std::string &result, int32_t len, Bigram &previous) { VLOG(1) << "generate " << len << " " << previous; // main loop: while (len > 0) { bool found = next(gen, previous); if (!found) { result.push_back(previous[1]); --len; if (reset_char_) { if (next(gen, previous, reset_char_)) { continue; } } previous = initial(gen); } if (len <= 0) { return; } result.push_back(previous[0]); --len; } } static SentenceGen &getTestInstance() { static SentenceGen s_wg{ {{"la"}, 3}, {{"ah"}, 2}, {{" B"}, 1}, {{" b"}, 1}, {{" f"}, 1}, {{" l"}, 1}, {{". "}, 1}, {{"Bl"}, 1}, {{"Th"}, 1}, {{"az"}, 1}, {{"bl"}, 1}, {{"e "}, 1}, {{"fo"}, 1}, {{"h "}, 1}, {{"h."}, 1}, {{"he"}, 1}, {{"ox"}, 1}, {{"x."}, 1}, {{"y "}, 1}, {{"zy"}, 1}, {{".\012"}, 1}, }; return s_wg; } private: uint32_t char2index(const char c) { return (unsigned char)c; /* assert(c >= 'a' && c <= 'z'); return c - 'a'; */ } /// First level (new 'word') PTB ptb0_; /// Second level PTB ptb1_[256 /*1 + 'z' - 'a'*/]; // = 26 /// Reset character (0/NUL means no special reset char) char reset_char_{0}; }; void deserialize(std::vector<PairBigramCount> &statsData, std::istream &sin) { while (!sin.fail()) { Bigram b; if (!b.binaryDeserialize(sin)) { break; } uint32_t count = 0; sin.read(reinterpret_cast<char *>(&count), sizeof(count)); statsData.push_back(PairBigramCount(b, count)); } } using std::string; int main(int argc, char **argv) { FLAGS_logtostderr = true; // gflags api is nicely inconsistent here GFLAGS_NAMESPACE::SetArgv(argc, const_cast<const char **>(argv)); GFLAGS_NAMESPACE::SetVersionString(WDT_VERSION_STR); string usage("Generates test files for wdt transfer benchmark. v"); usage.append(GFLAGS_NAMESPACE::VersionString()); usage.append(". Sample usage:\n\t"); usage.append(GFLAGS_NAMESPACE::ProgramInvocationShortName()); usage.append(" [flags] < Bigrams > generated"); GFLAGS_NAMESPACE::SetUsageMessage(usage); GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); std::vector<PairBigramCount> statsData; if (FLAGS_stats_source == "-") { LOG(INFO) << "Reading stdin for Bigram data... (produced by wdt_gen_stats)"; deserialize(statsData, std::cin); } else { LOG(INFO) << "Reading " << FLAGS_stats_source << " for Bigram data... " << "(produced by wdt_gen_stats)"; std::ifstream statsin(FLAGS_stats_source); if (!statsin.good()) { PLOG(FATAL) << "Unable to read bigrams from " << FLAGS_stats_source; } deserialize(statsData, statsin); } LOG(INFO) << "Found " << statsData.size() << " entries."; LOG(INFO) << "Will generate in directory=" << FLAGS_directory; if (chdir(FLAGS_directory.c_str())) { PLOG(FATAL) << "Error changing directory to " << FLAGS_directory; } const size_t totalTargetSz = FLAGS_gen_size_mb * 1024 * 1024; const size_t numThreads = FLAGS_num_threads; const size_t targetSzPerThread = totalTargetSz / numThreads; if (targetSzPerThread < 1) { LOG(FATAL) << "Invalid gen_size_mb and num_threads combo " << totalTargetSz; } const size_t totalSz = targetSzPerThread * numThreads; if (FLAGS_gen_block_size < 1) { LOG(FATAL) << "Invalid gen_block_size " << FLAGS_gen_block_size; } const size_t blockSz = std::min(targetSzPerThread, (size_t)FLAGS_gen_block_size); LOG(INFO) << "Requested " << totalSz << " (" << targetSzPerThread << " data * " << numThreads << " thread), " << blockSz << " at a time"; LOG(INFO) << "Writting to " << FLAGS_filename; const int fd = open(FLAGS_filename.c_str(), O_CREAT | O_WRONLY, 0644); if (fd < 0) { PLOG(FATAL) << "Unable to open " << FLAGS_filename; } std::vector<std::thread> threads; threads.reserve(numThreads); SentenceGen sg(statsData); for (int i = numThreads; i--;) { threads.push_back(std::thread([targetSzPerThread, blockSz, fd, i, &sg] { std::shared_ptr<RndEngine> rndEngine = createRandomGenerator(i); size_t targetSz = targetSzPerThread; size_t toWrite = blockSz; off_t offset = i * targetSzPerThread; string res; res.reserve(toWrite); Bigram previous = sg.generateInitial(*rndEngine, res, toWrite); while (1) { CHECK_EQ(toWrite, res.size()); ssize_t w = pwrite(fd, res.data(), res.size(), offset); if (w != static_cast<ssize_t>(res.size())) { PLOG(FATAL) << "Expected to write " << res.size() << " got " << w; } res.clear(); targetSz -= toWrite; offset += toWrite; if (targetSz == 0) { break; } if (toWrite > targetSz) { toWrite = targetSz; } sg.generate(*rndEngine, res, toWrite, previous); } })); } for (int i = numThreads; i--;) { threads[i].join(); } // TODO: calculate and print throughput return 0; }