in netbench/src/client/thread.rs [124:175]
fn poll_op<T: Trace, Ch: Checkpoints>(
&mut self,
client: &mut C,
addresses: &AddressMap,
trace: &mut T,
checkpoints: &mut Ch,
now: Timestamp,
cx: &mut Context,
) -> Poll<Result<()>> {
match self.op.as_mut().unwrap() {
Op::Sleep => {
ready!(self.timer.poll(now));
}
Op::Connect { connect, id, start } => {
let connect = core::pin::Pin::new(connect);
let connection = ready!(connect.poll(cx))?;
let time = now - *start;
trace.enter_connection(connection.id());
trace.connect(now, *id, time);
self.op = Some(Op::Connection { connection });
return self.poll_op(client, addresses, trace, checkpoints, now, cx);
}
Op::Connection { connection } => {
ready!(connection.poll(trace, checkpoints, now, cx))?;
}
Op::Wait { checkpoint } => {
ready!(checkpoints.park(*checkpoint));
trace.unpark(now, *checkpoint);
}
Op::Scope { threads } => {
let mut all_ready = true;
let op_idx = self.index;
for (idx, thread) in threads.iter_mut().enumerate() {
trace.enter(now, op_idx as _, idx);
let result = thread.poll(client, addresses, trace, checkpoints, now, cx);
trace.exit(now);
match result {
Poll::Ready(Ok(_)) => {}
Poll::Ready(Err(err)) => return Err(err).into(),
Poll::Pending => all_ready = false,
}
}
if !all_ready {
return Poll::Pending;
}
}
}
// clear the timer for the next operation
self.timer.cancel();
Ok(()).into()
}