void frequent_items_merge_timing_profile::run()

in cpp/src/frequent_items_merge_timing_profile.cpp [33:113]


void frequent_items_merge_timing_profile::run() {
  const unsigned lg_min_stream_len = 0;
  const unsigned lg_max_stream_len = 23;
  const unsigned ppo = 16;

  const unsigned lg_max_trials = 14;
  const unsigned lg_min_trials = 4;

  const unsigned lg_max_sketch_size = 10;

  const unsigned zipf_lg_range = 13; // range: 8K values for 1K sketch
  const double zipf_exponent = 1.1;
  zipf_distribution zipf(1 << zipf_lg_range, zipf_exponent);
  const unsigned long long high_bit = 1ULL << 63;

  const size_t num_sketches = 32;

  size_t max_len = 1 << lg_max_stream_len;
  std::vector<std::string> values(max_len);
  std::unique_ptr<frequent_items_sketch<std::string>> sketches[num_sketches];

  std::cout << "StreamLen\tTrials\tBuild\tUpdate\tMerge\tMaxErr\tNumItems" << std::endl;

  size_t stream_length = 1 << lg_min_stream_len;
  while (stream_length <= 1 << lg_max_stream_len) {
    std::chrono::nanoseconds build_time_ns(0);
    std::chrono::nanoseconds update_time_ns(0);
    std::chrono::nanoseconds merge_time_ns(0);
    unsigned num_items = 0;
    unsigned max_error = 0;

    const size_t num_trials = get_num_trials(stream_length, lg_min_stream_len, lg_max_stream_len, lg_min_trials, lg_max_trials);

    for (size_t t = 0; t < num_trials; t++) {
      const auto start_build(std::chrono::high_resolution_clock::now());
      for (size_t i = 0; i < num_sketches; i++) {
        sketches[i] = std::unique_ptr<frequent_items_sketch<std::string>>(new frequent_items_sketch<std::string>(lg_max_sketch_size));
      }
      frequent_items_sketch<std::string> merge_sketch(lg_max_sketch_size);
      const auto finish_build(std::chrono::high_resolution_clock::now());
      build_time_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(finish_build - start_build);

      // prepare values to exclude cost from the update loop
      // set the highest bit in 64-bit value to make strings longer so we can compare copying and moving better
      for (size_t i = 0; i < stream_length; i++) values[i] = std::to_string(zipf.sample() | high_bit);

      // spray input evenly across all sketches to be merged
      const auto start_update(std::chrono::high_resolution_clock::now());
      size_t i = 0;
      for (size_t j = 0; j < stream_length; j++) {
        sketches[i]->update(values[j]);
        i++;
        if (i == num_sketches) i = 0;
      }
      const auto finish_update(std::chrono::high_resolution_clock::now());
      update_time_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(finish_update - start_update);

      const auto start_merge(std::chrono::high_resolution_clock::now());
      for (size_t i = 0; i < num_sketches; i++) {
        //merge_sketch.merge(*sketches[i]);
        merge_sketch.merge(std::move(*sketches[i]));
      }
      const auto finish_merge(std::chrono::high_resolution_clock::now());
      merge_time_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(finish_merge - start_merge);

      num_items += merge_sketch.get_num_active_items();
      max_error += merge_sketch.get_maximum_error();
    }

    std::cout << stream_length << "\t"
        << num_trials << "\t"
        << (double) build_time_ns.count() / num_trials << "\t"
        << (double) update_time_ns.count() / num_trials / stream_length << "\t"
        << (double) merge_time_ns.count() / num_trials << "\t"
        << (double) max_error / num_trials << "\t"
        << (double) num_items / num_trials
        << std::endl;

    stream_length = pwr_2_law_next(ppo, stream_length);
  }
}