fn apply_conn_mutation()

in src/devices/src/virtio/vsock/unix/muxer.rs [626:724]


    fn apply_conn_mutation<F>(&mut self, key: ConnMapKey, mut_fn: F)
    where
        F: FnOnce(&mut MuxerConnection),
    {
        if let Some(conn) = self.conn_map.get_mut(&key) {
            let had_rx = conn.has_pending_rx();
            let was_expiring = conn.will_expire();
            let prev_state = conn.state();

            mut_fn(conn);

            // If this is a host-initiated connection that has just become established, we'll have
            // to send an ack message to the host end.
            if prev_state == ConnState::LocalInit && conn.state() == ConnState::Established {
                let msg = format!("OK {}\n", key.local_port);
                match conn.send_bytes_raw(msg.as_bytes()) {
                    Ok(written) if written == msg.len() => (),
                    Ok(_) => {
                        // If we can't write a dozen bytes to a pristine connection something
                        // must be really wrong. Killing it.
                        conn.kill();
                        warn!("vsock: unable to fully write connection ack msg.");
                    }
                    Err(err) => {
                        conn.kill();
                        warn!("vsock: unable to ack host connection: {:?}", err);
                    }
                };
            }

            // If the connection wasn't previously scheduled for RX, add it to our RX queue.
            if !had_rx && conn.has_pending_rx() {
                self.rxq.push(MuxerRx::ConnRx(key));
            }

            // If the connection wasn't previously scheduled for termination, add it to the
            // kill queue.
            if !was_expiring && conn.will_expire() {
                // It's safe to unwrap here, since `conn.will_expire()` already guaranteed that
                // an `conn.expiry` is available.
                self.killq.push(key, conn.expiry().unwrap());
            }

            let fd = conn.as_raw_fd();
            let new_evset = conn.get_polled_evset();
            if new_evset.is_empty() {
                // If the connection no longer needs epoll notifications, remove its listener
                // from our list.
                self.remove_listener(fd);
                return;
            }
            if let Some(EpollListener::Connection { evset, .. }) = self.listener_map.get_mut(&fd) {
                if *evset != new_evset {
                    // If the set of events that the connection is interested in has changed,
                    // we need to update its epoll listener.
                    debug!(
                        "vsock: updating listener for (lp={}, pp={}): old={:?}, new={:?}",
                        key.local_port, key.peer_port, *evset, new_evset
                    );

                    *evset = new_evset;
                    self.epoll
                        .ctl(
                            ControlOperation::Modify,
                            fd,
                            EpollEvent::new(new_evset, fd as u64),
                        )
                        .unwrap_or_else(|err| {
                            // This really shouldn't happen, like, ever. However, "famous last
                            // words" and all that, so let's just kill it with fire, and walk away.
                            self.kill_connection(key);
                            error!(
                                "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
                                key.local_port, key.peer_port, err
                            );
                            METRICS.vsock.muxer_event_fails.inc();
                        });
                }
            } else {
                // The connection had previously asked to be removed from the listener map (by
                // returning an empty event set via `get_polled_fd()`), but now wants back in.
                self.add_listener(
                    fd,
                    EpollListener::Connection {
                        key,
                        evset: new_evset,
                    },
                )
                .unwrap_or_else(|err| {
                    self.kill_connection(key);
                    error!(
                        "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
                        key.local_port, key.peer_port, err
                    );
                    METRICS.vsock.muxer_event_fails.inc();
                });
            }
        }
    }