fn poll_op()

in netbench/src/driver/thread.rs [184:321]


    fn poll_op<C: Connection, T: Trace, Ch: Checkpoints>(
        &mut self,
        conn: &mut C,
        trace: &mut T,
        checkpoints: &mut Ch,
        rates: &mut Rates,
        now: Timestamp,
        cx: &mut Context,
    ) -> Poll<Result<()>> {
        let owner = self.owner;
        match self.op.as_mut().unwrap() {
            Op::Sleep => {
                ready!(self.timer.poll(now));
            }
            Op::OpenBidirectionalStream { id } => {
                ready!(conn.poll_open_bidirectional_stream(*id, cx))?;
                trace.open(now, *id);
            }
            Op::OpenSendStream { id } => {
                ready!(conn.poll_open_send_stream(*id, cx))?;
                trace.open(now, *id);
            }
            Op::Send {
                id,
                remaining,
                rate,
            } => {
                return self.timer.transfer(remaining, rate, now, cx, |bytes, cx| {
                    let amount = ready!(conn.poll_send(owner, *id, *bytes, cx))?;
                    trace.send(now, *id, amount);
                    Ok(amount).into()
                })
            }
            Op::SendFinish { id } => {
                ready!(conn.poll_send_finish(owner, *id, cx))?;
                trace.send_finish(now, *id);
            }
            Op::Receive {
                id,
                remaining,
                rate,
            } => {
                return self.timer.transfer(remaining, rate, now, cx, |bytes, cx| {
                    let amount = ready!(conn.poll_receive(owner, *id, *bytes, cx))?;
                    trace.receive(now, *id, amount);
                    Ok(amount).into()
                })
            }
            Op::ReceiveAll { id, rate } => {
                let mut remaining = Byte::MAX;
                return self
                    .timer
                    .transfer(&mut remaining, rate, now, cx, |bytes, cx| {
                        let amount = ready!(conn.poll_receive(owner, *id, *bytes, cx))?;
                        trace.receive(now, *id, amount);
                        Ok(amount).into()
                    });
            }
            Op::ReceiveFinish { id } => {
                ready!(conn.poll_receive_finish(owner, *id, cx))?;
                trace.receive_finish(now, *id);
            }
            Op::Wait { checkpoint } => {
                ready!(checkpoints.park(*checkpoint));
                trace.unpark(now, *checkpoint);
            }
            Op::Iterate {
                start,
                count,
                thread,
                trace_id,
            } => {
                ready!(thread.poll(conn, trace, checkpoints, rates, now, cx))?;

                if let Some(trace_id) = trace_id {
                    let time = now.saturating_duration_since(*start);
                    trace.profile(now, *trace_id, time);
                    *start = now;
                }

                if let Some(next) = count.checked_sub(1) {
                    if next > 0 {
                        *count = next;
                        thread.reset(cx);
                        return Poll::Pending;
                    }
                }
            }
            Op::IterateFor {
                start,
                thread,
                trace_id,
            } => {
                if let Poll::Ready(res) = thread.poll(conn, trace, checkpoints, rates, now, cx) {
                    res?;
                    thread.reset(cx);

                    if let Some(trace_id) = trace_id {
                        let time = now.saturating_duration_since(*start);
                        trace.profile(now, *trace_id, time);
                        *start = now;
                    }
                }

                ready!(self.timer.poll(now));
            }
            Op::Profile {
                trace_id,
                start,
                thread,
            } => {
                ready!(thread.poll(conn, trace, checkpoints, rates, now, cx))?;
                let time = now.saturating_duration_since(*start);
                trace.profile(now, *trace_id, time);
            }
            Op::Scope { threads } => {
                let mut all_ready = true;
                let op_idx = self.index;
                for (idx, thread) in threads.iter_mut().enumerate() {
                    trace.enter(now, op_idx as _, idx);
                    let result = thread.poll(conn, trace, checkpoints, rates, now, cx);
                    trace.exit(now);
                    match result {
                        Poll::Ready(Ok(_)) => {}
                        Poll::Ready(Err(err)) => return Err(err).into(),
                        Poll::Pending => all_ready = false,
                    }
                }
                if !all_ready {
                    return Poll::Pending;
                }
            }
        }

        // clear the timer for the next operation
        self.timer.cancel();
        Ok(()).into()
    }