in src/devices/src/virtio/vsock/csm/connection.rs [438:484]
fn notify(&mut self, evset: EventSet) {
if evset.contains(EventSet::IN) {
// Data can be read from the host stream. Setting a Rw pending indication, so that
// the muxer will know to call `recv_pkt()` later.
self.pending_rx.insert(PendingRx::Rw);
}
if evset.contains(EventSet::OUT) {
// Data can be written to the host stream. Time to flush out the TX buffer.
//
if self.tx_buf.is_empty() {
METRICS.vsock.conn_event_fails.inc();
info!("vsock: connection received unexpected EPOLLOUT event");
return;
}
let flushed = self
.tx_buf
.flush_to(&mut self.stream)
.unwrap_or_else(|err| {
METRICS.vsock.tx_flush_fails.inc();
warn!(
"vsock: error flushing TX buf for (lp={}, pp={}): {:?}",
self.local_port, self.peer_port, err
);
match err {
Error::TxBufFlush(inner) if inner.kind() == ErrorKind::WouldBlock => {
// This should never happen (EWOULDBLOCK after EPOLLOUT), but
// it does, so let's absorb it.
}
_ => self.kill(),
};
0
});
self.fwd_cnt += Wrapping(flushed as u32);
METRICS.vsock.tx_bytes_count.add(flushed as usize);
// If this connection was shutting down, but is waiting to drain the TX buffer
// before forceful termination, the wait might be over.
if self.state == ConnState::PeerClosed(true, true) && self.tx_buf.is_empty() {
self.pending_rx.insert(PendingRx::Rst);
} else if self.peer_needs_credit_update() {
// If we've freed up some more buffer space, we may need to let the peer know it
// can safely send more data our way.
self.pending_rx.insert(PendingRx::CreditUpdate);
}
}
}