in netbench/src/driver/thread.rs [184:321]
fn poll_op<C: Connection, T: Trace, Ch: Checkpoints>(
&mut self,
conn: &mut C,
trace: &mut T,
checkpoints: &mut Ch,
rates: &mut Rates,
now: Timestamp,
cx: &mut Context,
) -> Poll<Result<()>> {
let owner = self.owner;
match self.op.as_mut().unwrap() {
Op::Sleep => {
ready!(self.timer.poll(now));
}
Op::OpenBidirectionalStream { id } => {
ready!(conn.poll_open_bidirectional_stream(*id, cx))?;
trace.open(now, *id);
}
Op::OpenSendStream { id } => {
ready!(conn.poll_open_send_stream(*id, cx))?;
trace.open(now, *id);
}
Op::Send {
id,
remaining,
rate,
} => {
return self.timer.transfer(remaining, rate, now, cx, |bytes, cx| {
let amount = ready!(conn.poll_send(owner, *id, *bytes, cx))?;
trace.send(now, *id, amount);
Ok(amount).into()
})
}
Op::SendFinish { id } => {
ready!(conn.poll_send_finish(owner, *id, cx))?;
trace.send_finish(now, *id);
}
Op::Receive {
id,
remaining,
rate,
} => {
return self.timer.transfer(remaining, rate, now, cx, |bytes, cx| {
let amount = ready!(conn.poll_receive(owner, *id, *bytes, cx))?;
trace.receive(now, *id, amount);
Ok(amount).into()
})
}
Op::ReceiveAll { id, rate } => {
let mut remaining = Byte::MAX;
return self
.timer
.transfer(&mut remaining, rate, now, cx, |bytes, cx| {
let amount = ready!(conn.poll_receive(owner, *id, *bytes, cx))?;
trace.receive(now, *id, amount);
Ok(amount).into()
});
}
Op::ReceiveFinish { id } => {
ready!(conn.poll_receive_finish(owner, *id, cx))?;
trace.receive_finish(now, *id);
}
Op::Wait { checkpoint } => {
ready!(checkpoints.park(*checkpoint));
trace.unpark(now, *checkpoint);
}
Op::Iterate {
start,
count,
thread,
trace_id,
} => {
ready!(thread.poll(conn, trace, checkpoints, rates, now, cx))?;
if let Some(trace_id) = trace_id {
let time = now.saturating_duration_since(*start);
trace.profile(now, *trace_id, time);
*start = now;
}
if let Some(next) = count.checked_sub(1) {
if next > 0 {
*count = next;
thread.reset(cx);
return Poll::Pending;
}
}
}
Op::IterateFor {
start,
thread,
trace_id,
} => {
if let Poll::Ready(res) = thread.poll(conn, trace, checkpoints, rates, now, cx) {
res?;
thread.reset(cx);
if let Some(trace_id) = trace_id {
let time = now.saturating_duration_since(*start);
trace.profile(now, *trace_id, time);
*start = now;
}
}
ready!(self.timer.poll(now));
}
Op::Profile {
trace_id,
start,
thread,
} => {
ready!(thread.poll(conn, trace, checkpoints, rates, now, cx))?;
let time = now.saturating_duration_since(*start);
trace.profile(now, *trace_id, time);
}
Op::Scope { threads } => {
let mut all_ready = true;
let op_idx = self.index;
for (idx, thread) in threads.iter_mut().enumerate() {
trace.enter(now, op_idx as _, idx);
let result = thread.poll(conn, trace, checkpoints, rates, now, cx);
trace.exit(now);
match result {
Poll::Ready(Ok(_)) => {}
Poll::Ready(Err(err)) => return Err(err).into(),
Poll::Pending => all_ready = false,
}
}
if !all_ready {
return Poll::Pending;
}
}
}
// clear the timer for the next operation
self.timer.cancel();
Ok(()).into()
}