in rd-hashd/src/main.rs [159:379]
fn main() {
assert_eq!(*VERSION, *rd_hashd_intf::VERSION);
Args::set_help_body(std::str::from_utf8(include_bytes!("../README.md")).unwrap());
//
// Parse arguments and set up application logging (not the hash logging).
//
let mut args_file = Args::init_args_and_logging().expect("failed to process args file");
let args = &mut args_file.data;
debug!("arguments: {:#?}", args);
let tf_path = match args.testfiles.as_ref() {
Some(p) => p,
None => {
error!("--testfiles must be specified");
panic!();
}
};
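// debug_assert! only evaluates its argument in debug builds, so this
// warning is compiled out of release builds.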
debug_assert!({
warn!("Built with debug profile, may be too slow for nominal behaviors");
true
});
//
// Load params and init stat.
//
let mut params_file = JsonConfigFile::<Params>::load_or_create(args.params.as_ref())
.expect("failed to process params file");
let params = &mut params_file.data;
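// file_frac is the fraction of the hasher's memory footprint served from
// the file-backed testfiles; clamp it to the command-line maximum
// (args.file_max_frac).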
if params.file_frac > args.file_max_frac {
warn!("--file-max is lower than Params::file_frac, adjusting file_frac");
params.file_frac = args.file_max_frac;
}
//
// Create the testfiles root dir and determine whether we're on rotational
// devices.
//
let mut tf = TestFiles::new(
tf_path,
TESTFILE_UNIT_SIZE,
args.file_max_size(),
args.compressibility,
);
tf.prep_base_dir().unwrap();
ROTATIONAL_TESTFILES.store(storage_info::is_path_rotational(tf_path), Ordering::Relaxed);
let rot_tf = ROTATIONAL_TESTFILES.load(Ordering::Relaxed);
let rot_swap = *ROTATIONAL_SWAP;
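// Rotational mode kicks in when either the testfiles or the swap device
// sits on a spinning disk, unless --rotational explicitly overrides the
// detection in either direction.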
if rot_tf || rot_swap {
let mut msg = format!(
"Hard disk detected (testfiles={}, swap={})",
rot_tf, rot_swap
);
if let Some(false) = args.rotational {
msg += " but rotational mode is inhibited";
} else {
msg += ", enabling rotational mode";
ROTATIONAL.store(true, Ordering::Relaxed);
}
info!("{}", &msg);
} else if let Some(true) = args.rotational {
info!("No hard disk detected but forcing rotational mode");
ROTATIONAL.store(true, Ordering::Relaxed);
}
//
// Init stat file and prepare testfiles.
//
let mut report_file = JsonReportFile::<Report>::new(args.report.as_ref());
report_file.data.params_modified = DateTime::from(params_file.loaded_mod);
report_tick(&mut report_file, false);
if args.clear_testfiles {
info!("Clearing {}", tf_path);
tf.clear().unwrap();
}
if args.prepare_testfiles {
let greet = format!(
"Populating {} with {} {}M files ({:.2}G)",
tf_path,
tf.nr_files,
to_mb(TESTFILE_UNIT_SIZE),
to_gb(args.file_max_size())
);
// Lay out the testfiles while reporting progress.
let mut tfbar = TestFilesProgressBar::new(
args.file_max_size(),
&greet,
"Preparing testfiles",
args.verbosity > 1,
);
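// setup() calls back with the number of bytes laid out so far; drive the
// progress bar with it and mirror it into the report as a 0.0-1.0 fraction.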
tf.setup(|pos| {
tfbar.progress(pos);
report_file.data.testfiles_progress = pos as f64 / args.file_max_size() as f64;
report_tick(&mut report_file, true);
})
.unwrap();
report_file.data.testfiles_progress = 1.0;
report_tick(&mut report_file, false);
if !args.keep_cache {
info!("Dropping page cache for testfiles");
tf.drop_cache();
}
}
if args.prepare_and_exit {
return;
}
//
// Benchmark and exit if requested.
//
if args.bench_cpu || args.bench_mem {
let mut bench = bench::Bench::new(args_file, params_file, report_file);
bench.run();
exit(0);
}
//
// Start the hasher.
//
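// The hasher's footprint is args.size scaled by mem_frac; file_frac of that
// is backed by the testfiles and the remainder by anonymous memory.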
let size = args.size as f64 * params.mem_frac;
let fsize = (size * params.file_frac).min(size);
let asize = size - fsize;
info!(
"Starting hasher (maxcon={} lat={:.1}ms rps={} file={:.2}G anon={:.2}G)",
params.concurrency_max,
params.lat_target * TO_MSEC,
params.rps_target,
to_gb(fsize),
to_gb(asize)
);
let mut dispatch = hasher::Dispatch::new(
args.size,
tf,
&params,
args.compressibility,
create_logger(args, &params),
);
//
// Monitor and report.
//
report_file.data.phase = Phase::Running;
sleep(Duration::from_secs(1));
let mut stat_sum: Stat = Default::default();
let mut nr_sums: u32 = 0;
let mut last_summary_at = Instant::now();
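// Sample hasher stats once a second, accumulate them, and print an averaged
// summary every args.interval seconds while ticking the report file.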
loop {
sleep(Duration::from_secs(1));
let now = Instant::now();
let stat = &mut report_file.data.hasher;
*stat = dispatch.get_stat();
stat_sum += &*stat;
nr_sums += 1;
if args.interval != 0
&& now.duration_since(last_summary_at).as_secs_f64() >= args.interval as f64
{
stat_sum.avg(nr_sums);
let mut buf = format!(
"p50:{:5.1} p84:{:5.1} p90:{:5.1} p99:{:5.1} rps:{:6.1} con:{:5.1}",
stat_sum.lat.p50 * TO_MSEC,
stat_sum.lat.p84 * TO_MSEC,
stat_sum.lat.p90 * TO_MSEC,
stat_sum.lat.p99 * TO_MSEC,
stat_sum.rps,
stat.concurrency
);
if args.verbosity > 0 {
write!(
buf,
"/{:.1} infl:{} workers:{}/{} done:{}",
stat.concurrency_max,
stat.nr_in_flight,
stat.nr_workers - stat.nr_idle_workers,
stat.nr_workers,
stat.nr_done,
)
.unwrap();
}
info!("{}", buf);
stat_sum = Default::default();
nr_sums = 0;
last_summary_at = now;
}
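// Pick up on-disk edits to the params file and push them to the hasher.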
match params_file.maybe_reload() {
Ok(true) => {
dispatch.set_params(&params_file.data);
report_file.data.params_modified = DateTime::from(params_file.loaded_mod);
info!(
"Reloaded params file {:?}",
&params_file.path.as_ref().unwrap()
);
}
Ok(false) => {}
Err(e) => warn!(
"Failed to reload params file {:?} ({:?})",
&params_file.path, &e
),
}
report_tick(&mut report_file, false);
}
}