src/app.rs (454 lines of code) (raw):
use crate::benchmark::Event as BenchmarkEvent;
use crate::event::{terminal_event_task, AppEvent};
use crate::flux::{Action, AppState, Dispatcher, Store};
use crate::scheduler::ExecutorType;
use crate::BenchmarkConfig;
use crossterm::event::{KeyCode, KeyEvent, KeyModifiers};
use ratatui::layout::{Constraint, Direction, Layout};
use ratatui::text::Span;
use ratatui::widgets::ListDirection::BottomToTop;
use ratatui::widgets::{Cell, Dataset, List, ListItem, Row, Table};
use ratatui::{
buffer::Buffer,
layout::{Alignment, Rect},
style::Stylize as OtherStylize,
symbols,
symbols::border,
text::{Line, Text},
widgets::{Block, Paragraph, Widget},
DefaultTerminal, Frame,
};
use std::collections::HashMap;
use std::io;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::{Receiver, UnboundedReceiver};
use tokio::sync::{broadcast, mpsc};
pub struct App {
exit: bool,
store: Arc<Mutex<Store>>,
dispatcher: Arc<Mutex<Dispatcher>>,
receiver: Receiver<AppEvent>,
benchmark_config: BenchmarkConfig,
stop_sender: broadcast::Sender<()>,
}
pub async fn run_console(
benchmark_config: BenchmarkConfig,
mut receiver: UnboundedReceiver<BenchmarkEvent>,
stop_sender: broadcast::Sender<()>,
) {
let (app_tx, app_rx) = mpsc::channel(8);
// Create event task
let stop_receiver_signal = stop_sender.subscribe();
tokio::spawn(terminal_event_task(250, app_tx, stop_receiver_signal));
let mut app = App::new(benchmark_config, app_rx, stop_sender.clone());
app.dispatcher
.lock()
.expect("lock")
.dispatch(Action::LogMessage(LogMessageUI {
message: "Starting benchmark".to_string(),
level: LogLevel::Info,
timestamp: chrono::Utc::now(),
}));
let dispatcher = app.dispatcher.clone();
let mut stop_receiver_signal = stop_sender.subscribe();
let event_thread = tokio::spawn(async move {
tokio::select! {
_=async{
while let Some(event) = receiver.recv().await {
match event {
BenchmarkEvent::BenchmarkStart(event) => {
dispatcher.lock().expect("lock").dispatch(Action::AddBenchmark(BenchmarkUI {
id: event.id,
status: BenchmarkStatus::Running,
progress: 0.0,
throughput: "0".to_string(),
successful_requests: 0,
failed_requests: 0,
}));
}
BenchmarkEvent::BenchmarkProgress(event) => {
let (successful_requests,failed_requests) = (event.successful_requests,event.failed_requests);
dispatcher.lock().expect("lock").dispatch(Action::AddBenchmark(BenchmarkUI {
id: event.id,
status: BenchmarkStatus::Running,
progress: event.progress,
throughput: event.request_throughput.map_or("0".to_string(), |e| format!("{e:.2}")),
successful_requests,
failed_requests,
}));
}
BenchmarkEvent::BenchmarkEnd(event) => {
dispatcher.lock().expect("lock").dispatch(Action::LogMessage(LogMessageUI {
message: format!("Benchmark {} ended", event.id),
level: LogLevel::Info,
timestamp: chrono::Utc::now(),
}));
if let Some(results) = event.results {
let (successful_requests,failed_requests) = (results.successful_requests() as u64,results.failed_requests() as u64);
dispatcher.lock().expect("lock").dispatch(Action::AddBenchmark(BenchmarkUI {
id: event.id,
status: BenchmarkStatus::Completed,
progress: 100.0,
throughput: event.request_throughput.map_or("0".to_string(), |e| format!("{e:.2}")),
successful_requests,
failed_requests,
}));
dispatcher.lock().expect("lock").dispatch(Action::AddBenchmarkResults(results));
}
}
BenchmarkEvent::Message(event) => {
dispatcher.lock().expect("lock").dispatch(Action::LogMessage(LogMessageUI {
message: event.message,
level: LogLevel::Info,
timestamp: event.timestamp,
}));
}
BenchmarkEvent::BenchmarkReportEnd(path) => {
dispatcher.lock().expect("lock").dispatch(Action::LogMessage(LogMessageUI {
message: format!("Benchmark report saved to {}", path),
level: LogLevel::Info,
timestamp: chrono::Utc::now(),
}));
break;
}
BenchmarkEvent::BenchmarkError(event) => {
dispatcher.lock().expect("lock").dispatch(Action::LogMessage(LogMessageUI {
message: format!("Error running benchmark: {:?}", event),
level: LogLevel::Error,
timestamp: chrono::Utc::now(),
}));
break;
}
}
}
}=>{}
_ = stop_receiver_signal.recv() => {}
}
});
let mut stop_receiver_signal = stop_sender.subscribe();
let app_thread = tokio::spawn(async move {
tokio::select! {
_ = async {
let _ = app.run(&mut ratatui::init()).await;
ratatui::restore();
}=>{}
_ = stop_receiver_signal.recv() => {}
}
});
let _ = event_thread.await;
let _ = app_thread.await;
}
impl App {
pub fn new(
benchmark_config: BenchmarkConfig,
receiver: Receiver<AppEvent>,
stop_sender: Sender<()>,
) -> App {
let store = Arc::from(Mutex::new(Store::new()));
let dispatcher = Arc::from(Mutex::new(Dispatcher::new(store.clone())));
App {
exit: false,
store: store.clone(),
dispatcher: dispatcher.clone(),
receiver,
benchmark_config,
stop_sender,
}
}
pub async fn run(&mut self, terminal: &mut DefaultTerminal) -> io::Result<()> {
while !self.exit {
terminal.draw(|frame| self.draw(frame))?;
self.handle_events().await?;
}
// signal everybody to stop
let _ = self.stop_sender.send(());
Ok(())
}
fn draw(&self, frame: &mut Frame) {
frame.render_widget(self, frame.area())
}
async fn handle_events(&mut self) -> io::Result<()> {
match self.receiver.recv().await {
None => Err(io::Error::new(io::ErrorKind::Other, "No event")),
Some(event) => match event {
AppEvent::Tick => Ok(()),
AppEvent::Key(key_event) => self.handle_key_event(key_event),
AppEvent::Resize => Ok(()),
},
}
}
fn handle_key_event(&mut self, key_event: KeyEvent) -> io::Result<()> {
match key_event {
KeyEvent {
code: KeyCode::Char('q'),
..
} => self.exit(),
KeyEvent {
code: KeyCode::Char('c'),
modifiers: KeyModifiers::CONTROL,
..
} => self.exit(),
_ => {}
}
Ok(())
}
fn exit(&mut self) {
self.exit = true;
}
fn create_datasets(&self, state: AppState) -> HashMap<String, Vec<(f64, f64)>> {
let token_throughput_rate = state
.results
.iter()
.filter_map(|r| match r.executor_type() {
ExecutorType::ConstantArrivalRate => {
let throughput = r.token_throughput_secs().unwrap_or(0.0);
Some((r.executor_config().rate.unwrap(), throughput))
}
ExecutorType::ConstantVUs => None,
})
.collect::<Vec<_>>();
let token_throughput_vus = state
.results
.iter()
.filter_map(|r| match r.executor_type() {
ExecutorType::ConstantVUs => {
let throughput = r.token_throughput_secs().unwrap_or(0.0);
Some((r.executor_config().max_vus as f64, throughput))
}
ExecutorType::ConstantArrivalRate => None,
})
.collect::<Vec<_>>();
let inter_token_latency_rate = state
.results
.iter()
.filter_map(|r| match r.executor_type() {
ExecutorType::ConstantArrivalRate => {
let latency = r
.inter_token_latency_avg()
.unwrap_or_default()
.as_secs_f64();
Some((r.executor_config().rate.unwrap(), latency))
}
ExecutorType::ConstantVUs => None,
})
.collect::<Vec<_>>();
let inter_token_latency_vus = state
.results
.iter()
.filter_map(|r| match r.executor_type() {
ExecutorType::ConstantVUs => {
let latency = r
.inter_token_latency_avg()
.unwrap_or_default()
.as_secs_f64();
Some((r.executor_config().max_vus as f64, latency))
}
ExecutorType::ConstantArrivalRate => None,
})
.collect::<Vec<_>>();
HashMap::from([
("token_throughput_rate".to_string(), token_throughput_rate),
("token_throughput_vus".to_string(), token_throughput_vus),
(
"inter_token_latency_rate".to_string(),
inter_token_latency_rate,
),
(
"inter_token_latency_vus".to_string(),
inter_token_latency_vus,
),
])
}
}
impl Widget for &App {
fn render(self, area: Rect, buf: &mut Buffer) {
let state = self.store.lock().unwrap().get_state();
let data = self.create_datasets(state.clone());
let main_layout = Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Length(1), Constraint::Min(20)])
.split(area);
let bottom_layout = Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Percentage(50), Constraint::Percentage(50)])
.split(main_layout[1]);
let steps_graph_layout = Layout::default()
.direction(Direction::Horizontal)
.constraints([Constraint::Percentage(35), Constraint::Percentage(65)])
.split(bottom_layout[0]);
// LOGS
let logs_title = Line::from("Logs".bold()).centered();
let logs_block = Block::bordered()
.title_top(logs_title)
.border_set(border::THICK);
List::new(
state
.messages
.iter()
.rev()
.map(|m| {
let level_span = match m.level {
LogLevel::Info => {
Span::raw(m.level.to_string().to_uppercase()).green().bold()
}
LogLevel::Warning => Span::raw(m.level.to_string().to_uppercase())
.yellow()
.bold(),
LogLevel::Error => {
Span::raw(m.level.to_string().to_uppercase()).red().bold()
}
};
let content = Line::from(vec![
m.formatted_timestamp().clone().gray(),
Span::raw(" "),
level_span,
Span::raw(" "),
Span::raw(m.message.to_string()).bold(),
]);
ListItem::new(content)
})
.collect::<Vec<_>>(),
)
.direction(BottomToTop)
.block(logs_block)
.render(bottom_layout[1], buf);
// BENCHMARK config
let rate_mode = match self.benchmark_config.rates {
None => "Automatic".to_string(),
Some(_) => "Manual".to_string(),
};
let config_text = Text::from(vec![Line::from(vec![
format!("Profile: {profile} | Benchmark: {kind} | Max VUs: {max_vus} | Duration: {duration} sec | Rates: {rates} | Warmup: {warmup} sec",
profile = self.benchmark_config.profile.clone().unwrap_or("N/A".to_string()),
kind = self.benchmark_config.benchmark_kind,
max_vus = self.benchmark_config.max_vus,
duration = self.benchmark_config.duration.as_secs_f64(),
rates = rate_mode,
warmup = self.benchmark_config.warmup_duration.as_secs_f64()).white().bold(),
])]);
Paragraph::new(config_text.clone()).render(main_layout[0], buf);
// STEPS
let steps_block_title = Line::from("Benchmark steps".bold()).centered();
let steps_block = Block::bordered()
.title(steps_block_title.alignment(Alignment::Center))
.border_set(border::THICK);
let step_rows = state
.benchmarks
.iter()
.map(|b| {
let error_rate = if b.failed_requests > 0 {
format!(
"{:4.0}%",
b.failed_requests as f64
/ (b.failed_requests + b.successful_requests) as f64
* 100.
)
.light_red()
.bold()
} else {
format!("{:4.0}%", 0).to_string().white()
};
let cells = vec![
b.id.clone().white(),
b.status.to_string().white(),
format!("{:4.0}%", b.progress).white(),
error_rate,
format!("{:>6.6} req/sec avg", b.throughput).green().bold(),
];
Row::new(cells)
})
.collect::<Vec<_>>();
let widths = [
Constraint::Length(30),
Constraint::Length(10),
Constraint::Length(5),
Constraint::Length(5),
Constraint::Length(20),
];
// steps table
Table::new(step_rows, widths)
.header(Row::new(vec![
Cell::new(Line::from("Bench").alignment(Alignment::Left)),
Cell::new(Line::from("Status").alignment(Alignment::Left)),
Cell::new(Line::from("%").alignment(Alignment::Left)),
Cell::new(Line::from("Err").alignment(Alignment::Left)),
Cell::new(Line::from("Throughput").alignment(Alignment::Left)),
]))
.block(steps_block)
.render(steps_graph_layout[0], buf);
// CHARTS
let graphs_block_title = Line::from("Token throughput rate".bold()).centered();
let graphs_block = Block::bordered()
.title(graphs_block_title.alignment(Alignment::Center))
.border_set(border::THICK);
let binding = data.get("token_throughput_rate").unwrap().clone();
let datasets = vec![Dataset::default()
.name("Token throughput rate".to_string())
.marker(symbols::Marker::Dot)
.graph_type(ratatui::widgets::GraphType::Scatter)
.style(ratatui::style::Style::default().fg(ratatui::style::Color::LightMagenta))
.data(&binding)];
let (xmax, ymax) = get_max_bounds(&binding, (10.0, 100.0));
let x_axis = ratatui::widgets::Axis::default()
.title("Arrival rate (req/s)".to_string())
.style(ratatui::style::Style::default().white())
.bounds([0.0, xmax])
.labels(get_axis_labels(0.0, xmax, 5));
let y_axis = ratatui::widgets::Axis::default()
.title("Throughput (tokens/s)".to_string())
.style(ratatui::style::Style::default().white())
.bounds([0.0, ymax])
.labels(get_axis_labels(0.0, ymax, 5));
ratatui::widgets::Chart::new(datasets)
.x_axis(x_axis)
.y_axis(y_axis)
.block(graphs_block)
.legend_position(None)
.render(steps_graph_layout[1], buf);
}
}
fn get_max_bounds(data: &[(f64, f64)], default_max: (f64, f64)) -> (f64, f64) {
let xmax = data
.iter()
.map(|(x, _)| x)
.max_by(|a, b| a.partial_cmp(b).unwrap())
.unwrap_or(&default_max.0);
let ymax = data
.iter()
.map(|(_, y)| y)
.max_by(|a, b| a.partial_cmp(b).unwrap())
.unwrap_or(&default_max.1);
(*xmax, *ymax)
}
fn get_axis_labels(min: f64, max: f64, num_labels: u32) -> Vec<String> {
let step = (max - min) / num_labels as f64;
(0..num_labels)
.map(|i| format!("{:.2}", min + i as f64 * step))
.collect()
}
#[allow(dead_code)]
#[derive(Clone, strum_macros::Display)]
enum LogLevel {
Info,
Warning,
Error,
}
#[derive(Clone)]
pub(crate) struct LogMessageUI {
message: String,
level: LogLevel,
timestamp: chrono::DateTime<chrono::Utc>,
}
impl LogMessageUI {
fn formatted_timestamp(&self) -> String {
self.timestamp.to_rfc3339()
}
}
#[derive(Clone)]
pub(crate) struct BenchmarkUI {
pub(crate) id: String,
status: BenchmarkStatus,
progress: f64,
throughput: String,
successful_requests: u64,
failed_requests: u64,
}
#[derive(Clone, strum_macros::Display)]
enum BenchmarkStatus {
Running,
Completed,
}