in quic/s2n-quic-transport/src/stream/manager.rs [256:334]
/// Makes sure the stream referenced by `stream_id` has been opened, creating
/// peer-initiated streams on demand.
///
/// For a peer-initiated `stream_id` that has not been seen before, every
/// stream yielded by `StreamIter::new(next_unopened_id, stream_id)` is
/// inserted, the next expected Stream ID for that initiator and stream type
/// is advanced, and a pending accept waker (if any) is woken.
///
/// # Errors
///
/// * `NO_ERROR` ("Connection was closed") if a new peer-initiated stream
///   would have to be created while `close_reason` is set.
/// * Whatever error `stream_controller.on_open_remote_stream` returns when it
///   rejects the set of streams to open.
/// * `STREAM_STATE_ERROR` ("Stream was not yet opened") if `stream_id` is
///   locally initiated but has not been opened by this endpoint yet.
fn open_stream_if_necessary(&mut self, stream_id: StreamId) -> Result<(), transport::Error> {
    // Fetch the lowest Stream ID that has not been opened yet for the
    // initiator and stream type of the referenced stream.
    let next_unopened_id: StreamId = match *self
        .next_stream_ids
        .get_mut(stream_id.initiator(), stream_id.stream_type())
    {
        Some(id) => id,
        // Every Stream for this initiator and stream type has already been
        // opened. There is nothing left to open, and the referenced
        // Stream ID cannot be higher than one that was already used.
        None => return Ok(()),
    };

    if stream_id.initiator() == self.local_endpoint_type {
        // The peer references a locally initiated Stream. If its ID is one
        // we have not used yet, the peer is referring to a Stream that
        // does not exist; this is not accepted and is reported as an
        // error so that the connection gets closed.
        if stream_id >= next_unopened_id {
            return Err(
                transport::Error::STREAM_STATE_ERROR.with_reason("Stream was not yet opened")
            );
        }

        return Ok(());
    }

    if stream_id >= next_unopened_id {
        // First time this peer-initiated Stream ID is referenced, so new
        // Stream instances are required. Refuse to create them once the
        // connection has a close reason.
        if self.close_reason.is_some() {
            return Err(transport::Error::NO_ERROR.with_reason("Connection was closed"));
        }

        //= https://www.rfc-editor.org/rfc/rfc9000#section-4.6
        //# Endpoints MUST NOT exceed the limit set by their peer. An endpoint
        //# that receives a frame with a stream ID exceeding the limit it has
        //# sent MUST treat this as a connection error of type
        //# STREAM_LIMIT_ERROR; see Section 11 for details on error handling.
        let stream_iter = StreamIter::new(next_unopened_id, stream_id);

        // Let the stream controller verify that all of these streams may
        // be opened before any of them is created.
        self.stream_controller.on_open_remote_stream(stream_iter)?;

        // Streams with a lower Stream ID have to be created as well:
        //
        //= https://www.rfc-editor.org/rfc/rfc9000#section-3.2
        //# Before a stream is created, all streams of the same type with lower-
        //# numbered stream IDs MUST be created. This ensures that the creation
        //# order for streams is consistent on both endpoints.
        for new_stream_id in stream_iter {
            self.insert_stream(new_stream_id);
        }

        //= https://www.rfc-editor.org/rfc/rfc9000#section-2.1
        //# A QUIC
        //# endpoint MUST NOT reuse a stream ID within a connection.
        // Advance the next expected Stream ID past the one just opened.
        // This may exhaust the Stream ID range, in which case no further
        // Stream of this type can be accepted.
        let following_id = stream_id.next_of_type();
        *self
            .next_stream_ids
            .get_mut(stream_id.initiator(), stream_id.stream_type()) = following_id;

        // Notify the application in case it is waiting for new incoming
        // Streams of this type.
        let pending_waker = self.accept_state.waker_mut(stream_id.stream_type()).take();
        if let Some(waker) = pending_waker {
            waker.wake();
        }
    }

    Ok(())
}