in sources/api/apiserver/src/server/exec.rs [145:292]
fn handle(&mut self, msg: Result<Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
// Respond to Ping with Pong so the client knows we're alive, and record that we've
// heard from them.
Ok(Message::Ping(msg)) => {
debug!("Received ping, updating heartbeat and responding");
self.heartbeat = Instant::now();
ctx.pong(&msg);
}
// When the client responds to our Ping with a Pong, record that we've heard from them.
Ok(Message::Pong(_)) => {
debug!("Received pong, updating heartbeat");
self.heartbeat = Instant::now();
}
// Binary means process input, which we write directly to the child process.
Ok(Message::Binary(data)) => {
trace!("Received {} bytes of input from client", data.len());
// Confirm we have a child, i.e. the client didn't send messages out of order.
let child_handles = some_or_stop!(
&self.child_handles,
ctx,
Some("process data sent before initialization"),
ws::CloseCode::Policy,
);
// Confirm that we still have a channel open to write to the child process. We
// drop this when the client sends a ContentComplete; they shouldn't send anything
// after that, but if they do, we can just ignore it.
if let Some(write_tx) = &child_handles.write_tx {
// This is where we check that the client is actually obeying the capacity
// updates we're sending them. The write_tx channel is bounded, and if we fail
// to write to it because it's full, we can righteously yell at the client.
match write_tx.try_send(data) {
// Sent the write request OK.
Ok(_unit) => {}
// Disconnect the client if they ignore our capacity. We can't just wait
// for capacity because either (1) we'd use unlimited memory, or (2) we'd
// block the whole actor, meaning nothing gets done; heartbeats would fail,
// output wouldn't get sent, etc.
Err(TrySendError::Full(_data)) => {
info!("Client not obeying capacity updates, closing connection");
let msg = "write buffer full; obey capacity updates".to_string();
ctx.close(Some(ws::CloseReason {
code: ws::CloseCode::Size,
description: Some(msg),
}));
// Note: we don't ctx.stop() here because the close message wouldn't
// get sent; the actor message load from the incoming data delays
// sending the close, but stop() acts immediately. This means we'll
// continue to receive client messages until it receives our stop. Any
// more process data will likely hit this spot again and be dropped.
}
// Disconnected means the write channel is closed, meaning a write to the
// child process failed and we can no longer write to it safely; tell the
// client to stop.
Err(TrySendError::Disconnected(_data)) => {
stop(ctx, Some("writing to process failed"), ws::CloseCode::Error);
}
}
}
}
// A Text message is a multiplexed control message giving us some control information
// from the client. We deserialize it to figure out what they want.
Ok(Message::Text(msg)) => {
let msg = ok_or_stop!(
serde_json::from_str(&msg),
ctx,
"invalid JSON in client message",
ws::CloseCode::Invalid
);
match msg {
// Initialize should be the first message the client sends, and it tells us
// what process they want to run and how. (It'd be nice to include in the HTTP
// request body so we don't worry as much about ordering, but not all clients
// support that.)
ClientMessage::Initialize(init) => {
debug!("Client initialized for target container '{}' and command {:?} with tty: {}",
init.target,
init.command,
init.tty.is_some());
// Spawn the process, getting back handles that let us interact with it.
let child_handles = ok_or_stop!(
ChildHandles::new(init, &self.exec_socket_path, ctx.address()),
ctx,
"failed to spawn process",
ws::CloseCode::Error
);
self.child_handles = Some(child_handles);
}
// This means the client is done reading input from the user and we can close
// the write channel to the process, closing its stdin.
ClientMessage::ContentComplete => {
debug!("Received client content complete, dropping write handle");
// Confirm we have a child, i.e. the client didn't send messages out of
// order.
let child_handles = some_or_stop!(
&mut self.child_handles,
ctx,
Some("ContentComplete sent before initialization"),
ws::CloseCode::Policy
);
drop(child_handles.write_tx.take());
}
// This means the client changed window size, so we should relay that to the
// child process so it can update its output as needed.
ClientMessage::Winch(size) => {
// Unlikely to get here without a child being spawned yet, but if so,
// initialization should include window size so this isn't needed.
if let Some(child_handles) = self.child_handles.as_mut() {
child_handles.set_winsize(size);
} else {
debug!("Received client winch before child was spawned");
}
}
}
}
// This means the client is done with us; stop the actor.
Ok(Message::Close(reason)) => {
info!("Client closed exec connection with reason: {:?}", reason);
ctx.close(reason);
ctx.stop();
}
// We don't use Continuation frames; it's easier to deal with individual Text/Binary
// messages.
Ok(Message::Continuation(_)) => {
let msg = "Continuation messages not supported";
stop(ctx, Some(msg), ws::CloseCode::Unsupported);
}
// no-op
Ok(Message::Nop) => {}
Err(e) => {
error!("Stopping after receiving error message: {}", e);
ctx.stop();
}
}
}