src/results.rs (401 lines of code) (raw):

use crate::executors::ExecutorConfig; use crate::requests::TextGenerationAggregatedResponse; use crate::results::BenchmarkErrors::NoResponses; use crate::scheduler::ExecutorType; use chrono::Utc; use std::fmt::{Debug, Display, Formatter}; use std::time::Duration; #[derive(Debug)] pub(crate) enum BenchmarkErrors { NoResponses, } impl Display for BenchmarkErrors { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { NoResponses => write!(f, "Backend did not return any valid response. It is either not responding or test duration is too short."), } } } #[derive(Clone)] pub struct BenchmarkResults { pub id: String, aggregated_responses: Vec<TextGenerationAggregatedResponse>, executor_type: ExecutorType, executor_config: ExecutorConfig, } impl BenchmarkResults { pub fn new( id: String, executor_type: ExecutorType, executor_config: ExecutorConfig, ) -> BenchmarkResults { BenchmarkResults { id, aggregated_responses: Vec::new(), executor_type, executor_config, } } pub fn add_response(&mut self, response: TextGenerationAggregatedResponse) { self.aggregated_responses.push(response); } pub fn total_requests(&self) -> usize { self.aggregated_responses.len() } pub fn start_time(&self) -> Option<tokio::time::Instant> { self.aggregated_responses .first() .and_then(|response| response.start_time) } pub fn end_time(&self) -> Option<tokio::time::Instant> { self.aggregated_responses .last() .and_then(|response| response.end_time) } fn is_ready(&self) -> bool { self.start_time().is_some() && self.end_time().is_some() } pub fn failed_requests(&self) -> usize { self.aggregated_responses .iter() .filter(|response| response.failed) .count() } pub fn successful_requests(&self) -> usize { self.aggregated_responses .iter() .filter(|response| !response.failed) .count() } pub fn token_throughput_secs(&self) -> anyhow::Result<f64> { if self.is_ready() { let total_tokens: u64 = self.total_tokens(); Ok(total_tokens as f64 / self.duration().unwrap_or_default().as_secs_f64()) } else { Err(anyhow::anyhow!(NoResponses)) } } pub fn total_tokens_sent(&self) -> u64 { self.get_successful_responses() .iter() .map(|response| response.request.clone().unwrap().num_prompt_tokens) .sum() } pub fn total_prompt_tokens(&self) -> u64 { self.get_successful_responses() .iter() .map(|response| response.request.clone().unwrap().num_prompt_tokens) .sum() } pub fn prompt_tokens_avg(&self) -> anyhow::Result<f64> { if self.is_ready() { let total_prompt_tokens = self.total_prompt_tokens(); Ok(total_prompt_tokens as f64 / self.successful_requests() as f64) } else { Err(anyhow::anyhow!(NoResponses)) } } pub fn successful_request_rate(&self) -> anyhow::Result<f64> { if self.is_ready() { let total_requests = self.successful_requests(); Ok(total_requests as f64 / self.duration().unwrap_or_default().as_secs_f64()) } else { Err(anyhow::anyhow!(NoResponses)) } } pub fn total_tokens(&self) -> u64 { self.get_successful_responses() .iter() .map(|response| response.num_generated_tokens) .sum() } pub fn duration(&self) -> anyhow::Result<std::time::Duration> { if self.is_ready() { Ok(self .end_time() .unwrap() .duration_since(self.start_time().unwrap())) } else { Err(anyhow::anyhow!(NoResponses)) } } pub fn e2e_latency_avg(&self) -> anyhow::Result<std::time::Duration> { if self.is_ready() { if self.successful_requests() == 0 { return Ok(Duration::from_secs(0)); } Ok(self .get_successful_responses() .iter() .map(|response| response.e2e_latency().unwrap_or_default()) .sum::<Duration>() / self.successful_requests() as u32) } else { Err(anyhow::anyhow!(NoResponses)) } } pub fn e2e_latency_percentile(&self, percentile: f64) -> anyhow::Result<std::time::Duration> { let quantile = self.quantile_duration( self.get_successful_responses() .iter() .map(|response| response.e2e_latency().unwrap_or_default()) .collect(), percentile, )?; Ok(Duration::from_secs_f64(quantile)) } pub fn time_to_first_token_avg(&self) -> anyhow::Result<std::time::Duration> { if self.is_ready() { if self.successful_requests() == 0 { return Ok(Duration::from_secs(0)); } Ok(self .get_successful_responses() .iter() .map(|response| response.time_to_first_token().unwrap_or_default()) .sum::<Duration>() / self.successful_requests() as u32) } else { Err(anyhow::anyhow!(NoResponses)) } } pub fn time_to_first_token_percentile(&self, percentile: f64) -> anyhow::Result<Duration> { let quantile = self.quantile_duration( self.get_successful_responses() .iter() .map(|response| response.time_to_first_token().unwrap_or_default()) .collect(), percentile, )?; Ok(Duration::from_secs_f64(quantile)) } pub fn inter_token_latency_avg(&self) -> anyhow::Result<std::time::Duration> { if self.is_ready() { if self.successful_requests() == 0 { return Ok(Duration::from_secs(0)); } Ok(self .get_successful_responses() .iter() .map(|response| response.inter_token_latency().unwrap_or_default()) .sum::<Duration>() / self.successful_requests() as u32) } else { Err(anyhow::anyhow!(NoResponses)) } } pub fn inter_token_latency_percentile(&self, percentile: f64) -> anyhow::Result<Duration> { let quantile = self.quantile_duration( self.get_successful_responses() .iter() .map(|response| response.inter_token_latency().unwrap_or_default()) .collect(), percentile, )?; Ok(Duration::from_secs_f64(quantile)) } pub fn executor_type(&self) -> ExecutorType { self.executor_type.clone() } pub fn executor_config(&self) -> ExecutorConfig { self.executor_config.clone() } fn get_successful_responses(&self) -> Vec<&TextGenerationAggregatedResponse> { self.aggregated_responses .iter() .filter(|response| !response.failed) .collect() } pub fn get_responses(&self) -> Vec<TextGenerationAggregatedResponse> { self.aggregated_responses.clone() } /// Calculate the quantile of a given data set using interpolation method /// Results are similar to `numpy.percentile` fn quantile_duration(&self, mut data: Vec<Duration>, quantile: f64) -> anyhow::Result<f64> { if self.is_ready() { data.sort(); let i = (quantile * (data.len() - 1) as f64).floor(); let delta = (data.len() - 1) as f64 * quantile - i; if i as usize >= data.len() { return Err(anyhow::anyhow!(NoResponses)); } let quantile = (1. - delta) * data[i as usize].as_secs_f64() + delta * data[i as usize + 1].as_secs_f64(); Ok(quantile) } else { Err(anyhow::anyhow!(NoResponses)) } } } impl Debug for BenchmarkResults { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("BenchmarkResult") .field("id", &self.id) .field("executor_type", &self.executor_type.to_string()) .field("total_requests", &self.total_requests()) .field("start_time", &self.start_time()) .field("end_time", &self.end_time()) .field("total_tokens", &self.total_tokens()) .field( "token_throughput_secs", &self .token_throughput_secs() .or::<anyhow::Result<f64>>(Ok(-1.0)), ) .field( "duration_ms", &self .duration() .or::<anyhow::Result<Duration>>(Ok(Duration::from_secs(0))), ) .field( "average_time_to_first_token", &self .time_to_first_token_avg() .or::<anyhow::Result<Duration>>(Ok(Duration::from_secs(0))), ) .field( "average_inter_token_latency", &self .inter_token_latency_avg() .or::<anyhow::Result<Duration>>(Ok(Duration::from_secs(0))), ) .field("failed_requests", &self.failed_requests()) .field("successful_requests", &self.successful_requests()) .field( "request_rate", &self .successful_request_rate() .or::<anyhow::Result<f64>>(Ok(-1.0)), ) .field("sent_prompt_tokens", &self.total_tokens_sent()) .field( "e2e_latency_avg", &self .e2e_latency_avg() .or::<anyhow::Result<Duration>>(Ok(Duration::from_secs(0))), ) .finish() } } #[derive(Debug, Clone)] pub struct BenchmarkReport { results: Vec<BenchmarkResults>, start_time: Option<chrono::DateTime<Utc>>, end_time: Option<chrono::DateTime<Utc>>, } impl BenchmarkReport { pub fn new() -> BenchmarkReport { BenchmarkReport { results: Vec::new(), start_time: None, end_time: None, } } pub fn start(&mut self) { self.start_time = Some(Utc::now()); } pub fn end(&mut self) { self.end_time = Some(Utc::now()); } pub fn add_benchmark_result(&mut self, result: BenchmarkResults) { self.results.push(result); } pub fn get_results(&self) -> Vec<BenchmarkResults> { self.results.clone() } pub fn start_time(&self) -> Option<chrono::DateTime<Utc>> { self.start_time } pub fn end_time(&self) -> Option<chrono::DateTime<Utc>> { self.end_time } } #[cfg(test)] mod test { use super::*; use crate::requests::TextGenerationRequest; use std::sync::Arc; #[test] fn test_time_to_first_token_percentile() { let request = Arc::from(TextGenerationRequest { id: None, prompt: "test".to_string(), num_prompt_tokens: 10, num_decode_tokens: None, }); let mut response1 = TextGenerationAggregatedResponse::new(request.clone()); response1.start_time = Some(tokio::time::Instant::now()); response1.end_time = Some(tokio::time::Instant::now() + tokio::time::Duration::from_millis(100)); response1.num_generated_tokens = 100; response1.failed = false; response1.times_to_tokens = vec![ Duration::from_millis(100), Duration::from_millis(200), Duration::from_millis(300), Duration::from_millis(400), Duration::from_millis(500), ]; let mut response2 = TextGenerationAggregatedResponse::new(request.clone()); response2.start_time = Some(tokio::time::Instant::now()); response2.end_time = Some(tokio::time::Instant::now() + tokio::time::Duration::from_millis(200)); response2.num_generated_tokens = 100; response2.failed = false; response2.times_to_tokens = vec![ Duration::from_millis(600), Duration::from_millis(700), Duration::from_millis(800), Duration::from_millis(900), Duration::from_millis(1000), ]; let mut response3 = TextGenerationAggregatedResponse::new(request.clone()); response3.start_time = Some(tokio::time::Instant::now()); response3.end_time = Some(tokio::time::Instant::now() + tokio::time::Duration::from_millis(300)); response3.num_generated_tokens = 100; response3.failed = false; response3.times_to_tokens = vec![ Duration::from_millis(1100), Duration::from_millis(1200), Duration::from_millis(1300), Duration::from_millis(1400), Duration::from_millis(1500), ]; let mut response4 = TextGenerationAggregatedResponse::new(request.clone()); response4.start_time = Some(tokio::time::Instant::now()); response4.end_time = Some(tokio::time::Instant::now() + tokio::time::Duration::from_millis(300)); response4.num_generated_tokens = 100; response4.failed = false; response4.times_to_tokens = vec![ Duration::from_millis(1600), Duration::from_millis(1700), Duration::from_millis(1800), Duration::from_millis(1900), Duration::from_millis(2000), ]; let mut results = BenchmarkResults::new( "test".to_string(), ExecutorType::ConstantArrivalRate, ExecutorConfig { max_vus: 0, duration: Default::default(), rate: None, }, ); results.add_response(response1); results.add_response(response2); results.add_response(response3); results.add_response(response4); assert_eq!( results.time_to_first_token_percentile(0.9).unwrap(), Duration::from_millis(1450) ); assert_eq!( results.time_to_first_token_percentile(0.5).unwrap(), Duration::from_millis(850) ); } }