in shed/futures_01_ext/src/streamfork.rs [152:203]
/// Drives the fork one step: pulls items from the input and routes each one
/// to `out2` when `pred` returns `true` for it, otherwise to `out1`.
///
/// Returns `Async::NotReady` whenever an output cannot accept more data, the
/// input has nothing available yet, or the final flush is still pending.
/// Once the input has terminated and `poll_complete_both` reports ready, the
/// call resolves with `take_result()` (input ended) or with the input's
/// error (input failed). Errors from the outputs, from `pred`, or from the
/// flush itself are propagated immediately via `?` / `try_ready!`.
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    // A recorded outcome means the input already terminated on an earlier
    // call (cleanly or with an error) while the flush was still pending.
    // Finish flushing first, then hand out the stored outcome.
    if self.finished.is_some() {
        try_ready!(self.poll_complete_both());
        let outcome = self.finished.take().expect("is_some() returned false");
        return match outcome {
            Ok(()) => Ok(Async::Ready(self.take_result())),
            Err(stored) => Err(stored),
        };
    }

    // Neither output may be refusing data before we read more input. Both
    // pushes are attempted even when the first one reports not-ready.
    let first_clear = self.out1.push()?.is_ready();
    let second_clear = self.out2.push()?.is_ready();
    if !first_clear || !second_clear {
        return Ok(Async::NotReady);
    }

    // Keep shovelling items until the input runs dry, fails, or an output
    // pushes back.
    loop {
        let polled = self.inp_mut().poll();
        match polled {
            Err(err) => {
                // Input failed: flush first, and only surface the error
                // once the flush is done. Otherwise stash it for a later call.
                let flushed = self.poll_complete_both()?.is_ready();
                if flushed {
                    return Err(err.into());
                }
                self.finished = Some(Err(err.into()));
                return Ok(Async::NotReady);
            }
            Ok(Async::NotReady) => {
                // Nothing to read right now; give both outputs a chance to
                // make progress before yielding.
                self.out1.poll_complete()?;
                self.out2.poll_complete()?;
                return Ok(Async::NotReady);
            }
            Ok(Async::Ready(None)) => {
                // Input is exhausted: resolve now if the flush is done,
                // otherwise remember the clean end and finish on a later call.
                let flushed = self.poll_complete_both()?.is_ready();
                if flushed {
                    return Ok(Async::Ready(self.take_result()));
                }
                self.finished = Some(Ok(()));
                return Ok(Async::NotReady);
            }
            Ok(Async::Ready(Some(item))) => {
                // The predicate picks the destination; a `true` verdict
                // sends the item to the second output.
                let goes_to_second = (self.pred)(&item)?;
                if goes_to_second {
                    try_ready!(self.out2.try_start_send(item))
                } else {
                    try_ready!(self.out1.try_start_send(item))
                }
            }
        }
    }
}