in neqo-transport/src/recv_stream.rs [1827:1967]
fn fc_state_recv_7() {
const CONNECTION_WINDOW: u64 = 1024;
const CONNECTION_WINDOW_US: usize = CONNECTION_WINDOW as usize;
const STREAM_WINDOW: u64 = CONNECTION_WINDOW / 2;
const STREAM_WINDOW_US: usize = STREAM_WINDOW as usize;
const WINDOW_UPDATE_FRACTION_US: usize = WINDOW_UPDATE_FRACTION as usize;
let fc = Rc::new(RefCell::new(ReceiverFlowControl::new(
(),
CONNECTION_WINDOW,
)));
let mut s = create_stream_with_fc(Rc::clone(&fc), STREAM_WINDOW);
check_fc(&fc.borrow(), 0, 0);
check_fc(s.fc().unwrap(), 0, 0);
// Receive data up to but not over the fc update trigger point.
s.inbound_stream_frame(false, 0, &[0; STREAM_WINDOW_US / WINDOW_UPDATE_FRACTION_US])
.unwrap();
let mut buf = [1; CONNECTION_WINDOW_US];
assert_eq!(
s.read(&mut buf).unwrap(),
(STREAM_WINDOW_US / WINDOW_UPDATE_FRACTION_US, false)
);
check_fc(
&fc.borrow(),
STREAM_WINDOW / WINDOW_UPDATE_FRACTION,
STREAM_WINDOW / WINDOW_UPDATE_FRACTION,
);
check_fc(
s.fc().unwrap(),
STREAM_WINDOW / WINDOW_UPDATE_FRACTION,
STREAM_WINDOW / WINDOW_UPDATE_FRACTION,
);
// Still no fc update needed.
assert!(!fc.borrow().frame_needed());
assert!(!s.fc().unwrap().frame_needed());
// Receive one more byte that will cause a fc update after it is read.
s.inbound_stream_frame(false, STREAM_WINDOW / WINDOW_UPDATE_FRACTION, &[0])
.unwrap();
check_fc(
&fc.borrow(),
STREAM_WINDOW / WINDOW_UPDATE_FRACTION + 1,
STREAM_WINDOW / WINDOW_UPDATE_FRACTION,
);
check_fc(
s.fc().unwrap(),
STREAM_WINDOW / WINDOW_UPDATE_FRACTION + 1,
STREAM_WINDOW / WINDOW_UPDATE_FRACTION,
);
// Only consuming data does not cause a fc update to be sent.
assert!(!fc.borrow().frame_needed());
assert!(!s.fc().unwrap().frame_needed());
assert_eq!(s.read(&mut buf).unwrap(), (1, false));
check_fc(
&fc.borrow(),
STREAM_WINDOW / WINDOW_UPDATE_FRACTION + 1,
STREAM_WINDOW / WINDOW_UPDATE_FRACTION + 1,
);
check_fc(
s.fc().unwrap(),
STREAM_WINDOW / WINDOW_UPDATE_FRACTION + 1,
STREAM_WINDOW / WINDOW_UPDATE_FRACTION + 1,
);
// Data are retired and the stream fc will send an update.
assert!(!fc.borrow().frame_needed());
assert!(s.fc().unwrap().frame_needed());
// Receive more data to increase fc further.
s.inbound_stream_frame(
false,
STREAM_WINDOW / WINDOW_UPDATE_FRACTION,
&[0; STREAM_WINDOW_US / WINDOW_UPDATE_FRACTION_US],
)
.unwrap();
assert_eq!(
s.read(&mut buf).unwrap(),
(STREAM_WINDOW_US / WINDOW_UPDATE_FRACTION_US - 1, false)
);
check_fc(
&fc.borrow(),
STREAM_WINDOW * 2 / WINDOW_UPDATE_FRACTION,
STREAM_WINDOW * 2 / WINDOW_UPDATE_FRACTION,
);
check_fc(
s.fc().unwrap(),
STREAM_WINDOW * 2 / WINDOW_UPDATE_FRACTION,
STREAM_WINDOW * 2 / WINDOW_UPDATE_FRACTION,
);
assert!(!fc.borrow().frame_needed());
assert!(s.fc().unwrap().frame_needed());
// Write the fc update frame
let mut builder = PacketBuilder::short(Encoder::new(), false, None::<&[u8]>);
let mut token = Vec::new();
let mut stats = FrameStats::default();
fc.borrow_mut()
.write_frames(&mut builder, &mut token, &mut stats);
assert_eq!(stats.max_data, 0);
s.write_frame(
&mut builder,
&mut token,
&mut stats,
Instant::now(),
Duration::from_millis(100),
);
assert_eq!(stats.max_stream_data, 1);
// Receive 1 byte that will cause a session fc update after it is read.
s.inbound_stream_frame(false, STREAM_WINDOW * 2 / WINDOW_UPDATE_FRACTION, &[0])
.unwrap();
assert_eq!(s.read(&mut buf).unwrap(), (1, false));
check_fc(
&fc.borrow(),
STREAM_WINDOW * 2 / WINDOW_UPDATE_FRACTION + 1,
STREAM_WINDOW * 2 / WINDOW_UPDATE_FRACTION + 1,
);
check_fc(
s.fc().unwrap(),
STREAM_WINDOW * 2 / WINDOW_UPDATE_FRACTION + 1,
STREAM_WINDOW * 2 / WINDOW_UPDATE_FRACTION + 1,
);
assert!(fc.borrow().frame_needed());
assert!(!s.fc().unwrap().frame_needed());
fc.borrow_mut()
.write_frames(&mut builder, &mut token, &mut stats);
assert_eq!(stats.max_data, 1);
s.write_frame(
&mut builder,
&mut token,
&mut stats,
Instant::now(),
Duration::from_millis(100),
);
assert_eq!(stats.max_stream_data, 1);
}