in src/devices/src/virtio/vsock/csm/connection.rs [154:274]
fn recv_pkt(&mut self, pkt: &mut VsockPacket, mem: &GuestMemoryMmap) -> VsockResult<()> {
    // Fill `pkt` with the next packet this connection has to hand to its peer. Pending RX
    // indications are consumed in strict priority order: RST, RESPONSE, REQUEST, data (RW)
    // and, last of all, CREDIT_UPDATE. `VsockError::NoData` is returned when none of them
    // ends up producing a packet.

    // Start with the initialization that every packet shares, whatever its operation (e.g.
    // source, destination, credit, etc).
    self.init_pkt(pkt);
    METRICS.vsock.rx_packets_count.inc();

    // Control packets carry no payload, so choosing the op code is all the work there is.
    // Each `remove()` clears the flag it tests, and the chain stops at the first one set.
    let control_op = if self.pending_rx.remove(PendingRx::Rst) {
        // A pending forceful termination overrides everything else.
        Some(uapi::VSOCK_OP_RST)
    } else if self.pending_rx.remove(PendingRx::Response) {
        // We owe the peer a connection confirmation; the connection is established from
        // now on.
        self.state = ConnState::Established;
        Some(uapi::VSOCK_OP_RESPONSE)
    } else if self.pending_rx.remove(PendingRx::Request) {
        // Locally-initiated connection: emit the request and arm the request timeout.
        let request_deadline =
            Instant::now() + Duration::from_millis(defs::CONN_REQUEST_TIMEOUT_MS);
        self.expiry = Some(request_deadline);
        Some(uapi::VSOCK_OP_REQUEST)
    } else {
        None
    };
    if let Some(op) = control_op {
        pkt.set_op(op);
        return Ok(());
    }

    if self.pending_rx.remove(PendingRx::Rw) {
        // A data packet is due; its payload comes from the host-side Unix socket.

        // Data may only flow on an established connection, or on one where the peer has
        // started a graceful shutdown but can still receive. Anything else gets reset.
        let can_carry_data = matches!(
            self.state,
            ConnState::Established | ConnState::PeerClosed(false, _)
        );
        if !can_carry_data {
            pkt.set_op(uapi::VSOCK_OP_RST);
            return Ok(());
        }

        // Before moving any bytes, make sure we know how much the peer can take; if we
        // don't, ask for a credit update instead of sending data.
        if self.need_credit_update_from_peer() {
            self.last_fwd_cnt_to_peer = self.fwd_cnt;
            pkt.set_op(uapi::VSOCK_OP_CREDIT_REQUEST);
            return Ok(());
        }

        // The read is capped by whichever is smaller: the RX buffer, or the buffer space
        // the peer still has available.
        let read_limit = pkt.buf_size().min(self.peer_avail_credit());

        // Read from the stream straight into the RX buffer.
        match pkt.read_at_offset_from(mem, 0, &mut self.stream, read_limit) {
            Ok(bytes_read) => {
                if bytes_read != 0 {
                    // Successful read: this is an RW packet carrying `bytes_read` bytes.
                    pkt.set_op(uapi::VSOCK_OP_RW)
                        .set_len(bytes_read as u32);
                    METRICS.vsock.rx_bytes_count.add(bytes_read);
                } else {
                    // A 0-length read means the host stream was closed down. Ask the peer
                    // to shut the connection down; no more data can go in either direction.
                    self.state = ConnState::LocalClosed;
                    let shutdown_deadline =
                        Instant::now() + Duration::from_millis(defs::CONN_SHUTDOWN_TIMEOUT_MS);
                    self.expiry = Some(shutdown_deadline);
                    pkt.set_op(uapi::VSOCK_OP_SHUTDOWN)
                        .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_RCV)
                        .set_flag(uapi::VSOCK_FLAGS_SHUTDOWN_SEND);
                }
                // Account for whatever length the packet now carries, and remember the
                // forwarded count as of this packet.
                self.rx_cnt += Wrapping(pkt.len());
                self.last_fwd_cnt_to_peer = self.fwd_cnt;
                return Ok(());
            }
            Err(VsockError::GuestMemoryMmap(GuestMemoryError::IOError(io_err)))
                if io_err.kind() == ErrorKind::WouldBlock =>
            {
                // EWOULDBLOCK after EPOLLIN is not supposed to happen, yet it does. Log it
                // and fall through to the credit-update check below.
                warn!(
                    "vsock: unexpected EWOULDBLOCK while reading from backing stream: \
                     lp={}, pp={}, err={:?}",
                    self.local_port, self.peer_port, io_err
                );
            }
            Err(read_err) => {
                // No other read error is expected from the underlying stream; if one shows
                // up, the connection is killed on the spot.
                METRICS.vsock.rx_read_fails.inc();
                error!(
                    "vsock: error reading from backing stream: lp={}, pp={}, err={:?}",
                    self.local_port, self.peer_port, read_err
                );
                pkt.set_op(uapi::VSOCK_OP_RST);
                self.last_fwd_cnt_to_peer = self.fwd_cnt;
                return Ok(());
            }
        }
    }

    // A credit update is basically a no-op, so an RX buffer is spent on it only when there
    // is nothing else left to say; that is why this indication is examined last. The flag
    // is cleared by `remove()` regardless of whether the packet ends up being produced.
    let credit_update_due =
        self.pending_rx.remove(PendingRx::CreditUpdate) && !self.has_pending_rx();
    if !credit_update_due {
        // Every condition that could have produced a packet has been checked by now.
        return Err(VsockError::NoData);
    }

    pkt.set_op(uapi::VSOCK_OP_CREDIT_UPDATE);
    self.last_fwd_cnt_to_peer = self.fwd_cnt;
    Ok(())
}