src/visualizer.rs (277 lines of code) (raw):
use crate::utils::DataMetrics;
use crate::{data::Data, data::ProcessedData, get_file, PDError};
use anyhow::Result;
use log::debug;
use rustix::fd::AsRawFd;
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::{Path, PathBuf};
use std::{collections::HashMap, fs::File};
#[derive(Clone, Debug)]
pub struct ReportParams {
pub data_dir: PathBuf,
pub tmp_dir: PathBuf,
pub report_dir: PathBuf,
pub run_name: String,
pub data_file_path: PathBuf,
}
impl ReportParams {
fn new() -> Self {
ReportParams {
data_dir: PathBuf::new(),
tmp_dir: PathBuf::new(),
report_dir: PathBuf::new(),
run_name: String::new(),
data_file_path: PathBuf::new(),
}
}
}
pub struct DataVisualizer {
pub data: ProcessedData,
pub file_handle: Option<File>,
pub run_values: HashMap<String, Vec<ProcessedData>>,
pub file_name: String,
pub js_file_name: String,
pub js: String,
pub api_name: String,
pub has_custom_raw_data_parser: bool,
pub data_available: HashMap<String, bool>,
pub report_params: ReportParams,
}
impl DataVisualizer {
pub fn new(
data: ProcessedData,
file_name: String,
js_file_name: String,
js: String,
api_name: String,
) -> Self {
DataVisualizer {
data,
file_handle: None,
run_values: HashMap::new(),
file_name,
js_file_name,
js,
api_name,
has_custom_raw_data_parser: false,
data_available: HashMap::new(),
report_params: ReportParams::new(),
}
}
pub fn has_custom_raw_data_parser(&mut self) {
self.has_custom_raw_data_parser = true;
}
pub fn init_visualizer(
&mut self,
dir: String,
name: String,
tmp_dir: &Path,
fin_dir: &Path,
) -> Result<()> {
let file = get_file(dir.clone(), self.file_name.clone())?;
let full_path = Path::new("/proc/self/fd").join(file.as_raw_fd().to_string());
self.report_params.data_dir = PathBuf::from(dir.clone());
self.report_params.tmp_dir = tmp_dir.to_path_buf();
self.report_params.report_dir = fin_dir.to_path_buf();
self.report_params.run_name = name.clone();
self.report_params.data_file_path = fs::read_link(full_path).unwrap();
self.file_handle = Some(file);
self.run_values.insert(name.clone(), Vec::new());
self.data_available.insert(name, true);
Ok(())
}
pub fn data_not_available(&mut self, name: String) -> Result<()> {
self.data_available.insert(name, false);
Ok(())
}
pub fn process_raw_data(&mut self, name: String) -> Result<()> {
if !self.data_available.get(&name).unwrap() {
debug!("Raw data unavailable for: {}", self.api_name);
return Ok(());
}
debug!("Processing raw data for: {}", self.api_name);
if self.has_custom_raw_data_parser {
self.run_values.insert(
name.clone(),
self.data
.custom_raw_data_parser(self.report_params.clone())?,
);
return Ok(());
}
let mut raw_data = Vec::new();
loop {
match bincode::deserialize_from::<_, Data>(self.file_handle.as_ref().unwrap()) {
Ok(v) => raw_data.push(v),
Err(e) => match *e {
// EOF
bincode::ErrorKind::Io(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
break
}
e => panic!("Error when Deserializing {} data {}", self.api_name, e),
},
};
}
let mut data = Vec::new();
for value in raw_data {
let processed_data = self.data.process_raw_data(value)?;
data.push(processed_data);
}
self.run_values.insert(name.clone(), data);
Ok(())
}
pub fn get_data(
&mut self,
name: String,
query: String,
metrics: &mut DataMetrics,
) -> Result<String> {
if !self.data_available.get(&name).unwrap() {
debug!("No data available for: {} query: {}", self.api_name, query);
return Ok("No data collected".to_string());
}
/* Get run name from Query */
let param: Vec<(String, String)> = serde_urlencoded::from_str(&query)?;
let (_, run) = param[0].clone();
let values = self
.run_values
.get_mut(&run)
.ok_or(PDError::VisualizerRunValueGetError(run.to_string()))?;
if values.is_empty() {
return Ok("No data collected".to_string());
}
self.data.get_data(values.clone(), query, metrics)
}
pub fn get_calls(&mut self) -> Result<Vec<String>> {
self.data.get_calls()
}
}
pub enum GraphLimitType {
UInt64(u64),
F64(f64),
}
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
pub struct GraphLimits {
pub low: u64,
pub high: u64,
pub init_done: bool,
}
impl GraphLimits {
pub fn new() -> Self {
GraphLimits {
low: 0,
high: 0,
init_done: false,
}
}
}
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
pub struct GraphMetadata {
pub limits: GraphLimits,
}
impl GraphMetadata {
pub fn new() -> Self {
GraphMetadata {
limits: GraphLimits::new(),
}
}
fn update_limit_u64(&mut self, value: u64) {
if !self.limits.init_done {
self.limits.low = value;
self.limits.init_done = true;
}
if value < self.limits.low {
self.limits.low = value;
}
if value > self.limits.high {
self.limits.high = value;
}
}
fn update_limit_f64(&mut self, value: f64) {
let value_floor = value.floor() as u64;
let value_ceil = value.ceil() as u64;
if !self.limits.init_done {
self.limits.low = value_floor;
self.limits.init_done = true;
}
// Set low
if value_floor < self.limits.low {
self.limits.low = value_floor;
}
if value_ceil < self.limits.low {
self.limits.low = value_ceil;
}
// Set high
if value_floor > self.limits.high {
self.limits.high = value_floor;
}
if value_ceil > self.limits.high {
self.limits.high = value_ceil;
}
}
pub fn update_limits(&mut self, value: GraphLimitType) {
match value {
GraphLimitType::UInt64(v) => self.update_limit_u64(v),
GraphLimitType::F64(v) => self.update_limit_f64(v),
}
}
}
pub trait GetData {
fn get_calls(&mut self) -> Result<Vec<String>> {
unimplemented!();
}
fn get_data(
&mut self,
_values: Vec<ProcessedData>,
_query: String,
_metrics: &mut DataMetrics,
) -> Result<String> {
unimplemented!();
}
fn process_raw_data(&mut self, _buffer: Data) -> Result<ProcessedData> {
unimplemented!();
}
fn custom_raw_data_parser(&mut self, _params: ReportParams) -> Result<Vec<ProcessedData>> {
unimplemented!();
}
}
#[cfg(test)]
mod tests {
use super::DataVisualizer;
use crate::data::cpu_utilization::{CpuData, CpuUtilization};
use crate::data::{ProcessedData, TimeEnum};
use crate::utils::DataMetrics;
use std::path::PathBuf;
#[test]
fn test_unpack_data() {
let mut dv = DataVisualizer::new(
ProcessedData::CpuUtilization(CpuUtilization::new()),
"cpu_utilization".to_string(),
String::new(),
String::new(),
"cpu_utilization".to_string(),
);
dv.init_visualizer(
"tests/test-data/aperf_2023-07-26_18_37_43/".to_string(),
"test".to_string(),
&PathBuf::new(),
&PathBuf::new(),
)
.unwrap();
dv.process_raw_data("test".to_string()).unwrap();
let ret = dv
.get_data(
"test".to_string(),
"run=test&get=values&key=aggregate".to_string(),
&mut DataMetrics::new(String::new()),
)
.unwrap();
let values: Vec<CpuData> = serde_json::from_str(&ret).unwrap();
assert!(values[0].cpu == -1);
match values[0].time {
TimeEnum::TimeDiff(value) => assert!(value == 0),
_ => unreachable!(),
}
match values[1].time {
TimeEnum::TimeDiff(value) => assert!(value == 1),
_ => unreachable!(),
}
}
}