in src/kudu/common/generic_iterators-test.cc [290:482]
// Exercises the MergeIterator against randomly generated input.
//
// Builds FLAGS_num_lists lists of FLAGS_num_rows INT64 keys each (keys are
// strictly increasing within a list), randomly deselects some rows, and
// computes the expected ordered output after applying 'predicate' and the
// selection vectors. Then, for each of FLAGS_num_iters trials, every list is
// wrapped as VectorIterator -> MaterializingIterator -> UnionIterator, all of
// them are merged with NewMergeIterator(), and the merged rows are checked
// against the expected keys (and IS_DELETED values, if requested).
//
//   schema               - schema of the generated rows; keys are read back
//                          from column kValColIdx.
//   predicate            - range predicate; only keys in [lower_, upper_) are
//                          expected in the output.
//   overlapping_ranges   - if true, every list draws keys starting from 0, so
//                          the lists' key ranges overlap; if false, each list
//                          continues from the last key of the previous list.
//   include_deleted_rows - if true, some generated rows are deleted versions,
//                          the flag is forwarded via MergeIteratorOptions, and
//                          the IS_DELETED virtual column of each output row is
//                          verified.
void TestMerge(const Schema& schema,
               const TestIntRangePredicate &predicate,
               bool overlapping_ranges = true,
               bool include_deleted_rows = false) {
  struct List {
    explicit List(int num_rows)
        : sv(new SelectionVector(num_rows)) {
      ints.reserve(num_rows);
      is_deleted.reserve(num_rows);
    }
    vector<int64_t> ints;
    vector<uint8_t> is_deleted;
    unique_ptr<SelectionVector> sv;
    optional<pair<string, string>> encoded_bounds;
  };
  vector<List> all_ints;
  // Expected merge output: key -> whether the surviving version is deleted.
  map<int64_t, bool> expected;
  // Keys that already have a live (non-deleted) version in some list.
  unordered_set<int64_t> seen_live;
  const auto& encoder = GetKeyEncoder<string>(GetTypeInfo(INT64));
  Random prng(SeedRandom());
  int64_t entry = 0;
  for (int i = 0; i < FLAGS_num_lists; i++) {
    List list(FLAGS_num_rows);
    unordered_set<int64_t> seen_this_list;
    optional<int64_t> min_entry;
    optional<int64_t> max_entry;
    if (overlapping_ranges) {
      entry = 0;
    }
    for (int j = 0; j < FLAGS_num_rows; j++) {
      // Draw candidate keys until one is acceptable. The merge iterator does
      // not support duplicate live keys, so a candidate is rejected if:
      //  - it already has a live version in any list (only one live version
      //    of a row may exist across all lists; several deleted versions of
      //    the same key are fine), or
      //  - it was already used in this list (no duplicate keys in the same
      //    RowSet).
      int64_t potential;
      do {
        potential = entry + prng.Uniform(FLAGS_num_rows * FLAGS_num_lists * 10);
      } while (ContainsKey(seen_live, potential) ||
               ContainsKey(seen_this_list, potential));
      InsertOrDie(&seen_this_list, potential);
      // When deleted rows are included, make this row a deleted version with
      // some probability (prng.OneIn(4)); otherwise it becomes the key's one
      // live version.
      const bool is_deleted = include_deleted_rows && prng.OneIn(4);
      if (!is_deleted) {
        InsertOrDie(&seen_live, potential);
      }
      entry = potential;
      list.ints.emplace_back(entry);
      list.is_deleted.emplace_back(is_deleted);
      if (!max_entry || entry > *max_entry) {
        max_entry = entry;
      }
      if (!min_entry || entry < *min_entry) {
        min_entry = entry;
      }
      // Some entries are randomly deselected in order to exercise the selection
      // vector logic in the MergeIterator. This is reflected both in the input
      // to the MergeIterator as well as the expected output (see below).
      bool row_selected = prng.Uniform(8) > 0;
      VLOG(2) << Substitute("Row $0 with value $1 selected? $2",
                            j, entry, row_selected);
      if (row_selected) {
        list.sv->SetRowSelected(j);
      } else {
        list.sv->SetRowUnselected(j);
      }
      // Consider the predicate and the selection vector before inserting this
      // row into 'expected'.
      if (entry >= predicate.lower_ && entry < predicate.upper_ && row_selected) {
        auto result = expected.emplace(entry, is_deleted);
        if (!result.second) {
          // The key is already expected. Only one live version of a key can
          // exist, so the existing entry must be a deleted version; if this
          // row is the live version, it replaces the deleted one.
          bool existing_is_deleted = result.first->second;
          CHECK_EQ(true, existing_is_deleted);
          result.first->second = is_deleted;
        }
      }
    }
    // Most of the time, use the smallest and largest entries as bounds. They
    // are randomly widened to reflect their inexactness in the real world.
    // A list with no rows has no min/max to derive bounds from, so it gets no
    // bounds (dereferencing the empty optionals would be undefined behavior).
    // The PRNG draw is evaluated first either way, keeping the random
    // sequence unchanged.
    if (prng.Uniform(10) > 0 && min_entry) {
      DCHECK(max_entry);
      list.encoded_bounds.emplace();
      min_entry = *min_entry - prng.Uniform(5);
      max_entry = *max_entry + prng.Uniform(5);
      encoder.Encode(&(*min_entry), &list.encoded_bounds->first);
      encoder.Encode(&(*max_entry), &list.encoded_bounds->second);
    }
    all_ints.emplace_back(std::move(list));
  }
  // Shuffle the inputs so the merge does not always see them in generation
  // order. The engine is seeded from 'prng' so the input order derives from
  // the same seed as the rest of the generated data (SeedRandom()), instead
  // of from std::random_device, whose output cannot be replayed.
  LOG_TIMING(INFO, "shuffling the inputs") {
    std::mt19937 gen(prng.Next());
    std::shuffle(all_ints.begin(), all_ints.end(), gen);
  }
  VLOG(1) << "Predicate expects " << expected.size() << " results: " << expected;
  // Resolve the IS_DELETED virtual column once rather than once per output row.
  int is_deleted_col_idx = Schema::kColumnNotFound;
  if (include_deleted_rows) {
    is_deleted_col_idx = schema.first_is_deleted_virtual_column_idx();
    CHECK_NE(Schema::kColumnNotFound, is_deleted_col_idx);
  }
  for (int trial = 0; trial < FLAGS_num_iters; trial++) {
    vector<IterWithBounds> to_merge;
    for (const auto& list : all_ints) {
      unique_ptr<VectorIterator> vec_it(new VectorIterator(list.ints, list.is_deleted, schema));
      vec_it->set_block_size(16);
      vec_it->set_selection_vector(list.sv.get());
      unique_ptr<RowwiseIterator> mat_it(NewMaterializingIterator(std::move(vec_it)));
      IterWithBounds mat_iwb;
      mat_iwb.iter = std::move(mat_it);
      vector<IterWithBounds> un_input;
      un_input.emplace_back(std::move(mat_iwb));
      unique_ptr<RowwiseIterator> un_it(NewUnionIterator(std::move(un_input)));
      IterWithBounds un_iwb;
      un_iwb.iter = std::move(un_it);
      if (list.encoded_bounds) {
        un_iwb.encoded_bounds = list.encoded_bounds;
      }
      to_merge.emplace_back(std::move(un_iwb));
    }
    // Setup predicate exclusion
    ScanSpec spec;
    spec.AddPredicate(predicate.pred_);
    LOG(INFO) << "Predicate: " << predicate.pred_.ToString();
    LOG_TIMING(INFO, "iterating merged lists") {
      unique_ptr<RowwiseIterator> merger(NewMergeIterator(
          MergeIteratorOptions(include_deleted_rows), std::move(to_merge)));
      ASSERT_OK(merger->Init(&spec));
      // The RowBlock is sized to a power of 2 to improve BitmapCopy performance
      // when copying another RowBlock into it.
      RowBlockMemory mem;
      RowBlock dst(&schema, 128, &mem);
      size_t total_idx = 0;
      auto expected_iter = expected.cbegin();
      while (merger->HasNext()) {
        ASSERT_OK(merger->NextBlock(&dst));
        ASSERT_GT(dst.nrows(), 0) <<
            "if HasNext() returns true, must return some rows";
        for (size_t i = 0; i < dst.nrows(); i++) {
          if (!dst.selection_vector()->IsRowSelected(i)) {
            continue;
          }
          ASSERT_NE(expected.cend(), expected_iter);
          int64_t expected_key = expected_iter->first;
          bool expected_is_deleted = expected_iter->second;
          int64_t row_key = *schema.ExtractColumnFromRow<INT64>(dst.row(i), kValColIdx);
          ASSERT_GE(row_key, predicate.lower_) << "Yielded integer excluded by predicate";
          ASSERT_LT(row_key, predicate.upper_) << "Yielded integer excluded by predicate";
          EXPECT_EQ(expected_key, row_key) << "Yielded out of order at idx " << total_idx;
          bool is_deleted = false;
          if (include_deleted_rows) {
            is_deleted = *schema.ExtractColumnFromRow<IS_DELETED>(
                dst.row(i), is_deleted_col_idx);
            EXPECT_EQ(expected_is_deleted, is_deleted)
                << "Row " << row_key << " has unexpected IS_DELETED value at index " << total_idx;
          }
          VLOG(2) << "Observed: val=" << row_key << ", is_deleted=" << is_deleted;
          VLOG(2) << "Expected: val=" << expected_key << ", is_deleted=" << expected_is_deleted;
          ++expected_iter;
          ++total_idx;
        }
      }
      ASSERT_EQ(expected.size(), total_idx);
      ASSERT_EQ(expected.cend(), expected_iter);
    }
  }
}