in compiler/crates/intern/src/atomic_arena.rs [784:848]
fn add_parallel_read() {
const N: u32 = 1_000_000;
let arena: Arc<AtomicArena<'_, String>> = Arc::new(AtomicArena::new());
// Make sure we don't just run the producer or all the
// consumers without interleaving them.
let progress = Arc::new((Mutex::new(0u32), Condvar::new()));
let mut consumers = Vec::new();
let len = Arc::new(AtomicU32::new(0));
for r in 0..10 {
// Consumers
let arena = arena.clone();
let progress = progress.clone();
let len = len.clone();
consumers.push(thread::spawn(move || {
const I: u32 = N * 3 / 2;
let mut rng = thread_rng();
let mut next_poke = 1500;
let mut next_seek = 1000;
let mut n_seen = 0;
for ii in 0..I {
let n = len.load(Ordering::Acquire);
if n > 0 {
// First reader always checks latest completed add.
let i = if r == 0 { n - 1 } else { rng.gen_range(0..n) };
let s = arena.get(mk_ref(i));
assert_eq!(s, &format!("{}", i));
if r == 0 {
// n should trail arena.len().
let l = arena.len();
assert!(n as usize <= l);
}
n_seen += 1;
}
if ii == next_poke {
let (lock, cvar) = &*progress;
let mut l = lock.lock();
while *l < next_seek {
cvar.wait(&mut l);
}
next_poke *= 10;
next_seek *= 10;
}
}
assert!(n_seen > 0);
}));
}
{
// Producer in main thread.
let mut next_poke = 1000;
for i in 0..N {
let r = arena.add(format!("{}", i));
assert_eq!(r.index(), i);
len.store(i + 1, Ordering::Release);
if i == next_poke {
let (lock, cvar) = &*progress;
*lock.lock() = i;
cvar.notify_all();
next_poke *= 10;
}
}
}
for c in consumers {
c.join().unwrap();
}
}