in src/devices/src/virtio/net/device.rs [518:631]
    fn process_tx(&mut self) -> result::Result<(), DeviceError> {
        // This is safe since we checked in the event handler that the device is activated.
        let mem = self.device_state.mem().unwrap();

        // The MMDS network stack works like a state machine, based on synchronous calls, and
        // without being added to any event loop. If any frame is accepted by the MMDS, we also
        // trigger a process_rx() which checks if there are any new frames to be sent, starting
        // with the MMDS network stack.
        let mut process_rx_for_mmds = false;
        let mut raise_irq = false;
        let tx_queue = &mut self.queues[TX_INDEX];
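
        // Each iteration pops one descriptor chain from the avail ring; for virtio-net,
        // one chain describes a single outgoing frame.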
        while let Some(head) = tx_queue.pop(mem) {
            // If limiter.consume() fails it means there is no more TokenType::Ops
            // budget and rate limiting is in effect.
            if !self.tx_rate_limiter.consume(1, TokenType::Ops) {
                // Stop processing the queue and return this descriptor chain to the
                // avail ring, for later processing.
                tx_queue.undo_pop();
                METRICS.net.tx_rate_limiter_throttled.inc();
                break;
            }

            let head_index = head.index;
            let mut read_count = 0;
            let mut next_desc = Some(head);

            self.tx_iovec.clear();
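            // Walk the descriptor chain and collect (guest address, length) pairs.
            // TX buffers must be read-only for the device; a write-only descriptor
            // means the chain is malformed, so the iovec is cleared and nothing is
            // transmitted for this chain.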
            while let Some(desc) = next_desc {
                if desc.is_write_only() {
                    self.tx_iovec.clear();
                    break;
                }
                self.tx_iovec.push((desc.addr, desc.len as usize));
                read_count += desc.len as usize;
                next_desc = desc.next_descriptor();
            }
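
            // read_count now holds the total chain length in bytes; that is the
            // amount charged against the byte budget below.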
            // If limiter.consume() fails it means there is no more TokenType::Bytes
            // budget and rate limiting is in effect.
            if !self
                .tx_rate_limiter
                .consume(read_count as u64, TokenType::Bytes)
            {
                // Revert the OPS consume().
                self.tx_rate_limiter.manual_replenish(1, TokenType::Ops);
                // Stop processing the queue and return this descriptor chain to the
                // avail ring, for later processing.
                tx_queue.undo_pop();
                METRICS.net.tx_rate_limiter_throttled.inc();
                break;
            }
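
            // Reuse read_count as the write cursor into tx_frame_buf while copying.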
            read_count = 0;
            // Copy the frame out of guest memory, potentially spread across multiple
            // descriptors, into the contiguous intermediate buffer.
            // TODO(performance - Issue #420): change this to use `writev()` instead of `write()`
            // and get rid of the intermediate buffer.
            for (desc_addr, desc_len) in self.tx_iovec.drain(..) {
                let limit = cmp::min(read_count + desc_len, self.tx_frame_buf.len());

                let read_result =
                    mem.read_slice(&mut self.tx_frame_buf[read_count..limit], desc_addr);
                match read_result {
                    Ok(()) => {
                        read_count = limit;
                        METRICS.net.tx_count.inc();
                    }
                    Err(e) => {
                        error!("Failed to read slice: {:?}", e);
                        match e {
                            GuestMemoryError::PartialBuffer { .. } => &METRICS.net.tx_partial_reads,
                            _ => &METRICS.net.tx_fails,
                        }
                        .inc();
                        read_count = 0;
                        break;
                    }
                }
            }
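
            // Route the assembled frame: if the MMDS network stack is enabled and the
            // frame is addressed to it, the frame is consumed there; otherwise it is
            // written out to the tap device.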
            let frame_consumed_by_mmds = Self::write_to_mmds_or_tap(
                self.mmds_ns.as_mut(),
                &mut self.tx_rate_limiter,
                &self.tx_frame_buf[..read_count],
                &mut self.tap,
                self.guest_mac,
            )
            .unwrap_or(false);
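            // A write error counts as "not consumed by MMDS"; the descriptor chain is
            // still returned to the guest below.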
            if frame_consumed_by_mmds && !self.rx_deferred_frame {
                // MMDS consumed this frame/request, let's also try to process the response.
                process_rx_for_mmds = true;
            }
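
            // Mark the chain as used with length 0: for TX, the device does not write
            // anything back into the guest's buffers.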
            tx_queue
                .add_used(mem, head_index, 0)
                .map_err(DeviceError::QueueError)?;
            raise_irq = true;
        }
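
        // Notify the guest only if at least one descriptor chain was actually consumed.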
        if raise_irq {
            self.signal_used_queue()?;
        } else {
            METRICS.net.no_tx_avail_buffer.inc();
        }

        // An incoming frame for the MMDS may trigger the transmission of a new message.
        if process_rx_for_mmds {
            self.process_rx()
        } else {
            Ok(())
        }
    }
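
    // Illustrative sketch (not part of the original file): the two-stage charge used in
    // process_tx() above, shown in isolation. It relies only on the RateLimiter calls
    // visible there (`consume` and `manual_replenish`); the helper name `try_charge_tx`
    // and its free-standing shape are hypothetical.
    //
    // fn try_charge_tx(limiter: &mut RateLimiter, frame_len: u64) -> bool {
    //     // Charge one operation before sizing the frame.
    //     if !limiter.consume(1, TokenType::Ops) {
    //         return false;
    //     }
    //     // Charge the bytes; on failure, hand the op token back so a later retry
    //     // of the same frame is charged exactly once.
    //     if !limiter.consume(frame_len, TokenType::Bytes) {
    //         limiter.manual_replenish(1, TokenType::Ops);
    //         return false;
    //     }
    //     true
    // }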