fn poll()

in shed/futures_01_ext/src/streamfork.rs [152:203]


    /// Pumps items from the input stream into one of the two outputs, picking
    /// the output per item with `self.pred`.
    ///
    /// Items for which the predicate returns `true` are handed to `out2`, all
    /// others to `out1`. A predicate error is propagated immediately via `?`.
    ///
    /// Once the input has ended (cleanly or with an error) and both outputs
    /// report completion from `poll_complete_both`, the call resolves: with
    /// `self.take_result()` after a clean end, or with the input's error. If
    /// the outputs are not yet complete, the outcome is parked in
    /// `self.finished` and `NotReady` is returned; later calls only finish
    /// flushing and then deliver the parked outcome.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // The input already terminated on an earlier call; what is left is to
        // flush data that was fetched before that, then report the outcome.
        if self.finished.is_some() {
            try_ready!(self.poll_complete_both());

            let outcome = self.finished.take().expect("is_some() returned false");
            return match outcome {
                Ok(()) => Ok(Async::Ready(self.take_result())),
                Err(err) => Err(err),
            };
        }

        // Both outputs must be able to accept new data before more input is
        // read. Both are pushed unconditionally so neither one is skipped.
        let out1_ready = self.out1.push()?.is_ready();
        let out2_ready = self.out2.push()?.is_ready();
        if !out1_ready || !out2_ready {
            return Ok(Async::NotReady);
        }

        // Keep moving items across until the input stalls or terminates, or an
        // output refuses an item.
        loop {
            let item = match self.inp_mut().poll() {
                Ok(Async::Ready(Some(item))) => item,
                Ok(Async::NotReady) => {
                    // Nothing to read right now: give both outputs a chance to
                    // make progress on what they already hold.
                    self.out1.poll_complete()?;
                    self.out2.poll_complete()?;
                    return Ok(Async::NotReady);
                }
                Ok(Async::Ready(None)) => {
                    // Input exhausted: finish immediately if the outputs are
                    // already complete, otherwise remember the clean ending.
                    if self.poll_complete_both()?.is_ready() {
                        return Ok(Async::Ready(self.take_result()));
                    }
                    self.finished = Some(Ok(()));
                    return Ok(Async::NotReady);
                }
                Err(err) => {
                    // Input failed. The flush is attempted first; note that a
                    // flush failure is returned by `?` in place of `err`.
                    if self.poll_complete_both()?.is_ready() {
                        return Err(err.into());
                    }
                    self.finished = Some(Err(err.into()));
                    return Ok(Async::NotReady);
                }
            };

            // Route the item: predicate `true` selects `out2`, `false` `out1`.
            if (self.pred)(&item)? {
                try_ready!(self.out2.try_start_send(item))
            } else {
                try_ready!(self.out1.try_start_send(item))
            }
        }
    }