// runners/datafusion-rust/src/main.rs (236 lines of code) (raw):

use datafusion::common::Result; use datafusion::dataframe::DataFrameWriteOptions; use datafusion::datasource::MemTable; use datafusion::physical_plan::displayable; use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; use datafusion::scalar::ScalarValue; use datafusion::DATAFUSION_VERSION; use serde::Serialize; use std::collections::HashMap; use std::fs; use std::fs::File; use std::io::{BufRead, BufReader, BufWriter, Write}; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use structopt::StructOpt; use tokio::time::Instant; #[derive(StructOpt, Debug)] #[structopt(name = "basic")] struct Opt { /// Activate debug mode #[structopt(long)] debug: bool, /// Optional path to config file #[structopt(long, parse(from_os_str))] config_path: Option<PathBuf>, /// Path to queries #[structopt(long, parse(from_os_str))] query_path: PathBuf, /// Path to data #[structopt(short, long, parse(from_os_str))] data_path: PathBuf, /// Output path #[structopt(short, long, parse(from_os_str))] output: PathBuf, /// Query number. If no query number specified then all queries will be executed. 
#[structopt(short, long)] query: Option<u8>, /// Number of queries in this benchmark suite #[structopt(short, long)] num_queries: Option<u8>, /// List of queries to exclude #[structopt(short, long)] exclude: Vec<u8>, /// Concurrency, determining the number of partitions for queries #[structopt(short, long)] concurrency: u8, /// Iterations (number of times to run each query) #[structopt(short, long)] iterations: u8, } #[derive(Debug, PartialEq, Serialize, Default)] pub struct Results { system_time: u128, datafusion_version: String, config: HashMap<String, String>, command_line_args: Vec<String>, register_tables_time: u128, /// Vector of (query_number, query_times) query_times: Vec<(u8, Vec<u128>)>, } impl Results { fn new() -> Self { let current_time = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("Time went backwards"); Self { system_time: current_time.as_millis(), datafusion_version: DATAFUSION_VERSION.to_string(), config: HashMap::new(), command_line_args: vec![], register_tables_time: 0, query_times: vec![], } } } #[tokio::main] pub async fn main() -> Result<()> { let mut results = Results::new(); for arg in std::env::args() { results.command_line_args.push(arg); } let opt = Opt::from_args(); let query_path = format!("{}", opt.query_path.display()); let output_path = format!("{}", opt.output.display()); let mut config = SessionConfig::new().with_target_partitions(opt.concurrency as usize); if let Some(config_path) = &opt.config_path { let file = File::open(config_path)?; let reader = BufReader::new(file); let lines = reader.lines(); for line in lines { let line = line?; if line.starts_with("#") { continue; } let parts = line.split('='); let parts = parts.collect::<Vec<&str>>(); if parts.len() == 2 { config = config.set(parts[0], ScalarValue::Utf8(Some(parts[1].to_string()))); } else { println!("Warning! 
Skipping config entry {}", line); } } } for entry in config.options().entries() { if let Some(ref value) = entry.value { results.config.insert(entry.key, value.to_string()); } } // register all tables in data directory let start = Instant::now(); let ctx = SessionContext::new_with_config(config); for file in fs::read_dir(&opt.data_path)? { let file = file?; let file_path = file.path(); let path = format!("{}", file.path().display()); if path.ends_with(".parquet") { let filename = Path::file_name(&file_path).unwrap().to_str().unwrap(); let table_name = &filename[0..filename.len() - 8]; println!("Registering table {} as {}", table_name, path); ctx.register_parquet(&table_name, &path, ParquetReadOptions::default()) .await?; } } let setup_time = start.elapsed().as_millis(); println!("Setup time was {} ms", setup_time); results.register_tables_time = setup_time; match opt.query { Some(query) => { execute_query( &ctx, &query_path, query, opt.debug, &output_path, opt.iterations, &mut results, ) .await?; } _ => { let num_queries = opt.num_queries.unwrap(); for query in 1..=num_queries { if opt.exclude.contains(&query) { println!("Skipping query {}", query); continue; } let result = execute_query( &ctx, &query_path, query, opt.debug, &output_path, opt.iterations, &mut results, ) .await; match result { Ok(_) => {} Err(e) => println!("Fail: {}", e), } } } } // write results json file let json = serde_json::to_string_pretty(&results).unwrap(); let f = File::create(&format!( "{}/results-{}.yaml", output_path, results.system_time ))?; let mut w = BufWriter::new(f); w.write(json.as_bytes())?; // write simple csv summary file let mut w = File::create(&format!("{}/results.csv", output_path))?; w.write(format!("setup,{}\n", results.register_tables_time).as_bytes())?; for (query, times) in &results.query_times { w.write(format!("q{},{}\n", query, times[0]).as_bytes())?; } Ok(()) } pub async fn execute_query( ctx: &SessionContext, query_path: &str, query_no: u8, debug: bool, 
output_path: &str, iterations: u8, results: &mut Results, ) -> Result<()> { let filename = format!("{}/q{query_no}.sql", query_path); println!("Executing query {} from {}", query_no, filename); let sql = fs::read_to_string(&filename)?; // some queries have multiple statements let sql = sql .split(';') .filter(|s| !s.trim().is_empty()) .collect::<Vec<_>>(); let multipart = sql.len() > 1; let mut durations = vec![]; for iteration in 0..iterations { // duration for executing all queries in the file let mut total_duration_millis = 0; for (i, sql) in sql.iter().enumerate() { if debug { println!("Query {}: {}", query_no, sql); } let file_suffix = if multipart { format!("_part_{}", i + 1) } else { "".to_owned() }; let start = Instant::now(); let df = ctx.sql(sql).await?; let batches = df.clone().collect().await?; let duration = start.elapsed(); total_duration_millis += duration.as_millis(); let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); println!( "Query {query_no}{file_suffix} executed in: {duration:?} and returned {row_count} rows", ); if iteration == 0 { let plan = df.clone().into_optimized_plan()?; let formatted_query_plan = format!("{}", plan.display_indent()); if debug { println!("{}", formatted_query_plan); } let exec = df.create_physical_plan().await?; if debug { println!("{}", displayable(exec.as_ref()).indent(false)); } let filename = format!( "{}/q{}{}_logical_plan.txt", output_path, query_no, file_suffix ); let mut file = File::create(&filename)?; write!(file, "{}", formatted_query_plan)?; // write results to disk if batches.is_empty() { println!("Empty result set returned"); } else { let filename = format!("{}/q{}{}.csv", output_path, query_no, file_suffix); let t = MemTable::try_new(batches[0].schema(), vec![batches])?; let df = ctx.read_table(Arc::new(t))?; df.write_csv(&filename, DataFrameWriteOptions::default(), None) .await?; } } } durations.push(total_duration_millis); } results.query_times.push((query_no, durations)); Ok(()) }