in netbench/src/multiplex.rs [221:285]
fn dispatch_frame(&mut self, cx: &mut Context) -> Poll<Result<()>> {
use frame::Frame::*;
match self.frame.take() {
Some(StreamOpen { id, bidirectional }) => {
// TODO make sure the peer hasn't opened too many
let mut stream = Stream {
rx: Some(ReceiveStream::new(self.config.stream_window)),
tx: None,
};
if bidirectional {
stream.tx = Some(SendStream::new(self.config.peer_max_stream_data));
};
self.streams[Owner::Remote].insert(id, stream);
self.pending_accept.push_back(id);
cx.waker().wake_by_ref();
}
Some(StreamData {
id,
owner,
mut data,
}) => {
if let Some(rx) = self.streams[owner]
.get_mut(&id)
.and_then(|stream| stream.rx.as_mut())
{
let len = data.len() as u64;
let len = rx.buffer(len)?;
data.advance(len as _);
if !data.is_empty() {
self.frame = Some(StreamData { id, owner, data });
return Poll::Pending;
}
}
}
Some(MaxStreams { up_to }) => {
self.stream_controller.max_streams(up_to);
}
Some(MaxStreamData { id, owner, up_to }) => {
if let Some(tx) = self.streams[owner]
.get_mut(&id)
.and_then(|stream| stream.tx.as_mut())
{
tx.max_data(up_to);
}
}
Some(StreamFinish { id, owner }) => {
if let Some(rx) = self.streams[owner]
.get_mut(&id)
.ok_or("invalid stream")?
.rx
.as_mut()
{
rx.finish();
}
}
Some(InitialMaxStreamData { up_to }) => {
self.config.peer_max_stream_data = up_to;
}
None => return Poll::Pending,
}
Ok(()).into()
}