fn handle()

in sources/api/apiserver/src/server/exec.rs [145:292]


    fn handle(&mut self, msg: Result<Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match msg {
            // Respond to Ping with Pong so the client knows we're alive, and record that we've
            // heard from them.
            Ok(Message::Ping(msg)) => {
                debug!("Received ping, updating heartbeat and responding");
                self.heartbeat = Instant::now();
                ctx.pong(&msg);
            }

            // When the client responds to our Ping with a Pong, record that we've heard from them.
            Ok(Message::Pong(_)) => {
                debug!("Received pong, updating heartbeat");
                self.heartbeat = Instant::now();
            }

            // Binary means process input, which we write directly to the child process.
            Ok(Message::Binary(data)) => {
                trace!("Received {} bytes of input from client", data.len());

                // Confirm we have a child, i.e. the client didn't send messages out of order.
                let child_handles = some_or_stop!(
                    &self.child_handles,
                    ctx,
                    Some("process data sent before initialization"),
                    ws::CloseCode::Policy,
                );

                // Confirm that we still have a channel open to write to the child process.  We
                // drop this when the client sends a ContentComplete; they shouldn't send anything
                // after that, but if they do, we can just ignore it.
                if let Some(write_tx) = &child_handles.write_tx {
                    // This is where we check that the client is actually obeying the capacity
                    // updates we're sending them.  The write_tx channel is bounded, and if we fail
                    // to write to it because it's full, we can righteously yell at the client.
                    match write_tx.try_send(data) {
                        // Sent the write request OK.
                        Ok(_unit) => {}

                        // Disconnect the client if they ignore our capacity.  We can't just wait
                        // for capacity because either (1) we'd use unlimited memory, or (2) we'd
                        // block the whole actor, meaning nothing gets done; heartbeats would fail,
                        // output wouldn't get sent, etc.
                        Err(TrySendError::Full(_data)) => {
                            info!("Client not obeying capacity updates, closing connection");
                            let msg = "write buffer full; obey capacity updates".to_string();
                            ctx.close(Some(ws::CloseReason {
                                code: ws::CloseCode::Size,
                                description: Some(msg),
                            }));
                            // Note: we don't ctx.stop() here because the close message wouldn't
                            // get sent; the actor message load from the incoming data delays
                            // sending the close, but stop() acts immediately.  This means we'll
                            // continue to receive client messages until it receives our stop.  Any
                            // more process data will likely hit this spot again and be dropped.
                        }

                        // Disconnected means the write channel is closed, meaning a write to the
                        // child process failed and we can no longer write to it safely; tell the
                        // client to stop.
                        Err(TrySendError::Disconnected(_data)) => {
                            stop(ctx, Some("writing to process failed"), ws::CloseCode::Error);
                        }
                    }
                }
            }

            // A Text message is a multiplexed control message giving us some control information
            // from the client.  We deserialize it to figure out what they want.
            Ok(Message::Text(msg)) => {
                let msg = ok_or_stop!(
                    serde_json::from_str(&msg),
                    ctx,
                    "invalid JSON in client message",
                    ws::CloseCode::Invalid
                );
                match msg {
                    // Initialize should be the first message the client sends, and it tells us
                    // what process they want to run and how.  (It'd be nice to include in the HTTP
                    // request body so we don't worry as much about ordering, but not all clients
                    // support that.)
                    ClientMessage::Initialize(init) => {
                        debug!("Client initialized for target container '{}' and command {:?} with tty: {}",
                               init.target,
                               init.command,
                               init.tty.is_some());
                        // Spawn the process, getting back handles that let us interact with it.
                        let child_handles = ok_or_stop!(
                            ChildHandles::new(init, &self.exec_socket_path, ctx.address()),
                            ctx,
                            "failed to spawn process",
                            ws::CloseCode::Error
                        );
                        self.child_handles = Some(child_handles);
                    }

                    // This means the client is done reading input from the user and we can close
                    // the write channel to the process, closing its stdin.
                    ClientMessage::ContentComplete => {
                        debug!("Received client content complete, dropping write handle");
                        // Confirm we have a child, i.e. the client didn't send messages out of
                        // order.
                        let child_handles = some_or_stop!(
                            &mut self.child_handles,
                            ctx,
                            Some("ContentComplete sent before initialization"),
                            ws::CloseCode::Policy
                        );
                        drop(child_handles.write_tx.take());
                    }

                    // This means the client changed window size, so we should relay that to the
                    // child process so it can update its output as needed.
                    ClientMessage::Winch(size) => {
                        // Unlikely to get here without a child being spawned yet, but if so,
                        // initialization should include window size so this isn't needed.
                        if let Some(child_handles) = self.child_handles.as_mut() {
                            child_handles.set_winsize(size);
                        } else {
                            debug!("Received client winch before child was spawned");
                        }
                    }
                }
            }

            // This means the client is done with us; stop the actor.
            Ok(Message::Close(reason)) => {
                info!("Client closed exec connection with reason: {:?}", reason);
                ctx.close(reason);
                ctx.stop();
            }

            // We don't use Continuation frames; it's easier to deal with individual Text/Binary
            // messages.
            Ok(Message::Continuation(_)) => {
                let msg = "Continuation messages not supported";
                stop(ctx, Some(msg), ws::CloseCode::Unsupported);
            }

            // no-op
            Ok(Message::Nop) => {}

            Err(e) => {
                error!("Stopping after receiving error message: {}", e);
                ctx.stop();
            }
        }
    }