void TestMerge()

in src/kudu/common/generic_iterators-test.cc [290:482]


// Exercises the MergeIterator against FLAGS_num_lists randomly generated
// input lists, each holding FLAGS_num_rows int64 keys.
//
// Each list is wrapped as VectorIterator -> MaterializingIterator ->
// UnionIterator and is optionally annotated with inexact encoded key bounds.
// All lists are then merged, once per trial (FLAGS_num_iters trials). The
// merged output is compared, in order, against an expectation computed
// independently. That expectation accounts for:
//   - the predicate;
//   - randomly deselected rows;
//   - when 'include_deleted_rows' is true, rows flagged through the
//     IS_DELETED virtual column.
//
// Parameters:
//   schema               - projection used by the input iterators and the
//                          output RowBlock. When 'include_deleted_rows' is
//                          true it must contain an IS_DELETED virtual column
//                          (CHECKed below).
//   predicate            - predicate pushed via ScanSpec. Only keys in
//                          [predicate.lower_, predicate.upper_) are expected.
//   overlapping_ranges   - if true, every list draws keys starting from 0, so
//                          key ranges overlap across lists. Otherwise each
//                          list's keys start at or after the previous list's
//                          last key.
//   include_deleted_rows - if true, each generated row is marked deleted with
//                          probability 1/4 and the merge is asked to return
//                          deleted rows too.
//
// All randomness (key generation, row deselection, bounds and input
// shuffling) derives from the single SeedRandom() seed.
void TestMerge(const Schema& schema,
               const TestIntRangePredicate &predicate,
               bool overlapping_ranges = true,
               bool include_deleted_rows = false) {
  // One input "RowSet". It holds:
  //   - keys in generation order (strictly increasing, see below);
  //   - per-key IS_DELETED flags;
  //   - the selection vector applied on top of the keys;
  //   - optional inexact encoded [min, max] key bounds.
  struct List {
    explicit List(int num_rows)
        : sv(new SelectionVector(num_rows)) {
      ints.reserve(num_rows);
      is_deleted.reserve(num_rows);
    }

    vector<int64_t> ints;
    vector<uint8_t> is_deleted;
    unique_ptr<SelectionVector> sv;
    optional<pair<string, string>> encoded_bounds;
  };

  vector<List> all_ints;
  // Expected merge output: key -> whether the surviving version is deleted.
  // The map is ordered, so it can be walked in lockstep with the merged output.
  map<int64_t, bool> expected;
  // Keys that already have a live (non-deleted) version in some list.
  unordered_set<int64_t> seen_live;
  const auto& encoder = GetKeyEncoder<string>(GetTypeInfo(INT64));
  // The only source of randomness in this test, including the shuffle below.
  Random prng(SeedRandom());

  int64_t entry = 0;
  for (int i = 0; i < FLAGS_num_lists; i++) {
    List list(FLAGS_num_rows);
    unordered_set<int64_t> seen_this_list;

    optional<int64_t> min_entry;
    optional<int64_t> max_entry;
    if (overlapping_ranges) {
      entry = 0;
    }
    for (int j = 0; j < FLAGS_num_rows; j++) {
      int64_t potential;
      bool is_deleted = false;
      // Draw keys until one is acceptable: the merge iterator does not support
      // duplicate non-deleted keys. Each draw is offset (by >= 0) from the
      // previous key, and after the first row of a list that key is already in
      // 'seen_this_list'. Keys within one list are therefore strictly
      // increasing.
      while (true) {
        potential = entry + prng.Uniform(FLAGS_num_rows * FLAGS_num_lists * 10);
        // Only one live version of a row can exist across all lists.
        // Including several duplicate deleted keys is fine.
        if (ContainsKey(seen_live, potential)) continue;

        // No duplicate keys are allowed in the same list (same RowSet).
        if (ContainsKey(seen_this_list, potential)) continue;
        InsertOrDie(&seen_this_list, potential);

        // If we are including deleted rows, with some probability make this a
        // deleted row. Deleted keys are not recorded in 'seen_live', so the
        // same key may still appear (deleted or live) in another list.
        if (include_deleted_rows) {
          is_deleted = prng.OneIn(4);
        }
        if (!is_deleted) {
          InsertOrDie(&seen_live, potential);
        }
        break;
      }
      entry = potential;

      list.ints.emplace_back(entry);
      list.is_deleted.emplace_back(is_deleted);

      if (!max_entry || entry > *max_entry) {
        max_entry = entry;
      }
      if (!min_entry || entry < *min_entry) {
        min_entry = entry;
      }

      // Some entries are randomly deselected in order to exercise the selection
      // vector logic in the MergeIterator. This is reflected both in the input
      // to the MergeIterator as well as the expected output (see below).
      bool row_selected = prng.Uniform(8) > 0;
      VLOG(2) << Substitute("Row $0 with value $1 selected? $2",
                            j, entry, row_selected);
      if (row_selected) {
        list.sv->SetRowSelected(j);
      } else {
        list.sv->SetRowUnselected(j);
      }

      // Consider the predicate and the selection vector before inserting this
      // row into 'expected'.
      if (entry >= predicate.lower_ && entry < predicate.upper_ && row_selected) {
        auto result = expected.emplace(entry, is_deleted);
        if (!result.second) {
          // We should only be overwriting a deleted row.
          bool existing_is_deleted = result.first->second;
          CHECK_EQ(true, existing_is_deleted);
          result.first->second = is_deleted;
        }
      }
    }

    // Most of the time, attach bounds built from the smallest and largest
    // entries. They are randomly widened to reflect their inexactness in the
    // real world. An empty list (FLAGS_num_rows == 0) has nothing to bound,
    // so it never gets bounds: dereferencing the empty optionals would be
    // undefined behavior. The Uniform(10) draw is evaluated first, so the
    // random sequence for non-empty lists is unchanged.
    if (prng.Uniform(10) > 0 && min_entry && max_entry) {
      list.encoded_bounds.emplace();
      min_entry = *min_entry - prng.Uniform(5);
      max_entry = *max_entry + prng.Uniform(5);
      encoder.Encode(&(*min_entry), &list.encoded_bounds->first);
      encoder.Encode(&(*max_entry), &list.encoded_bounds->second);
    }
    all_ints.emplace_back(std::move(list));
  }

  // Shuffle the inputs so the merge does not see them in generation order.
  // The shuffle is seeded from 'prng' rather than std::random_device. That way
  // the input order, like everything else in this test, is fully determined
  // by the SeedRandom() seed, and a failing run can be reproduced.
  LOG_TIMING(INFO, "shuffling the inputs") {
    std::mt19937 gen(prng.Next());
    std::shuffle(all_ints.begin(), all_ints.end(), gen);
  }

  VLOG(1) << "Predicate expects " << expected.size() << " results: " << expected;

  for (int trial = 0; trial < FLAGS_num_iters; trial++) {
    // Build a fresh iterator stack for every input list. Each list becomes a
    // VectorIterator, wrapped in a MaterializingIterator, wrapped in a
    // single-input UnionIterator.
    vector<IterWithBounds> to_merge;
    for (const auto& list : all_ints) {
      unique_ptr<VectorIterator> vec_it(new VectorIterator(list.ints, list.is_deleted, schema));
      vec_it->set_block_size(16);
      vec_it->set_selection_vector(list.sv.get());
      unique_ptr<RowwiseIterator> mat_it(NewMaterializingIterator(std::move(vec_it)));
      IterWithBounds mat_iwb;
      mat_iwb.iter = std::move(mat_it);
      vector<IterWithBounds> un_input;
      un_input.emplace_back(std::move(mat_iwb));
      unique_ptr<RowwiseIterator> un_it(NewUnionIterator(std::move(un_input)));
      IterWithBounds un_iwb;
      un_iwb.iter = std::move(un_it);
      if (list.encoded_bounds) {
        un_iwb.encoded_bounds = list.encoded_bounds;
      }
      to_merge.emplace_back(std::move(un_iwb));
    }

    // Push the predicate down through a fresh ScanSpec for this trial.
    ScanSpec spec;
    spec.AddPredicate(predicate.pred_);
    LOG(INFO) << "Predicate: " << predicate.pred_.ToString();

    LOG_TIMING(INFO, "iterating merged lists") {
      unique_ptr<RowwiseIterator> merger(NewMergeIterator(
          MergeIteratorOptions(include_deleted_rows), std::move(to_merge)));
      ASSERT_OK(merger->Init(&spec));

      // The RowBlock is sized to a power of 2 to improve BitmapCopy performance
      // when copying another RowBlock into it.
      RowBlockMemory mem;
      RowBlock dst(&schema, 128, &mem);
      size_t total_idx = 0;
      auto expected_iter = expected.cbegin();
      while (merger->HasNext()) {
        ASSERT_OK(merger->NextBlock(&dst));
        ASSERT_GT(dst.nrows(), 0) <<
          "if HasNext() returns true, must return some rows";

        // Every selected output row must match the next expected entry,
        // in order.
        for (size_t i = 0; i < dst.nrows(); i++) {
          if (!dst.selection_vector()->IsRowSelected(i)) {
            continue;
          }
          ASSERT_NE(expected.end(), expected_iter);
          int64_t expected_key = expected_iter->first;
          bool expected_is_deleted = expected_iter->second;
          int64_t row_key = *schema.ExtractColumnFromRow<INT64>(dst.row(i), kValColIdx);
          ASSERT_GE(row_key, predicate.lower_) << "Yielded integer excluded by predicate";
          ASSERT_LT(row_key, predicate.upper_) << "Yielded integer excluded by predicate";
          EXPECT_EQ(expected_key, row_key) << "Yielded out of order at idx " << total_idx;
          bool is_deleted = false;
          if (include_deleted_rows) {
            CHECK_NE(Schema::kColumnNotFound,
                     schema.first_is_deleted_virtual_column_idx());
            is_deleted = *schema.ExtractColumnFromRow<IS_DELETED>(
                dst.row(i), schema.first_is_deleted_virtual_column_idx());
            EXPECT_EQ(expected_is_deleted, is_deleted)
                << "Row " << row_key << " has unexpected IS_DELETED value at index " << total_idx;
          }
          VLOG(2) << "Observed: val=" << row_key << ", is_deleted=" << is_deleted;
          VLOG(2) << "Expected: val=" << expected_key << ", is_deleted=" << expected_is_deleted;
          ++expected_iter;
          ++total_idx;
        }
      }
      // The merge must produce exactly the expected rows: no more, no fewer.
      ASSERT_EQ(expected.size(), total_idx);
      ASSERT_EQ(expected.end(), expected_iter);
    }
  }
}