in sources/api/apiclient/src/exec.rs [235:313]
fn read_from_server(
read: impl TryStream<Ok = Message, Error = WsError> + 'static,
heartbeat_setter: Arc<Mutex<Instant>>,
ret_tx: mpsc::UnboundedSender<CloseFrame<'static>>,
capacity: Arc<AtomicCapacity>,
) -> Pin<Box<dyn Future<Output = Result<()>>>> {
// Turn tungstenite errors into our own error type.
read.err_into::<error::Error>()
// Process each message from the server, stopping on Close or error.
.try_for_each(move |ws_msg| {
// For ownership reasons, make copies of the atomic handles that can be passed into
// the async closure.
let heartbeat_setter = heartbeat_setter.clone();
let capacity = capacity.clone();
let ret_tx = ret_tx.clone();
async move {
match ws_msg {
// Binary messages represent process output, not encoded in any way. Write
// it to stdout.
Message::Binary(data) => {
trace!("Received {} bytes of output from server", data.len());
let mut stdout = tokio::io::stdout();
stdout.write_all(&data).await.context(error::WriteOutput)?;
// May not be a full line of output, so flush any bytes we got. Failure here
// isn't worthy of stopping the whole process.
let _ = stdout.flush().await;
}
// tokio-tungstenite replies to ping with pong; we just update our heartbeat.
Message::Ping(_) | Message::Pong(_) => {
// If we fail to get the mutex, the heartbeat thread has panicked, which means
// we'll no longer send pings to the server, and it'll disconnect us at some
// point. Might as well try to finish our processing in the meantime.
if let Ok(mut hb) = heartbeat_setter.lock() {
trace!("Got ping/pong from server, updating heartbeat");
*hb = Instant::now();
}
}
// The server requested we close the connection, so we stop processing.
// Usually it includes the return code of the requested process.
Message::Close(c) => {
if let Some(ret) = c {
// If we fail to send the return code, there's nothing we can do to rectify
// the situation, and this is a Close so we definitely want to return below
// anyway.
let _ = ret_tx.unbounded_send(ret);
}
return error::Close.fail();
}
// Text messages represent encoded control messages from the server.
Message::Text(raw_msg) => {
let server_message =
serde_json::from_str(&raw_msg).context(error::Deserialize)?;
match server_message {
// Capacity messages tell us how many messages the server is
// willing to receive before it rejects us.
ServerMessage::Capacity(new) => {
debug!(
"Received capacity update from server: {} max outstanding, {} written",
new.max_messages_outstanding,
new.messages_written
);
capacity
.max_messages_outstanding
.store(new.max_messages_outstanding, Ordering::SeqCst);
capacity
.messages_written
.store(new.messages_written, Ordering::SeqCst);
}
}
}
}
Ok(())
}
})
// This puts the future in a Pin<Box>; we use Box so we don't have to name the exact
// future type, and Pin is required for tokio to select! it.
.boxed_local()
}