fn parallel_add_parallel_read()

in compiler/crates/intern/src/atomic_arena.rs [851:938]


    fn parallel_add_parallel_read() {
        use std::sync::atomic::AtomicU32;
        const N: u32 = 2_000_000;
        const WRITERS: u32 = 10;
        let arena: Arc<AtomicArena<'_, usize>> = Arc::new(AtomicArena::new());
        let mut avail: Arc<Vec<AtomicU32>> = Arc::new(Vec::with_capacity(N as usize));
        Arc::get_mut(&mut avail)
            .unwrap()
            .resize_with(N as usize, || AtomicU32::new(10 * N as u32));
        // Make sure we don't just run the producer or all the
        // consumers without interleaving them.
        let progress = Arc::new((Mutex::new(0u32), Condvar::new()));
        let mut producers = Vec::new();
        let mut consumers = Vec::new();
        for k in 0..WRITERS {
            let arena = arena.clone();
            let avail = avail.clone();
            let progress = progress.clone();
            producers.push(thread::spawn(move || {
                let mut next_poke = 10;
                for i in 0..N / WRITERS {
                    let n = i * WRITERS + k;
                    let id = arena.add(n as usize);
                    assert!(id.index() < N);
                    assert_eq!(
                        avail[id.index() as usize].load(Ordering::Acquire),
                        10 * N as u32
                    );
                    avail[id.index() as usize].store(n as u32, Ordering::Release);
                    if k == 0 && i == next_poke {
                        let (lock, cvar) = &*progress;
                        *lock.lock() = i;
                        next_poke *= 10;
                        cvar.notify_all();
                    }
                }
            }));
        }
        for _ in 0..10 {
            const I: u32 = N * 3 / 2;
            let arena = arena.clone();
            let avail = avail.clone();
            let progress = progress.clone();
            consumers.push(thread::spawn(move || {
                let mut rng = thread_rng();
                let mut next_poke = 150;
                let mut next_seek = 10;
                let mut n_seen = 0;
                for ii in 0..I {
                    let i = rng.gen_range(0..N);
                    let expect = avail[i as usize].load(Ordering::Acquire);
                    if expect < N {
                        let s = arena.get(mk_ref(i));
                        assert_eq!(*s, expect as usize);
                        n_seen += 1;
                    }
                    if ii == next_poke {
                        let (lock, cvar) = &*progress;
                        let mut l = lock.lock();
                        while *l < next_seek {
                            cvar.wait(&mut l);
                        }
                        next_poke *= 10;
                        next_seek *= 10;
                        if next_seek >= N {
                            next_seek = 0;
                        }
                    }
                }
                assert!(n_seen > 0);
            }));
        }
        for p in producers {
            p.join().unwrap();
        }
        assert_eq!(arena.len(), N as usize);
        let mut fail: Vec<(u32, u32)> = Vec::new();
        for i in 0..N {
            let a = avail[i as usize].load(Ordering::Relaxed);
            if a >= N {
                fail.push((a, i as u32));
            }
        }
        assert!(fail.is_empty(), "{:?}", fail);
        for c in consumers {
            c.join().unwrap();
        }
    }