fn record()

in below/src/main.rs [833:1009]


fn record(
    init: init::InitToken,
    logger: slog::Logger,
    errs: Receiver<Error>,
    interval: Duration,
    below_config: &BelowConfig,
    retention: Option<Duration>,
    store_size_limit: Option<u64>,
    collect_io_stat: bool,
    skew_detection_threshold: Duration,
    debug: bool,
    disable_disk_stat: bool,
    disable_exitstats: bool,
    enable_gpu_stats: bool,
    compress_opts: &CompressOpts,
) -> Result<()> {
    debug!(logger, "Starting up!");

    if !disable_exitstats {
        bump_memlock_rlimit()?;
    }

    let mut store = store::StoreWriter::new(
        logger.clone(),
        &below_config.store_dir,
        compress_opts.to_compression_mode()?,
        store::Format::Cbor,
    )?;
    let mut stats = statistics::Statistics::new();

    let (exit_buffer, bpf_errs) = if disable_exitstats {
        (Arc::new(Mutex::new(procfs::PidMap::new())), None)
    } else {
        start_exitstat(logger.clone(), debug)
    };
    let mut bpf_err_warned = false;

    // Handle cgroup filter from conf and generate Regex
    let cgroup_re = if !below_config.cgroup_filter_out.is_empty() {
        Some(
            Regex::new(&below_config.cgroup_filter_out)
                .expect("Failed to generate regex from cgroup_filter_out in below.conf"),
        )
    } else {
        None
    };

    #[cfg(fbcode_build)]
    let gpu_stats_receiver = if enable_gpu_stats {
        let thread_interval = interval.clone();
        let gpu_collector = model::gpu_stats_collector_plugin::GpuStatsCollectorPlugin::new(
            init.fb,
            logger.clone(),
        )
        .context("Failed to initialize GPU stats collector")?;
        let (mut collector, receiver) = model::collector_plugin::collector_consumer(gpu_collector);
        let logger_clone = logger.clone();
        thread::Builder::new()
            .name("gpu_collector".to_owned())
            .spawn(move || {
                loop {
                    let collect_instant = Instant::now();
                    match futures::executor::block_on(collector.collect_and_update()) {
                        Ok(_) => {}
                        Err(e) => error!(logger_clone, "{:?}", e),
                    }
                    let collect_duration = Instant::now().duration_since(collect_instant);

                    const COLLECT_DURATION_WARN_THRESHOLD: u64 = 2;
                    if collect_duration > Duration::from_secs(COLLECT_DURATION_WARN_THRESHOLD) {
                        warn!(
                            logger_clone,
                            "GPU collection took {} > {}",
                            collect_duration.as_secs_f64(),
                            COLLECT_DURATION_WARN_THRESHOLD
                        );
                    }
                    if thread_interval > collect_duration {
                        let sleep_duration = thread_interval - collect_duration;
                        std::thread::sleep(sleep_duration);
                    }
                }
            })
            .expect("Failed to spawn thread");
        Some(receiver)
    } else {
        None
    };

    let collector = model::Collector::new(
        logger.clone(),
        model::CollectorOptions {
            cgroup_root: below_config.cgroup_root.clone(),
            exit_data: exit_buffer,
            collect_io_stat,
            disable_disk_stat,
            cgroup_re,
            #[cfg(fbcode_build)]
            gpu_stats_receiver,
        },
    );

    loop {
        if !disable_exitstats {
            // Anything that comes over the error channel is an error
            match errs.try_recv() {
                Ok(e) => bail!(e),
                Err(TryRecvError::Empty) => {}
                Err(TryRecvError::Disconnected) => bail!("error channel disconnected"),
            };

            if !bpf_err_warned {
                bpf_err_warned = check_for_exitstat_errors(
                    &logger,
                    bpf_errs
                        .as_ref()
                        .expect("Failed to unwrap bpf_errs receiver"),
                );
            }
        }

        let collect_instant = Instant::now();

        let collected_sample = collector.collect_sample();
        let post_collect_sys_time = SystemTime::now();
        let post_collect_instant = Instant::now();

        let collection_skew = post_collect_instant.duration_since(collect_instant);
        if collection_skew >= skew_detection_threshold {
            warn!(
                logger,
                "data collection took {} ms (>= {} ms)",
                collection_skew.as_millis(),
                skew_detection_threshold.as_millis()
            );

            stats.report_collection_skew();
        }

        match collected_sample {
            Ok(s) => {
                match store.put(post_collect_sys_time, &DataFrame { sample: s }) {
                    Ok(/* new shard */ true) => {
                        cleanup_store(&store, &logger, store_size_limit, /* retention */ None)?
                    }
                    Ok(/* new shard */ false) => {}
                    Err(e) => error!(logger, "{:#}", e),
                }
            }
            Err(e) => {
                // Handle cgroupfs errors
                match e.downcast_ref::<cgroupfs::Error>() {
                    // Unrecoverable error -- below only supports cgroup2
                    Some(cgroupfs::Error::NotCgroup2(_)) => bail!(e),
                    _ => {}
                };

                error!(logger, "{:#}", e);
            }
        };

        // Only check against retention and not size limit. Size limit is only
        // checked on creation of successful write to a new shard.
        cleanup_store(&store, &logger, /* store_size_limit */ None, retention)?;

        stats.report_store_size(below_config.store_dir.as_path());

        let collect_duration = Instant::now().duration_since(collect_instant);
        // Sleep for at least 1s to avoid sample collision
        let sleep_duration = if interval > collect_duration {
            std::cmp::max(Duration::from_secs(1), interval - collect_duration)
        } else {
            Duration::from_secs(1)
        };
        std::thread::sleep(sleep_duration);
    }
}