int main()

in bench/wdtGenFiles.cpp [324:415]


// Entry point of the wdt benchmark test-file generator.
//
// Steps:
//  1. Parse the command line flags and initialize logging.
//  2. Read the bigram statistics either from stdin (--stats_source=-) or from
//     the file named by --stats_source.
//  3. chdir() into --directory and open --filename for writing.
//  4. Split the requested size evenly across --num_threads threads; each
//     thread generates text block by block and pwrite()s it into its own
//     region of the file (thread i owns the byte range starting at
//     i * targetSzPerThread), so threads never write overlapping ranges.
//
// Any invalid flag value or I/O failure aborts through LOG(FATAL)/PLOG(FATAL).
// Returns 0 once every thread has finished and the file has been closed.
int main(int argc, char **argv) {
  FLAGS_logtostderr = true;
  // gflags api is nicely inconsistent here
  GFLAGS_NAMESPACE::SetArgv(argc, const_cast<const char **>(argv));
  GFLAGS_NAMESPACE::SetVersionString(WDT_VERSION_STR);
  string usage("Generates test files for wdt transfer benchmark. v");
  usage.append(GFLAGS_NAMESPACE::VersionString());
  usage.append(". Sample usage:\n\t");
  usage.append(GFLAGS_NAMESPACE::ProgramInvocationShortName());
  usage.append(" [flags] < Bigrams > generated");
  GFLAGS_NAMESPACE::SetUsageMessage(usage);
  GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
  google::InitGoogleLogging(argv[0]);

  // Load the bigram statistics that drive the text generation.
  std::vector<PairBigramCount> statsData;
  if (FLAGS_stats_source == "-") {
    LOG(INFO) << "Reading stdin for Bigram data... (produced by wdt_gen_stats)";
    deserialize(statsData, std::cin);
  } else {
    LOG(INFO) << "Reading " << FLAGS_stats_source << " for Bigram data... "
              << "(produced by wdt_gen_stats)";
    std::ifstream statsin(FLAGS_stats_source);
    if (!statsin.good()) {
      PLOG(FATAL) << "Unable to read bigrams from " << FLAGS_stats_source;
    }
    deserialize(statsData, statsin);
  }
  LOG(INFO) << "Found " << statsData.size() << " entries.";

  LOG(INFO) << "Will generate in directory=" << FLAGS_directory;
  if (chdir(FLAGS_directory.c_str())) {
    PLOG(FATAL) << "Error changing directory to " << FLAGS_directory;
  }

  // Validate the sizing flags before doing any arithmetic with them:
  // num_threads == 0 would otherwise be an integer division by zero below and
  // a negative value would wrap around when converted to size_t.
  if (FLAGS_num_threads < 1) {
    LOG(FATAL) << "Invalid num_threads " << FLAGS_num_threads;
  }
  if (FLAGS_gen_size_mb <= 0) {
    LOG(FATAL) << "Invalid gen_size_mb " << FLAGS_gen_size_mb;
  }
  const size_t totalTargetSz = FLAGS_gen_size_mb * 1024 * 1024;
  const size_t numThreads = static_cast<size_t>(FLAGS_num_threads);
  const size_t targetSzPerThread = totalTargetSz / numThreads;
  if (targetSzPerThread < 1) {
    LOG(FATAL) << "Invalid gen_size_mb and num_threads combo " << totalTargetSz;
  }
  // Actual amount generated: the remainder of the division above is dropped.
  const size_t totalSz = targetSzPerThread * numThreads;
  if (FLAGS_gen_block_size < 1) {
    LOG(FATAL) << "Invalid gen_block_size " << FLAGS_gen_block_size;
  }
  // Never generate more per block than a thread has to write in total.
  const size_t blockSz =
      std::min(targetSzPerThread, static_cast<size_t>(FLAGS_gen_block_size));
  LOG(INFO) << "Requested " << totalSz << " (" << targetSzPerThread
            << " data * " << numThreads << " thread), " << blockSz
            << " at a time";
  LOG(INFO) << "Writting to " << FLAGS_filename;
  const int fd = open(FLAGS_filename.c_str(), O_CREAT | O_WRONLY, 0644);
  if (fd < 0) {
    PLOG(FATAL) << "Unable to open " << FLAGS_filename;
  }
  std::vector<std::thread> threads;
  threads.reserve(numThreads);
  // Shared by reference with every worker; it outlives them because all
  // threads are joined before main returns.
  SentenceGen sg(statsData);
  for (int i = static_cast<int>(numThreads) - 1; i >= 0; --i) {
    threads.emplace_back([targetSzPerThread, blockSz, fd, i, &sg] {
      // The thread index is passed to createRandomGenerator and also selects
      // the file region this thread fills.
      std::shared_ptr<RndEngine> rndEngine = createRandomGenerator(i);
      size_t targetSz = targetSzPerThread;  // bytes this thread still owes
      size_t toWrite = blockSz;             // size of the current block
      off_t offset = i * targetSzPerThread;
      string res;
      res.reserve(toWrite);

      Bigram previous = sg.generateInitial(*rndEngine, res, toWrite);
      while (true) {
        CHECK_EQ(toWrite, res.size());
        // pwrite() may write fewer bytes than requested without it being an
        // error, so keep going until the whole block is on its way to disk.
        size_t written = 0;
        while (written < res.size()) {
          const ssize_t w = pwrite(fd, res.data() + written,
                                   res.size() - written,
                                   offset + static_cast<off_t>(written));
          if (w <= 0) {
            PLOG(FATAL) << "Expected to write " << (res.size() - written)
                        << " got " << w;
          }
          written += static_cast<size_t>(w);
        }
        res.clear();
        targetSz -= toWrite;
        offset += toWrite;
        if (targetSz == 0) {
          break;
        }
        // Last block of the region may be shorter than blockSz.
        if (toWrite > targetSz) {
          toWrite = targetSz;
        }
        sg.generate(*rndEngine, res, toWrite, previous);
      }
    });
  }
  for (std::thread &worker : threads) {
    worker.join();
  }
  // Close explicitly: a failure here can be the only report of a deferred
  // write error, and the descriptor should not be leaked.
  if (close(fd) != 0) {
    PLOG(FATAL) << "Error closing " << FLAGS_filename;
  }
  // TODO: calculate and print throughput
  return 0;
}