in tantivy/src/indexer/index_writer.rs [1608:2127]
fn test_operation_strategy(
ops: &[IndexingOp],
sort_index: bool,
force_end_merge: bool,
) -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let ip_field = schema_builder.add_ip_addr_field("ip", FAST | INDEXED | STORED);
let ips_field = schema_builder.add_ip_addr_field(
"ips",
IpAddrOptions::default()
.set_fast(Cardinality::MultiValues)
.set_indexed(),
);
let id_field = schema_builder.add_u64_field("id", FAST | INDEXED | STORED);
let i64_field = schema_builder.add_i64_field("i64", INDEXED);
let f64_field = schema_builder.add_f64_field("f64", INDEXED);
let date_field = schema_builder.add_date_field("date", INDEXED);
let bytes_field = schema_builder.add_bytes_field("bytes", FAST | INDEXED | STORED);
let bool_field = schema_builder.add_bool_field("bool", FAST | INDEXED | STORED);
let text_field = schema_builder.add_text_field(
"text_field",
TextOptions::default()
.set_indexing_options(
TextFieldIndexing::default()
.set_index_option(schema::IndexRecordOption::WithFreqsAndPositions),
)
.set_stored(),
);
let large_text_field = schema_builder.add_text_field("large_text_field", TEXT | STORED);
let multi_text_fields = schema_builder.add_text_field("multi_text_fields", TEXT | STORED);
let multi_numbers = schema_builder.add_u64_field(
"multi_numbers",
NumericOptions::default()
.set_fast(Cardinality::MultiValues)
.set_stored(),
);
let multi_bools = schema_builder.add_bool_field(
"multi_bools",
NumericOptions::default()
.set_fast(Cardinality::MultiValues)
.set_stored(),
);
let facet_field = schema_builder.add_facet_field("facet", FacetOptions::default());
let schema = schema_builder.build();
let settings = if sort_index {
IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "id".to_string(),
order: Order::Asc,
}),
..Default::default()
}
} else {
IndexSettings {
..Default::default()
}
};
let index = Index::builder()
.schema(schema)
.settings(settings)
.create_in_ram()?;
let mut index_writer = index.writer_for_tests()?;
index_writer.set_merge_policy(Box::new(NoMergePolicy));
let old_reader = index.reader()?;
let ip_exists = |id| id % 3 != 0; // 0 does not exist
let multi_text_field_text1 = "test1 test2 test3 test1 test2 test3";
// rotate left
let multi_text_field_text2 = "test2 test3 test1 test2 test3 test1";
// rotate right
let multi_text_field_text3 = "test3 test1 test2 test3 test1 test2";
let ip_from_id = |id| Ipv6Addr::from_u128(id as u128);
for &op in ops {
match op {
IndexingOp::AddDoc { id } => {
let facet = Facet::from(&("/cola/".to_string() + &id.to_string()));
let ip = ip_from_id(id);
if !ip_exists(id) {
// every 3rd doc has no ip field
index_writer.add_document(doc!(id_field=>id,
bytes_field => id.to_le_bytes().as_slice(),
multi_numbers=> id,
multi_numbers => id,
bool_field => (id % 2u64) != 0,
i64_field => id as i64,
f64_field => id as f64,
date_field => DateTime::from_timestamp_secs(id as i64),
multi_bools => (id % 2u64) != 0,
multi_bools => (id % 2u64) == 0,
text_field => id.to_string(),
facet_field => facet,
large_text_field => LOREM,
multi_text_fields => multi_text_field_text1,
multi_text_fields => multi_text_field_text2,
multi_text_fields => multi_text_field_text3,
))?;
} else {
index_writer.add_document(doc!(id_field=>id,
bytes_field => id.to_le_bytes().as_slice(),
ip_field => ip,
ips_field => ip,
ips_field => ip,
multi_numbers=> id,
multi_numbers => id,
bool_field => (id % 2u64) != 0,
i64_field => id as i64,
f64_field => id as f64,
date_field => DateTime::from_timestamp_secs(id as i64),
multi_bools => (id % 2u64) != 0,
multi_bools => (id % 2u64) == 0,
text_field => id.to_string(),
facet_field => facet,
large_text_field => LOREM,
multi_text_fields => multi_text_field_text1,
multi_text_fields => multi_text_field_text2,
multi_text_fields => multi_text_field_text3,
))?;
}
}
IndexingOp::DeleteDoc { id } => {
index_writer.delete_term(Term::from_field_u64(id_field, id));
}
IndexingOp::DeleteDocQuery { id } => {
let term = Term::from_field_u64(id_field, id);
let query = TermQuery::new(term, Default::default());
index_writer.delete_query(Box::new(query))?;
}
IndexingOp::Commit => {
index_writer.commit()?;
}
IndexingOp::Merge => {
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
if segment_ids.len() >= 2 {
index_writer.merge(&segment_ids).wait().unwrap();
assert!(index_writer.segment_updater().wait_merging_thread().is_ok());
}
}
}
}
index_writer.commit()?;
let searcher = index.reader()?.searcher();
let num_segments_before_merge = searcher.segment_readers().len();
if force_end_merge {
index_writer.wait_merging_threads()?;
let mut index_writer = index.writer_for_tests()?;
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
if segment_ids.len() >= 2 {
index_writer.merge(&segment_ids).wait().unwrap();
assert!(index_writer.wait_merging_threads().is_ok());
}
}
let num_segments_after_merge = searcher.segment_readers().len();
old_reader.reload()?;
let old_searcher = old_reader.searcher();
let ids_old_searcher: HashSet<u64> = old_searcher
.segment_readers()
.iter()
.flat_map(|segment_reader| {
let ff_reader = segment_reader.fast_fields().u64(id_field).unwrap();
segment_reader
.doc_ids_alive()
.map(move |doc| ff_reader.get_val(doc))
})
.collect();
let ids: HashSet<u64> = searcher
.segment_readers()
.iter()
.flat_map(|segment_reader| {
let ff_reader = segment_reader.fast_fields().u64(id_field).unwrap();
segment_reader
.doc_ids_alive()
.map(move |doc| ff_reader.get_val(doc))
})
.collect();
let (expected_ids_and_num_occurrences, deleted_ids) = expected_ids(ops);
let id_list = get_id_list(ops);
// multivalue fast field content
let mut all_ips = Vec::new();
let mut num_ips = 0;
for segment_reader in searcher.segment_readers().iter() {
let ip_reader = segment_reader.fast_fields().ip_addrs(ips_field).unwrap();
for doc in segment_reader.doc_ids_alive() {
let mut vals = vec![];
ip_reader.get_vals(doc, &mut vals);
all_ips.extend_from_slice(&vals);
}
num_ips += ip_reader.total_num_vals();
}
let num_docs_expected = expected_ids_and_num_occurrences
.iter()
.map(|(_, id_occurrences)| *id_occurrences as usize)
.sum::<usize>();
assert_eq!(searcher.num_docs() as usize, num_docs_expected);
assert_eq!(old_searcher.num_docs() as usize, num_docs_expected);
assert_eq!(
ids_old_searcher,
expected_ids_and_num_occurrences
.keys()
.cloned()
.collect::<HashSet<_>>()
);
assert_eq!(
ids,
expected_ids_and_num_occurrences
.keys()
.cloned()
.collect::<HashSet<_>>()
);
if force_end_merge && num_segments_before_merge > 1 && num_segments_after_merge == 1 {
let mut expected_multi_ips: Vec<_> = id_list
.iter()
.filter(|id| ip_exists(**id))
.flat_map(|id| vec![ip_from_id(*id), ip_from_id(*id)])
.collect();
assert_eq!(num_ips, expected_multi_ips.len() as u32);
expected_multi_ips.sort();
all_ips.sort();
assert_eq!(expected_multi_ips, all_ips);
// Test fastfield num_docs
let num_docs: usize = searcher
.segment_readers()
.iter()
.map(|segment_reader| {
let ff_reader = segment_reader.fast_fields().ip_addrs(ips_field).unwrap();
ff_reader.get_index_reader().num_docs() as usize
})
.sum();
assert_eq!(num_docs, num_docs_expected);
}
// Load all ips addr
let ips: HashSet<Ipv6Addr> = searcher
.segment_readers()
.iter()
.flat_map(|segment_reader| {
let ff_reader = segment_reader.fast_fields().ip_addr(ip_field).unwrap();
segment_reader.doc_ids_alive().flat_map(move |doc| {
let val = ff_reader.get_val(doc);
if val == Ipv6Addr::from_u128(0) {
// TODO Fix null handling
None
} else {
Some(val)
}
})
})
.collect();
let expected_ips = expected_ids_and_num_occurrences
.keys()
.flat_map(|id| {
if !ip_exists(*id) {
None
} else {
Some(Ipv6Addr::from_u128(*id as u128))
}
})
.collect::<HashSet<_>>();
assert_eq!(ips, expected_ips);
let expected_ips = expected_ids_and_num_occurrences
.keys()
.filter_map(|id| {
if !ip_exists(*id) {
None
} else {
Some(Ipv6Addr::from_u128(*id as u128))
}
})
.collect::<HashSet<_>>();
let ips: HashSet<Ipv6Addr> = searcher
.segment_readers()
.iter()
.flat_map(|segment_reader| {
let ff_reader = segment_reader.fast_fields().ip_addrs(ips_field).unwrap();
segment_reader.doc_ids_alive().flat_map(move |doc| {
let mut vals = vec![];
ff_reader.get_vals(doc, &mut vals);
vals.into_iter().filter(|val| val.to_u128() != 0) // TODO Fix null handling
})
})
.collect();
assert_eq!(ips, expected_ips);
// multivalue fast field tests
for segment_reader in searcher.segment_readers().iter() {
let id_reader = segment_reader.fast_fields().u64(id_field).unwrap();
let ff_reader = segment_reader.fast_fields().u64s(multi_numbers).unwrap();
let bool_ff_reader = segment_reader.fast_fields().bools(multi_bools).unwrap();
for doc in segment_reader.doc_ids_alive() {
let mut vals = vec![];
ff_reader.get_vals(doc, &mut vals);
assert_eq!(vals.len(), 2);
assert_eq!(vals[0], vals[1]);
assert_eq!(id_reader.get_val(doc), vals[0]);
let mut bool_vals = vec![];
bool_ff_reader.get_vals(doc, &mut bool_vals);
assert_eq!(bool_vals.len(), 2);
assert_ne!(bool_vals[0], bool_vals[1]);
assert!(expected_ids_and_num_occurrences.contains_key(&vals[0]));
}
}
// doc store tests
for segment_reader in searcher.segment_readers().iter() {
let store_reader = segment_reader
.get_store_reader(DOCSTORE_CACHE_CAPACITY)
.unwrap();
// test store iterator
for doc in store_reader.iter(segment_reader.alive_bitset()) {
let id = doc.unwrap().get_first(id_field).unwrap().as_u64().unwrap();
assert!(expected_ids_and_num_occurrences.contains_key(&id));
}
// test store random access
for doc_id in segment_reader.doc_ids_alive() {
let id = store_reader
.get(doc_id)
.unwrap()
.get_first(id_field)
.unwrap()
.as_u64()
.unwrap();
assert!(expected_ids_and_num_occurrences.contains_key(&id));
let id2 = store_reader
.get(doc_id)
.unwrap()
.get_first(multi_numbers)
.unwrap()
.as_u64()
.unwrap();
assert_eq!(id, id2);
let bool = store_reader
.get(doc_id)
.unwrap()
.get_first(bool_field)
.unwrap()
.as_bool()
.unwrap();
let doc = store_reader.get(doc_id).unwrap();
let mut bool2 = doc.get_all(multi_bools);
assert_eq!(bool, bool2.next().unwrap().as_bool().unwrap());
assert_ne!(bool, bool2.next().unwrap().as_bool().unwrap());
assert_eq!(None, bool2.next())
}
}
// test search
let do_search = |term: &str, field| {
let query = QueryParser::for_index(&index, vec![field])
.parse_query(term)
.unwrap();
let top_docs: Vec<(f32, DocAddress)> =
searcher.search(&query, &TopDocs::with_limit(1000)).unwrap();
top_docs.iter().map(|el| el.1).collect::<Vec<_>>()
};
let do_search2 = |term: Term| {
let query = TermQuery::new(term, IndexRecordOption::Basic);
let top_docs: Vec<(f32, DocAddress)> =
searcher.search(&query, &TopDocs::with_limit(1000)).unwrap();
top_docs.iter().map(|el| el.1).collect::<Vec<_>>()
};
for (existing_id, count) in &expected_ids_and_num_occurrences {
let (existing_id, count) = (*existing_id, *count);
let get_num_hits = |field| do_search(&existing_id.to_string(), field).len() as u64;
assert_eq!(get_num_hits(text_field), count);
assert_eq!(get_num_hits(i64_field), count);
assert_eq!(get_num_hits(f64_field), count);
assert_eq!(get_num_hits(id_field), count);
// Test multi text
assert_eq!(
do_search("\"test1 test2\"", multi_text_fields).len(),
num_docs_expected
);
assert_eq!(
do_search("\"test2 test3\"", multi_text_fields).len(),
num_docs_expected
);
// Test bytes
let term = Term::from_field_bytes(bytes_field, existing_id.to_le_bytes().as_slice());
assert_eq!(do_search2(term).len() as u64, count);
// Test date
let term = Term::from_field_date(
date_field,
DateTime::from_timestamp_secs(existing_id as i64),
);
assert_eq!(do_search2(term).len() as u64, count);
}
for deleted_id in deleted_ids {
let assert_field = |field| {
assert_eq!(do_search(&deleted_id.to_string(), field).len() as u64, 0);
};
assert_field(text_field);
assert_field(f64_field);
assert_field(i64_field);
assert_field(id_field);
// Test bytes
let term = Term::from_field_bytes(bytes_field, deleted_id.to_le_bytes().as_slice());
assert_eq!(do_search2(term).len() as u64, 0);
// Test date
let term =
Term::from_field_date(date_field, DateTime::from_timestamp_secs(deleted_id as i64));
assert_eq!(do_search2(term).len() as u64, 0);
}
// search ip address
//
for (existing_id, count) in &expected_ids_and_num_occurrences {
let (existing_id, count) = (*existing_id, *count);
if !ip_exists(existing_id) {
continue;
}
let do_search_ip_field = |term: &str| do_search(term, ip_field).len() as u64;
let ip_addr = Ipv6Addr::from_u128(existing_id as u128);
// Test incoming ip as ipv6
assert_eq!(do_search_ip_field(&format!("\"{}\"", ip_addr)), count);
let term = Term::from_field_ip_addr(ip_field, ip_addr);
assert_eq!(do_search2(term).len() as u64, count);
// Test incoming ip as ipv4
if let Some(ip_addr) = ip_addr.to_ipv4_mapped() {
assert_eq!(do_search_ip_field(&format!("\"{}\"", ip_addr)), count);
}
}
// assert data is like expected
//
for (existing_id, count) in expected_ids_and_num_occurrences.iter().take(10) {
let (existing_id, count) = (*existing_id, *count);
if !ip_exists(existing_id) {
continue;
}
let gen_query_inclusive = |field: &str, from: Ipv6Addr, to: Ipv6Addr| {
format!("{}:[{} TO {}]", field, &from.to_string(), &to.to_string())
};
let ip = ip_from_id(existing_id);
let do_search_ip_field = |term: &str| do_search(term, ip_field).len() as u64;
// Range query on single value field
// let query = gen_query_inclusive("ip", ip, ip);
// assert_eq!(do_search_ip_field(&query), count);
// Range query on multi value field
let query = gen_query_inclusive("ips", ip, ip);
assert_eq!(do_search_ip_field(&query), count);
}
// ip range query on fast field
//
for (existing_id, count) in expected_ids_and_num_occurrences.iter().take(10) {
let (existing_id, count) = (*existing_id, *count);
if !ip_exists(existing_id) {
continue;
}
let gen_query_inclusive = |field: &str, from: Ipv6Addr, to: Ipv6Addr| {
format!("{}:[{} TO {}]", field, &from.to_string(), &to.to_string())
};
let ip = ip_from_id(existing_id);
let do_search_ip_field = |term: &str| do_search(term, ip_field).len() as u64;
// Range query on single value field
// let query = gen_query_inclusive("ip", ip, ip);
// assert_eq!(do_search_ip_field(&query), count);
// Range query on multi value field
let query = gen_query_inclusive("ips", ip, ip);
assert_eq!(do_search_ip_field(&query), count);
}
// test facets
for segment_reader in searcher.segment_readers().iter() {
let mut facet_reader = segment_reader.facet_reader(facet_field).unwrap();
let ff_reader = segment_reader.fast_fields().u64(id_field).unwrap();
for doc_id in segment_reader.doc_ids_alive() {
let mut facet_ords = Vec::new();
facet_reader.facet_ords(doc_id, &mut facet_ords);
assert_eq!(facet_ords.len(), 1);
let mut facet = Facet::default();
facet_reader
.facet_from_ord(facet_ords[0], &mut facet)
.unwrap();
let id = ff_reader.get_val(doc_id);
let facet_expected = Facet::from(&("/cola/".to_string() + &id.to_string()));
assert_eq!(facet, facet_expected);
}
}
Ok(())
}