fn concurrent_flow()

in dc/s2n-quic-dc/src/stream/send/flow/blocking.rs [153:252]
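
This test spawns several worker threads that race to acquire flow credits from a shared blocking `State`, while a separate thread gradually releases additional credits, and then verifies that exactly `u16::MAX` bytes were handed out in total.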


    fn concurrent_flow() {
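        // The shared flow state starts with 255 bytes of credit; workers keep acquiring until
        // u16::MAX bytes have been handed out in total.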
        let mut initial_offset = VarInt::from_u8(255);
        let expected_len = VarInt::from_u16(u16::MAX);
        let state = State::new(initial_offset);
        let path_info = path::Info {
            max_datagram_size: 1500,
            send_quantum: 10,
            ecn: Default::default(),
            next_expected_control_packet: Default::default(),
        };
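        // Aggregate and per-worker byte counters, used at the end to verify that every granted
        // credit is accounted for.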
        let total = AtomicU64::new(0);
        let workers = 5;
        let worker_counts = Vec::from_iter((0..workers).map(|_| AtomicU64::new(0)));
        let features = TransportFeatures::UDP;

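        // All threads run inside a scope so they can borrow the shared state, counters, and
        // path info directly.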
        thread::scope(|s| {
            let total = &total;
            let path_info = &path_info;
            let state = &state;

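            // Each worker repeatedly requests credits with a growing buffer length until the
            // stream is finished.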
            for (idx, count) in worker_counts.iter().enumerate() {
                s.spawn(move || {
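                    // Brief startup pause before this worker begins acquiring credits.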
                    thread::sleep(core::time::Duration::from_millis(10));

                    let mut buffer_len = 1;
                    let mut is_fin = false;
                    let max_segments = 10;
                    let max_header_len = 50;
                    let mut max_offset = VarInt::ZERO;

                    loop {
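                        // Build a credit request for the current buffer and clamp it to what
                        // the path allows.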
                        let mut request = flow::Request {
                            len: buffer_len,
                            initial_len: buffer_len,
                            is_fin,
                        };
                        request.clamp(path_info.max_flow_credits(max_header_len, max_segments));

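                        // Wait for credits to be granted; an error ends this worker's loop.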
                        let Ok(credits) = state.acquire(request, &features) else {
                            break;
                        };

                        eprintln!(
                            "thread={idx} offset={}..{}",
                            credits.offset,
                            credits.offset + credits.len
                        );
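                        // Grow the next request, but never past the bytes remaining before
                        // expected_len.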
                        buffer_len += 1;
                        buffer_len = buffer_len.min(
                            expected_len
                                .as_u64()
                                .saturating_sub(credits.offset.as_u64())
                                .saturating_sub(credits.len as u64)
                                as usize,
                        );

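                        // Offsets granted to this worker must never move backwards.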
                        assert!(max_offset <= credits.offset);
                        max_offset = credits.offset;

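                        // Once nothing remains, the next request carries the fin flag.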
                        if buffer_len == 0 {
                            is_fin = true;
                        }
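                        // Account for the granted bytes both globally and per worker.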
                        total.fetch_add(credits.len as _, Ordering::Relaxed);
                        count.fetch_add(credits.len as _, Ordering::Relaxed);
                    }
                });
            }

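            // A dedicated thread drip-feeds additional credits until the full expected_len has
            // been released.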
            s.spawn(|| {
                let mut credits = 10;
                while initial_offset < expected_len {
                    thread::sleep(core::time::Duration::from_millis(1));
                    initial_offset = (initial_offset + credits).min(expected_len);
                    credits += 1;
                    let _ = state.release(initial_offset);
                }
            });
        });

        assert_eq!(total.load(Ordering::Relaxed), expected_len.as_u64());
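        // Despite its name, this flag tracks whether *every* worker acquired at least one byte
        // (see the disabled assertion below).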
        let mut at_least_one_write = true;
        for (idx, count) in worker_counts.into_iter().enumerate() {
            let count = count.load(Ordering::Relaxed);
            eprintln!("thread={idx}, count={}", count);
            if count == 0 {
                at_least_one_write = false;
            }
        }

        let _ = at_least_one_write;

        // TODO: the Mutex mechanism doesn't distribute credits fairly between workers, so don't
        // make this assertion until we have something more reliable.
        /*
        assert!(
            at_least_one_write,
            "all workers need to write at least one byte"
        );
        */
    }
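
For reference, here is a minimal single-threaded sketch of the same request/clamp/acquire/release cycle without the concurrency. It assumes it sits in the same test module as `concurrent_flow` (so the same imports are in scope) and only reuses calls that appear in the test above; the function name and the literal values are illustrative, not part of the crate.

    fn single_flow_sketch() {
        // Start with 100 bytes of credit already available so the blocking acquire below
        // returns immediately.
        let state = State::new(VarInt::from_u8(100));
        let path_info = path::Info {
            max_datagram_size: 1500,
            send_quantum: 10,
            ecn: Default::default(),
            next_expected_control_packet: Default::default(),
        };
        let features = TransportFeatures::UDP;

        // Ask for 50 bytes, clamped to what the path allows for this header size and
        // segment count.
        let mut request = flow::Request {
            len: 50,
            initial_len: 50,
            is_fin: false,
        };
        request.clamp(path_info.max_flow_credits(50, 10));

        // The request fits inside the initial credit window, so this acquisition succeeds
        // without waiting on another thread.
        if let Ok(credits) = state.acquire(request, &features) {
            eprintln!("granted {}..{}", credits.offset, credits.offset + credits.len);
        }

        // Make more credits available for any later acquisitions.
        let _ = state.release(VarInt::from_u8(200));
    }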