in compiler/crates/intern/src/atomic_arena.rs [851:938]
fn parallel_add_parallel_read() {
use std::sync::atomic::AtomicU32;
const N: u32 = 2_000_000;
const WRITERS: u32 = 10;
let arena: Arc<AtomicArena<'_, usize>> = Arc::new(AtomicArena::new());
let mut avail: Arc<Vec<AtomicU32>> = Arc::new(Vec::with_capacity(N as usize));
Arc::get_mut(&mut avail)
.unwrap()
.resize_with(N as usize, || AtomicU32::new(10 * N as u32));
// Make sure we don't just run the producer or all the
// consumers without interleaving them.
let progress = Arc::new((Mutex::new(0u32), Condvar::new()));
let mut producers = Vec::new();
let mut consumers = Vec::new();
for k in 0..WRITERS {
let arena = arena.clone();
let avail = avail.clone();
let progress = progress.clone();
producers.push(thread::spawn(move || {
let mut next_poke = 10;
for i in 0..N / WRITERS {
let n = i * WRITERS + k;
let id = arena.add(n as usize);
assert!(id.index() < N);
assert_eq!(
avail[id.index() as usize].load(Ordering::Acquire),
10 * N as u32
);
avail[id.index() as usize].store(n as u32, Ordering::Release);
if k == 0 && i == next_poke {
let (lock, cvar) = &*progress;
*lock.lock() = i;
next_poke *= 10;
cvar.notify_all();
}
}
}));
}
for _ in 0..10 {
const I: u32 = N * 3 / 2;
let arena = arena.clone();
let avail = avail.clone();
let progress = progress.clone();
consumers.push(thread::spawn(move || {
let mut rng = thread_rng();
let mut next_poke = 150;
let mut next_seek = 10;
let mut n_seen = 0;
for ii in 0..I {
let i = rng.gen_range(0..N);
let expect = avail[i as usize].load(Ordering::Acquire);
if expect < N {
let s = arena.get(mk_ref(i));
assert_eq!(*s, expect as usize);
n_seen += 1;
}
if ii == next_poke {
let (lock, cvar) = &*progress;
let mut l = lock.lock();
while *l < next_seek {
cvar.wait(&mut l);
}
next_poke *= 10;
next_seek *= 10;
if next_seek >= N {
next_seek = 0;
}
}
}
assert!(n_seen > 0);
}));
}
for p in producers {
p.join().unwrap();
}
assert_eq!(arena.len(), N as usize);
let mut fail: Vec<(u32, u32)> = Vec::new();
for i in 0..N {
let a = avail[i as usize].load(Ordering::Relaxed);
if a >= N {
fail.push((a, i as u32));
}
}
assert!(fail.is_empty(), "{:?}", fail);
for c in consumers {
c.join().unwrap();
}
}