in src/devices/src/virtio/vsock/unix/muxer.rs [626:724]
fn apply_conn_mutation<F>(&mut self, key: ConnMapKey, mut_fn: F)
where
F: FnOnce(&mut MuxerConnection),
{
if let Some(conn) = self.conn_map.get_mut(&key) {
let had_rx = conn.has_pending_rx();
let was_expiring = conn.will_expire();
let prev_state = conn.state();
mut_fn(conn);
// If this is a host-initiated connection that has just become established, we'll have
// to send an ack message to the host end.
if prev_state == ConnState::LocalInit && conn.state() == ConnState::Established {
let msg = format!("OK {}\n", key.local_port);
match conn.send_bytes_raw(msg.as_bytes()) {
Ok(written) if written == msg.len() => (),
Ok(_) => {
// If we can't write a dozen bytes to a pristine connection something
// must be really wrong. Killing it.
conn.kill();
warn!("vsock: unable to fully write connection ack msg.");
}
Err(err) => {
conn.kill();
warn!("vsock: unable to ack host connection: {:?}", err);
}
};
}
// If the connection wasn't previously scheduled for RX, add it to our RX queue.
if !had_rx && conn.has_pending_rx() {
self.rxq.push(MuxerRx::ConnRx(key));
}
// If the connection wasn't previously scheduled for termination, add it to the
// kill queue.
if !was_expiring && conn.will_expire() {
// It's safe to unwrap here, since `conn.will_expire()` already guaranteed that
// an `conn.expiry` is available.
self.killq.push(key, conn.expiry().unwrap());
}
let fd = conn.as_raw_fd();
let new_evset = conn.get_polled_evset();
if new_evset.is_empty() {
// If the connection no longer needs epoll notifications, remove its listener
// from our list.
self.remove_listener(fd);
return;
}
if let Some(EpollListener::Connection { evset, .. }) = self.listener_map.get_mut(&fd) {
if *evset != new_evset {
// If the set of events that the connection is interested in has changed,
// we need to update its epoll listener.
debug!(
"vsock: updating listener for (lp={}, pp={}): old={:?}, new={:?}",
key.local_port, key.peer_port, *evset, new_evset
);
*evset = new_evset;
self.epoll
.ctl(
ControlOperation::Modify,
fd,
EpollEvent::new(new_evset, fd as u64),
)
.unwrap_or_else(|err| {
// This really shouldn't happen, like, ever. However, "famous last
// words" and all that, so let's just kill it with fire, and walk away.
self.kill_connection(key);
error!(
"vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
key.local_port, key.peer_port, err
);
METRICS.vsock.muxer_event_fails.inc();
});
}
} else {
// The connection had previously asked to be removed from the listener map (by
// returning an empty event set via `get_polled_fd()`), but now wants back in.
self.add_listener(
fd,
EpollListener::Connection {
key,
evset: new_evset,
},
)
.unwrap_or_else(|err| {
self.kill_connection(key);
error!(
"vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
key.local_port, key.peer_port, err
);
METRICS.vsock.muxer_event_fails.inc();
});
}
}
}