fn read_from_server()

in sources/api/apiclient/src/exec.rs [235:313]


    fn read_from_server(
        read: impl TryStream<Ok = Message, Error = WsError> + 'static,
        heartbeat_setter: Arc<Mutex<Instant>>,
        ret_tx: mpsc::UnboundedSender<CloseFrame<'static>>,
        capacity: Arc<AtomicCapacity>,
    ) -> Pin<Box<dyn Future<Output = Result<()>>>> {
        // Turn tungstenite errors into our own error type.
        read.err_into::<error::Error>()
            // Process each message from the server, stopping on Close or error.
            .try_for_each(move |ws_msg| {
                // For ownership reasons, make copies of the atomic handles that can be passed into
                // the async closure.
                let heartbeat_setter = heartbeat_setter.clone();
                let capacity = capacity.clone();
                let ret_tx = ret_tx.clone();

                async move {
                    match ws_msg {
                        // Binary messages represent process output, not encoded in any way.  Write
                        // it to stdout.
                        Message::Binary(data) => {
                            trace!("Received {} bytes of output from server", data.len());
                            let mut stdout = tokio::io::stdout();
                            stdout.write_all(&data).await.context(error::WriteOutput)?;
                            // May not be a full line of output, so flush any bytes we got.  Failure here
                            // isn't worthy of stopping the whole process.
                            let _ = stdout.flush().await;
                        }
                        // tokio-tungstenite replies to ping with pong; we just update our heartbeat.
                        Message::Ping(_) | Message::Pong(_) => {
                            // If we fail to get the mutex, the heartbeat thread has panicked, which means
                            // we'll no longer send pings to the server, and it'll disconnect us at some
                            // point.  Might as well try to finish our processing in the meantime.
                            if let Ok(mut hb) = heartbeat_setter.lock() {
                                trace!("Got ping/pong from server, updating heartbeat");
                                *hb = Instant::now();
                            }
                        }
                        // The server requested we close the connection, so we stop processing.
                        // Usually it includes the return code of the requested process.
                        Message::Close(c) => {
                            if let Some(ret) = c {
                                // If we fail to send the return code, there's nothing we can do to rectify
                                // the situation, and this is a Close so we definitely want to return below
                                // anyway.
                                let _ = ret_tx.unbounded_send(ret);
                            }
                            return error::Close.fail();
                        }
                        // Text messages represent encoded control messages from the server.
                        Message::Text(raw_msg) => {
                            let server_message =
                                serde_json::from_str(&raw_msg).context(error::Deserialize)?;
                            match server_message {
                                // Capacity messages tell us how many messages the server is
                                // willing to receive before it rejects us.
                                ServerMessage::Capacity(new) => {
                                    debug!(
                                        "Received capacity update from server: {} max outstanding, {} written",
                                        new.max_messages_outstanding,
                                        new.messages_written
                                    );
                                    capacity
                                        .max_messages_outstanding
                                        .store(new.max_messages_outstanding, Ordering::SeqCst);
                                    capacity
                                        .messages_written
                                        .store(new.messages_written, Ordering::SeqCst);
                                }
                            }
                        }
                    }
                    Ok(())
                }
            })
            // This puts the future in a Pin<Box>; we use Box so we don't have to name the exact
            // future type, and Pin is required for tokio to select! it.
            .boxed_local()
    }