in dc/s2n-quic-dc/src/stream/send/flow/blocking.rs [153:252]
fn concurrent_flow() {
let mut initial_offset = VarInt::from_u8(255);
let expected_len = VarInt::from_u16(u16::MAX);
let state = State::new(initial_offset);
let path_info = path::Info {
max_datagram_size: 1500,
send_quantum: 10,
ecn: Default::default(),
next_expected_control_packet: Default::default(),
};
let total = AtomicU64::new(0);
let workers = 5;
let worker_counts = Vec::from_iter((0..workers).map(|_| AtomicU64::new(0)));
let features = TransportFeatures::UDP;
thread::scope(|s| {
let total = &total;
let path_info = &path_info;
let state = &state;
for (idx, count) in worker_counts.iter().enumerate() {
s.spawn(move || {
thread::sleep(core::time::Duration::from_millis(10));
let mut buffer_len = 1;
let mut is_fin = false;
let max_segments = 10;
let max_header_len = 50;
let mut max_offset = VarInt::ZERO;
loop {
let mut request = flow::Request {
len: buffer_len,
initial_len: buffer_len,
is_fin,
};
request.clamp(path_info.max_flow_credits(max_header_len, max_segments));
let Ok(credits) = state.acquire(request, &features) else {
break;
};
eprintln!(
"thread={idx} offset={}..{}",
credits.offset,
credits.offset + credits.len
);
buffer_len += 1;
buffer_len = buffer_len.min(
expected_len
.as_u64()
.saturating_sub(credits.offset.as_u64())
.saturating_sub(credits.len as u64)
as usize,
);
assert!(max_offset <= credits.offset);
max_offset = credits.offset;
if buffer_len == 0 {
is_fin = true;
}
total.fetch_add(credits.len as _, Ordering::Relaxed);
count.fetch_add(credits.len as _, Ordering::Relaxed);
}
});
}
s.spawn(|| {
let mut credits = 10;
while initial_offset < expected_len {
thread::sleep(core::time::Duration::from_millis(1));
initial_offset = (initial_offset + credits).min(expected_len);
credits += 1;
let _ = state.release(initial_offset);
}
});
});
assert_eq!(total.load(Ordering::Relaxed), expected_len.as_u64());
let mut at_least_one_write = true;
for (idx, count) in worker_counts.into_iter().enumerate() {
let count = count.load(Ordering::Relaxed);
eprintln!("thread={idx}, count={}", count);
if count == 0 {
at_least_one_write = false;
}
}
let _ = at_least_one_write;
// TODO the Mutex mechanism doesn't fairly distribute between workers so don't make this
// assertion until we can do something more reliable
/*
assert!(
at_least_one_write,
"all workers need to write at least one byte"
);
*/
}