fn main()

in rd-hashd/src/main.rs [159:379]


fn main() {
    assert_eq!(*VERSION, *rd_hashd_intf::VERSION);
    Args::set_help_body(std::str::from_utf8(include_bytes!("../README.md")).unwrap());

    //
    // Parse arguments and set up application logging (not the hash logging).
    //
    let mut args_file = Args::init_args_and_logging().expect("failed to process args file");
    let args = &mut args_file.data;

    debug!("arguments: {:#?}", args);

    let tf_path = match args.testfiles.as_ref() {
        Some(p) => p,
        None => {
            error!("--testfiles must be specified");
            panic!();
        }
    };

    debug_assert!({
        warn!("Built with debug profile, may be too slow for nominal behaviors");
        true
    });

    //
    // Load params and init stat.
    //
    let mut params_file = JsonConfigFile::<Params>::load_or_create(args.params.as_ref())
        .expect("failed to process params file");
    let params = &mut params_file.data;

    if params.file_frac > args.file_max_frac {
        warn!("--file-max is lower than Params::file_frac, adjusting file_frac");
        params.file_frac = args.file_max_frac;
    }

    //
    // Create the testfiles root dir and determine whether we're on rotational
    // devices.
    //
    let mut tf = TestFiles::new(
        tf_path,
        TESTFILE_UNIT_SIZE,
        args.file_max_size(),
        args.compressibility,
    );
    tf.prep_base_dir().unwrap();

    ROTATIONAL_TESTFILES.store(storage_info::is_path_rotational(tf_path), Ordering::Relaxed);

    let rot_tf = ROTATIONAL_TESTFILES.load(Ordering::Relaxed);
    let rot_swap = *ROTATIONAL_SWAP;

    if rot_tf || rot_swap {
        let mut msg = format!(
            "Hard disk detected (testfiles={}, swap={})",
            rot_tf, rot_swap
        );
        if let Some(false) = args.rotational {
            msg += " but rotational mode is inhibited";
        } else {
            msg += ", enabling rotational mode";
            ROTATIONAL.store(true, Ordering::Relaxed);
        }
        info!("{}", &msg);
    } else if let Some(true) = args.rotational {
        info!("No hard disk detected but forcing rotational mode");
        ROTATIONAL.store(true, Ordering::Relaxed);
    }

    //
    // Init stat file and prepare testfiles.
    //
    let mut report_file = JsonReportFile::<Report>::new(args.report.as_ref());
    report_file.data.params_modified = DateTime::from(params_file.loaded_mod);
    report_tick(&mut report_file, false);

    if args.clear_testfiles {
        info!("Clearing {}", tf_path);
        tf.clear().unwrap();
    }

    if args.prepare_testfiles {
        let greet = format!(
            "Populating {} with {} {}M files ({:.2}G)",
            tf_path,
            tf.nr_files,
            to_mb(TESTFILE_UNIT_SIZE),
            to_gb(args.file_max_size())
        );

        // Lay out the testfiles while reporting progress.
        let mut tfbar = TestFilesProgressBar::new(
            args.file_max_size(),
            &greet,
            "Preparing testfiles",
            args.verbosity > 1,
        );
        tf.setup(|pos| {
            tfbar.progress(pos);
            report_file.data.testfiles_progress = pos as f64 / args.file_max_size() as f64;
            report_tick(&mut report_file, true);
        })
        .unwrap();
        report_file.data.testfiles_progress = 1.0;
        report_tick(&mut report_file, false);

        if !args.keep_cache {
            info!("Dropping page cache for testfiles");
            tf.drop_cache();
        }
    }

    if args.prepare_and_exit {
        return;
    }

    //
    // Benchmark and exit if requested.
    //
    if args.bench_cpu || args.bench_mem {
        let mut bench = bench::Bench::new(args_file, params_file, report_file);
        bench.run();
        exit(0);
    }

    //
    // Start the hasher.
    //
    let size = args.size as f64 * params.mem_frac;
    let fsize = (size * params.file_frac).min(size);
    let asize = size - fsize;
    info!(
        "Starting hasher (maxcon={} lat={:.1}ms rps={} file={:.2}G anon={:.2}G)",
        params.concurrency_max,
        params.lat_target * TO_MSEC,
        params.rps_target,
        to_gb(fsize),
        to_gb(asize)
    );

    let mut dispatch = hasher::Dispatch::new(
        args.size,
        tf,
        &params,
        args.compressibility,
        create_logger(args, &params),
    );

    //
    // Monitor and report.
    //
    report_file.data.phase = Phase::Running;

    sleep(Duration::from_secs(1));

    let mut stat_sum: Stat = Default::default();
    let mut nr_sums: u32 = 0;
    let mut last_summary_at = Instant::now();
    loop {
        sleep(Duration::from_secs(1));
        let now = Instant::now();
        let stat = &mut report_file.data.hasher;

        *stat = dispatch.get_stat();
        stat_sum += &stat;
        nr_sums += 1;

        if args.interval != 0
            && now.duration_since(last_summary_at).as_secs_f64() >= args.interval as f64
        {
            stat_sum.avg(nr_sums);

            let mut buf = format!(
                "p50:{:5.1} p84:{:5.1} p90:{:5.1} p99:{:5.1} rps:{:6.1} con:{:5.1}",
                stat_sum.lat.p50 * TO_MSEC,
                stat_sum.lat.p84 * TO_MSEC,
                stat_sum.lat.p90 * TO_MSEC,
                stat_sum.lat.p99 * TO_MSEC,
                stat_sum.rps,
                stat.concurrency
            );
            if args.verbosity > 0 {
                write!(
                    buf,
                    "/{:.1} infl:{} workers:{}/{} done:{}",
                    stat.concurrency_max,
                    stat.nr_in_flight,
                    stat.nr_workers - stat.nr_idle_workers,
                    stat.nr_workers,
                    stat.nr_done,
                )
                .unwrap();
            }
            info!("{}", buf);

            stat_sum = Default::default();
            nr_sums = 0;
            last_summary_at = now;
        }

        match params_file.maybe_reload() {
            Ok(true) => {
                dispatch.set_params(&params_file.data);
                report_file.data.params_modified = DateTime::from(params_file.loaded_mod);
                info!(
                    "Reloaded params file {:?}",
                    &params_file.path.as_ref().unwrap()
                );
            }
            Ok(false) => {}
            Err(e) => warn!(
                "Failed to reload params file {:?} ({:?})",
                &params_file.path, &e
            ),
        }

        report_tick(&mut report_file, false);
    }
}