in netbench/src/driver/thread.rs [68:181]
fn on_op<T: Trace, Ch: Checkpoints>(
&mut self,
op: &'a op::Connection,
trace: &mut T,
checkpoints: &mut Ch,
rates: &mut Rates,
now: Timestamp,
cx: &mut Context,
) {
trace.exec(now, op);
use op::{Connection::*, IterateValue};
match op {
Sleep { amount } => {
self.timer.sleep(now, *amount);
self.op = Some(Op::Sleep);
}
OpenBidirectionalStream { stream_id } => {
self.op = Some(Op::OpenBidirectionalStream { id: *stream_id });
}
OpenSendStream { stream_id } => {
self.op = Some(Op::OpenSendStream { id: *stream_id });
}
Send { stream_id, bytes } => {
self.op = Some(Op::Send {
id: *stream_id,
remaining: *bytes,
rate: rates.send.get(stream_id).cloned(),
});
}
SendFinish { stream_id } => {
self.op = Some(Op::SendFinish { id: *stream_id });
}
SendRate { stream_id, rate } => {
rates.send.insert(*stream_id, *rate);
}
Receive { stream_id, bytes } => {
self.op = Some(Op::Receive {
id: *stream_id,
remaining: *bytes,
rate: rates.receive.get(stream_id).cloned(),
});
}
ReceiveAll { stream_id } => {
self.op = Some(Op::ReceiveAll {
id: *stream_id,
rate: rates.receive.get(stream_id).cloned(),
});
}
ReceiveFinish { stream_id } => {
self.op = Some(Op::ReceiveFinish { id: *stream_id });
}
ReceiveRate { stream_id, rate } => {
rates.receive.insert(*stream_id, *rate);
}
Trace { trace_id } => {
trace.trace(now, *trace_id);
}
Profile {
trace_id,
operations,
} => {
self.op = Some(Op::Profile {
start: now,
trace_id: *trace_id,
thread: Box::new(Thread::new(operations, self.owner)),
});
}
Iterate {
value,
operations,
trace_id,
} => {
let start = now;
let trace_id = *trace_id;
let thread = Box::new(Thread::new(operations, self.owner));
self.op = Some(match value {
IterateValue::Count { amount } => Op::Iterate {
start,
count: *amount,
thread,
trace_id,
},
IterateValue::Time { amount } => {
self.timer.sleep(now, *amount);
Op::IterateFor {
start,
thread,
trace_id,
}
}
});
}
Park { checkpoint } => {
trace.park(now, *checkpoint);
self.op = Some(Op::Wait {
checkpoint: *checkpoint,
});
}
Unpark { checkpoint } => {
// notify the checkpoint that it can make progress
checkpoints.unpark(*checkpoint, cx);
}
Scope { threads } => {
if !threads.is_empty() {
let threads = threads
.iter()
.map(|thread| Thread::new(thread, self.owner))
.collect();
self.op = Some(Op::Scope { threads });
}
}
}
}