fn test_operation_strategy()

in tantivy/src/indexer/index_writer.rs [1608:2127]


    fn test_operation_strategy(
        ops: &[IndexingOp],
        sort_index: bool,
        force_end_merge: bool,
    ) -> crate::Result<()> {
        let mut schema_builder = schema::Schema::builder();
        let ip_field = schema_builder.add_ip_addr_field("ip", FAST | INDEXED | STORED);
        let ips_field = schema_builder.add_ip_addr_field(
            "ips",
            IpAddrOptions::default()
                .set_fast(Cardinality::MultiValues)
                .set_indexed(),
        );
        let id_field = schema_builder.add_u64_field("id", FAST | INDEXED | STORED);
        let i64_field = schema_builder.add_i64_field("i64", INDEXED);
        let f64_field = schema_builder.add_f64_field("f64", INDEXED);
        let date_field = schema_builder.add_date_field("date", INDEXED);
        let bytes_field = schema_builder.add_bytes_field("bytes", FAST | INDEXED | STORED);
        let bool_field = schema_builder.add_bool_field("bool", FAST | INDEXED | STORED);
        let text_field = schema_builder.add_text_field(
            "text_field",
            TextOptions::default()
                .set_indexing_options(
                    TextFieldIndexing::default()
                        .set_index_option(schema::IndexRecordOption::WithFreqsAndPositions),
                )
                .set_stored(),
        );

        let large_text_field = schema_builder.add_text_field("large_text_field", TEXT | STORED);
        let multi_text_fields = schema_builder.add_text_field("multi_text_fields", TEXT | STORED);

        let multi_numbers = schema_builder.add_u64_field(
            "multi_numbers",
            NumericOptions::default()
                .set_fast(Cardinality::MultiValues)
                .set_stored(),
        );
        let multi_bools = schema_builder.add_bool_field(
            "multi_bools",
            NumericOptions::default()
                .set_fast(Cardinality::MultiValues)
                .set_stored(),
        );
        let facet_field = schema_builder.add_facet_field("facet", FacetOptions::default());
        let schema = schema_builder.build();
        let settings = if sort_index {
            IndexSettings {
                sort_by_field: Some(IndexSortByField {
                    field: "id".to_string(),
                    order: Order::Asc,
                }),
                ..Default::default()
            }
        } else {
            IndexSettings {
                ..Default::default()
            }
        };
        let index = Index::builder()
            .schema(schema)
            .settings(settings)
            .create_in_ram()?;
        let mut index_writer = index.writer_for_tests()?;
        index_writer.set_merge_policy(Box::new(NoMergePolicy));

        let old_reader = index.reader()?;

        let ip_exists = |id| id % 3 != 0; // 0 does not exist

        let multi_text_field_text1 = "test1 test2 test3 test1 test2 test3";
        // rotate left
        let multi_text_field_text2 = "test2 test3 test1 test2 test3 test1";
        // rotate right
        let multi_text_field_text3 = "test3 test1 test2 test3 test1 test2";

        let ip_from_id = |id| Ipv6Addr::from_u128(id as u128);

        for &op in ops {
            match op {
                IndexingOp::AddDoc { id } => {
                    let facet = Facet::from(&("/cola/".to_string() + &id.to_string()));
                    let ip = ip_from_id(id);

                    if !ip_exists(id) {
                        // every 3rd doc has no ip field
                        index_writer.add_document(doc!(id_field=>id,
                                bytes_field => id.to_le_bytes().as_slice(),
                                multi_numbers=> id,
                                multi_numbers => id,
                                bool_field => (id % 2u64) != 0,
                                i64_field => id as i64,
                                f64_field => id as f64,
                                date_field => DateTime::from_timestamp_secs(id as i64),
                                multi_bools => (id % 2u64) != 0,
                                multi_bools => (id % 2u64) == 0,
                                text_field => id.to_string(),
                                facet_field => facet,
                                large_text_field => LOREM,
                                multi_text_fields => multi_text_field_text1,
                                multi_text_fields => multi_text_field_text2,
                                multi_text_fields => multi_text_field_text3,
                        ))?;
                    } else {
                        index_writer.add_document(doc!(id_field=>id,
                                bytes_field => id.to_le_bytes().as_slice(),
                                ip_field => ip,
                                ips_field => ip,
                                ips_field => ip,
                                multi_numbers=> id,
                                multi_numbers => id,
                                bool_field => (id % 2u64) != 0,
                                i64_field => id as i64,
                                f64_field => id as f64,
                                date_field => DateTime::from_timestamp_secs(id as i64),
                                multi_bools => (id % 2u64) != 0,
                                multi_bools => (id % 2u64) == 0,
                                text_field => id.to_string(),
                                facet_field => facet,
                                large_text_field => LOREM,
                                multi_text_fields => multi_text_field_text1,
                                multi_text_fields => multi_text_field_text2,
                                multi_text_fields => multi_text_field_text3,
                        ))?;
                    }
                }
                IndexingOp::DeleteDoc { id } => {
                    index_writer.delete_term(Term::from_field_u64(id_field, id));
                }
                IndexingOp::DeleteDocQuery { id } => {
                    let term = Term::from_field_u64(id_field, id);
                    let query = TermQuery::new(term, Default::default());
                    index_writer.delete_query(Box::new(query))?;
                }
                IndexingOp::Commit => {
                    index_writer.commit()?;
                }
                IndexingOp::Merge => {
                    let segment_ids = index
                        .searchable_segment_ids()
                        .expect("Searchable segments failed.");
                    if segment_ids.len() >= 2 {
                        index_writer.merge(&segment_ids).wait().unwrap();
                        assert!(index_writer.segment_updater().wait_merging_thread().is_ok());
                    }
                }
            }
        }
        index_writer.commit()?;

        let searcher = index.reader()?.searcher();
        let num_segments_before_merge = searcher.segment_readers().len();
        if force_end_merge {
            index_writer.wait_merging_threads()?;
            let mut index_writer = index.writer_for_tests()?;
            let segment_ids = index
                .searchable_segment_ids()
                .expect("Searchable segments failed.");
            if segment_ids.len() >= 2 {
                index_writer.merge(&segment_ids).wait().unwrap();
                assert!(index_writer.wait_merging_threads().is_ok());
            }
        }
        let num_segments_after_merge = searcher.segment_readers().len();

        old_reader.reload()?;
        let old_searcher = old_reader.searcher();

        let ids_old_searcher: HashSet<u64> = old_searcher
            .segment_readers()
            .iter()
            .flat_map(|segment_reader| {
                let ff_reader = segment_reader.fast_fields().u64(id_field).unwrap();
                segment_reader
                    .doc_ids_alive()
                    .map(move |doc| ff_reader.get_val(doc))
            })
            .collect();

        let ids: HashSet<u64> = searcher
            .segment_readers()
            .iter()
            .flat_map(|segment_reader| {
                let ff_reader = segment_reader.fast_fields().u64(id_field).unwrap();
                segment_reader
                    .doc_ids_alive()
                    .map(move |doc| ff_reader.get_val(doc))
            })
            .collect();

        let (expected_ids_and_num_occurrences, deleted_ids) = expected_ids(ops);

        let id_list = get_id_list(ops);

        // multivalue fast field content
        let mut all_ips = Vec::new();
        let mut num_ips = 0;
        for segment_reader in searcher.segment_readers().iter() {
            let ip_reader = segment_reader.fast_fields().ip_addrs(ips_field).unwrap();
            for doc in segment_reader.doc_ids_alive() {
                let mut vals = vec![];
                ip_reader.get_vals(doc, &mut vals);
                all_ips.extend_from_slice(&vals);
            }
            num_ips += ip_reader.total_num_vals();
        }

        let num_docs_expected = expected_ids_and_num_occurrences
            .iter()
            .map(|(_, id_occurrences)| *id_occurrences as usize)
            .sum::<usize>();
        assert_eq!(searcher.num_docs() as usize, num_docs_expected);
        assert_eq!(old_searcher.num_docs() as usize, num_docs_expected);
        assert_eq!(
            ids_old_searcher,
            expected_ids_and_num_occurrences
                .keys()
                .cloned()
                .collect::<HashSet<_>>()
        );
        assert_eq!(
            ids,
            expected_ids_and_num_occurrences
                .keys()
                .cloned()
                .collect::<HashSet<_>>()
        );

        if force_end_merge && num_segments_before_merge > 1 && num_segments_after_merge == 1 {
            let mut expected_multi_ips: Vec<_> = id_list
                .iter()
                .filter(|id| ip_exists(**id))
                .flat_map(|id| vec![ip_from_id(*id), ip_from_id(*id)])
                .collect();
            assert_eq!(num_ips, expected_multi_ips.len() as u32);

            expected_multi_ips.sort();
            all_ips.sort();
            assert_eq!(expected_multi_ips, all_ips);

            // Test fastfield num_docs
            let num_docs: usize = searcher
                .segment_readers()
                .iter()
                .map(|segment_reader| {
                    let ff_reader = segment_reader.fast_fields().ip_addrs(ips_field).unwrap();
                    ff_reader.get_index_reader().num_docs() as usize
                })
                .sum();
            assert_eq!(num_docs, num_docs_expected);
        }

        // Load all ips addr
        let ips: HashSet<Ipv6Addr> = searcher
            .segment_readers()
            .iter()
            .flat_map(|segment_reader| {
                let ff_reader = segment_reader.fast_fields().ip_addr(ip_field).unwrap();
                segment_reader.doc_ids_alive().flat_map(move |doc| {
                    let val = ff_reader.get_val(doc);
                    if val == Ipv6Addr::from_u128(0) {
                        // TODO Fix null handling
                        None
                    } else {
                        Some(val)
                    }
                })
            })
            .collect();

        let expected_ips = expected_ids_and_num_occurrences
            .keys()
            .flat_map(|id| {
                if !ip_exists(*id) {
                    None
                } else {
                    Some(Ipv6Addr::from_u128(*id as u128))
                }
            })
            .collect::<HashSet<_>>();
        assert_eq!(ips, expected_ips);

        let expected_ips = expected_ids_and_num_occurrences
            .keys()
            .filter_map(|id| {
                if !ip_exists(*id) {
                    None
                } else {
                    Some(Ipv6Addr::from_u128(*id as u128))
                }
            })
            .collect::<HashSet<_>>();
        let ips: HashSet<Ipv6Addr> = searcher
            .segment_readers()
            .iter()
            .flat_map(|segment_reader| {
                let ff_reader = segment_reader.fast_fields().ip_addrs(ips_field).unwrap();
                segment_reader.doc_ids_alive().flat_map(move |doc| {
                    let mut vals = vec![];
                    ff_reader.get_vals(doc, &mut vals);
                    vals.into_iter().filter(|val| val.to_u128() != 0) // TODO Fix null handling
                })
            })
            .collect();
        assert_eq!(ips, expected_ips);

        // multivalue fast field tests
        for segment_reader in searcher.segment_readers().iter() {
            let id_reader = segment_reader.fast_fields().u64(id_field).unwrap();
            let ff_reader = segment_reader.fast_fields().u64s(multi_numbers).unwrap();
            let bool_ff_reader = segment_reader.fast_fields().bools(multi_bools).unwrap();
            for doc in segment_reader.doc_ids_alive() {
                let mut vals = vec![];
                ff_reader.get_vals(doc, &mut vals);
                assert_eq!(vals.len(), 2);
                assert_eq!(vals[0], vals[1]);
                assert_eq!(id_reader.get_val(doc), vals[0]);

                let mut bool_vals = vec![];
                bool_ff_reader.get_vals(doc, &mut bool_vals);
                assert_eq!(bool_vals.len(), 2);
                assert_ne!(bool_vals[0], bool_vals[1]);

                assert!(expected_ids_and_num_occurrences.contains_key(&vals[0]));
            }
        }

        // doc store tests
        for segment_reader in searcher.segment_readers().iter() {
            let store_reader = segment_reader
                .get_store_reader(DOCSTORE_CACHE_CAPACITY)
                .unwrap();
            // test store iterator
            for doc in store_reader.iter(segment_reader.alive_bitset()) {
                let id = doc.unwrap().get_first(id_field).unwrap().as_u64().unwrap();
                assert!(expected_ids_and_num_occurrences.contains_key(&id));
            }
            // test store random access
            for doc_id in segment_reader.doc_ids_alive() {
                let id = store_reader
                    .get(doc_id)
                    .unwrap()
                    .get_first(id_field)
                    .unwrap()
                    .as_u64()
                    .unwrap();
                assert!(expected_ids_and_num_occurrences.contains_key(&id));
                let id2 = store_reader
                    .get(doc_id)
                    .unwrap()
                    .get_first(multi_numbers)
                    .unwrap()
                    .as_u64()
                    .unwrap();
                assert_eq!(id, id2);
                let bool = store_reader
                    .get(doc_id)
                    .unwrap()
                    .get_first(bool_field)
                    .unwrap()
                    .as_bool()
                    .unwrap();
                let doc = store_reader.get(doc_id).unwrap();
                let mut bool2 = doc.get_all(multi_bools);
                assert_eq!(bool, bool2.next().unwrap().as_bool().unwrap());
                assert_ne!(bool, bool2.next().unwrap().as_bool().unwrap());
                assert_eq!(None, bool2.next())
            }
        }
        // test search
        let do_search = |term: &str, field| {
            let query = QueryParser::for_index(&index, vec![field])
                .parse_query(term)
                .unwrap();
            let top_docs: Vec<(f32, DocAddress)> =
                searcher.search(&query, &TopDocs::with_limit(1000)).unwrap();

            top_docs.iter().map(|el| el.1).collect::<Vec<_>>()
        };

        let do_search2 = |term: Term| {
            let query = TermQuery::new(term, IndexRecordOption::Basic);
            let top_docs: Vec<(f32, DocAddress)> =
                searcher.search(&query, &TopDocs::with_limit(1000)).unwrap();

            top_docs.iter().map(|el| el.1).collect::<Vec<_>>()
        };

        for (existing_id, count) in &expected_ids_and_num_occurrences {
            let (existing_id, count) = (*existing_id, *count);
            let get_num_hits = |field| do_search(&existing_id.to_string(), field).len() as u64;
            assert_eq!(get_num_hits(text_field), count);
            assert_eq!(get_num_hits(i64_field), count);
            assert_eq!(get_num_hits(f64_field), count);
            assert_eq!(get_num_hits(id_field), count);

            // Test multi text
            assert_eq!(
                do_search("\"test1 test2\"", multi_text_fields).len(),
                num_docs_expected
            );
            assert_eq!(
                do_search("\"test2 test3\"", multi_text_fields).len(),
                num_docs_expected
            );

            // Test bytes
            let term = Term::from_field_bytes(bytes_field, existing_id.to_le_bytes().as_slice());
            assert_eq!(do_search2(term).len() as u64, count);

            // Test date
            let term = Term::from_field_date(
                date_field,
                DateTime::from_timestamp_secs(existing_id as i64),
            );
            assert_eq!(do_search2(term).len() as u64, count);
        }
        for deleted_id in deleted_ids {
            let assert_field = |field| {
                assert_eq!(do_search(&deleted_id.to_string(), field).len() as u64, 0);
            };
            assert_field(text_field);
            assert_field(f64_field);
            assert_field(i64_field);
            assert_field(id_field);

            // Test bytes
            let term = Term::from_field_bytes(bytes_field, deleted_id.to_le_bytes().as_slice());
            assert_eq!(do_search2(term).len() as u64, 0);

            // Test date
            let term =
                Term::from_field_date(date_field, DateTime::from_timestamp_secs(deleted_id as i64));
            assert_eq!(do_search2(term).len() as u64, 0);
        }
        // search ip address
        //
        for (existing_id, count) in &expected_ids_and_num_occurrences {
            let (existing_id, count) = (*existing_id, *count);
            if !ip_exists(existing_id) {
                continue;
            }
            let do_search_ip_field = |term: &str| do_search(term, ip_field).len() as u64;
            let ip_addr = Ipv6Addr::from_u128(existing_id as u128);
            // Test incoming ip as ipv6
            assert_eq!(do_search_ip_field(&format!("\"{}\"", ip_addr)), count);

            let term = Term::from_field_ip_addr(ip_field, ip_addr);
            assert_eq!(do_search2(term).len() as u64, count);

            // Test incoming ip as ipv4
            if let Some(ip_addr) = ip_addr.to_ipv4_mapped() {
                assert_eq!(do_search_ip_field(&format!("\"{}\"", ip_addr)), count);
            }
        }

        // assert data is like expected
        //
        for (existing_id, count) in expected_ids_and_num_occurrences.iter().take(10) {
            let (existing_id, count) = (*existing_id, *count);
            if !ip_exists(existing_id) {
                continue;
            }
            let gen_query_inclusive = |field: &str, from: Ipv6Addr, to: Ipv6Addr| {
                format!("{}:[{} TO {}]", field, &from.to_string(), &to.to_string())
            };
            let ip = ip_from_id(existing_id);

            let do_search_ip_field = |term: &str| do_search(term, ip_field).len() as u64;
            // Range query on single value field
            // let query = gen_query_inclusive("ip", ip, ip);
            // assert_eq!(do_search_ip_field(&query), count);

            // Range query on multi value field
            let query = gen_query_inclusive("ips", ip, ip);
            assert_eq!(do_search_ip_field(&query), count);
        }

        // ip range query on fast field
        //
        for (existing_id, count) in expected_ids_and_num_occurrences.iter().take(10) {
            let (existing_id, count) = (*existing_id, *count);
            if !ip_exists(existing_id) {
                continue;
            }
            let gen_query_inclusive = |field: &str, from: Ipv6Addr, to: Ipv6Addr| {
                format!("{}:[{} TO {}]", field, &from.to_string(), &to.to_string())
            };
            let ip = ip_from_id(existing_id);

            let do_search_ip_field = |term: &str| do_search(term, ip_field).len() as u64;
            // Range query on single value field
            // let query = gen_query_inclusive("ip", ip, ip);
            // assert_eq!(do_search_ip_field(&query), count);

            // Range query on multi value field
            let query = gen_query_inclusive("ips", ip, ip);
            assert_eq!(do_search_ip_field(&query), count);
        }

        // test facets
        for segment_reader in searcher.segment_readers().iter() {
            let mut facet_reader = segment_reader.facet_reader(facet_field).unwrap();
            let ff_reader = segment_reader.fast_fields().u64(id_field).unwrap();
            for doc_id in segment_reader.doc_ids_alive() {
                let mut facet_ords = Vec::new();
                facet_reader.facet_ords(doc_id, &mut facet_ords);
                assert_eq!(facet_ords.len(), 1);
                let mut facet = Facet::default();
                facet_reader
                    .facet_from_ord(facet_ords[0], &mut facet)
                    .unwrap();
                let id = ff_reader.get_val(doc_id);
                let facet_expected = Facet::from(&("/cola/".to_string() + &id.to_string()));

                assert_eq!(facet, facet_expected);
            }
        }
        Ok(())
    }