fn run_inner()

in rd-agent/src/report.rs [853:949]


    fn run_inner(mut self) {
        let mut next_at = unix_now() + 1;

        let runner = self.runner.data.lock().unwrap();
        let cfg = &runner.cfg;

        let mut iolat = IoLatReader::new(cfg, "iolat", "1").unwrap();
        let mut iolat_cum = IoLatReader::new(cfg, "iolat_cum", "-1").unwrap();

        drop(runner);
        let mut sleep_dur = Duration::from_secs(0);
        let mut iolat_retries = crate::misc::BCC_RETRIES;
        let mut iolat_cum_retries = crate::misc::BCC_RETRIES;
        let mut iolat_cum_kicked_at = UNIX_EPOCH;

        'outer: loop {
            select! {
                recv(iolat.rx.as_ref().unwrap()) -> res => {
                    // the cumulative instance doesn't have an interval,
                    // kick it and run it at the same pace as the 1s one. If
                    // we stalled for a while, we may busy loop here kicking
                    // iolat_cum repeatedly causing the python signal
                    // handler to hit maximum recursion limit and fail.
                    // Don't kick in quick succession.
                    let now = SystemTime::now();
                    match now.duration_since(iolat_cum_kicked_at) {
                        Ok(dur) => {
                            if dur.as_secs_f64() > 0.1 {
                                iolat_cum.kick();
                                iolat_cum_kicked_at = now;
                            }
                        }
                        Err(_) => iolat_cum_kicked_at = now,
                    }

                    match res {
                        Ok(line) => {
                            match Self::parse_iolat_output(&line) {
                                Ok(v) => self.iolat = v,
                                Err(e) => warn!("report: failed to parse iolat output ({:?})", &e),
                            }
                        }
                        Err(e) => Self::maybe_retry_iolat(&mut iolat_retries, &mut iolat, &e),
                    }
                },
                recv(iolat_cum.rx.as_ref().unwrap()) -> res => {
                    match res {
                        Ok(line) => {
                            match Self::parse_iolat_output(&line) {
                                Ok(v) => self.iolat_cum = v,
                                Err(e) => warn!("report: failed to parse iolat_cum output ({:?})", &e),
                            }
                        }
                        Err(e) => Self::maybe_retry_iolat(&mut iolat_cum_retries, &mut iolat_cum, &e),
                    }
                },
                recv(self.term_rx) -> term => {
                    if let Err(e) = term {
                        info!("report: Term ({})", &e);
                        break 'outer;
                    }
                },
                recv(channel::after(sleep_dur)) -> _ => (),
            }

            let sleep_till = UNIX_EPOCH + Duration::from_secs(next_at) + Duration::from_millis(500);
            match sleep_till.duration_since(SystemTime::now()) {
                Ok(v) => {
                    sleep_dur = v;
                    trace!("report: Sleeping {}ms", sleep_dur.as_millis());
                    continue 'outer;
                }
                _ => {}
            }

            // base_report() generation may take some time. Timestamp here.
            let now = unix_now();

            // generate base
            let base_report = match self.base_report() {
                Ok(v) => v,
                Err(e) => {
                    error!("report: Failed to generate base report ({:?})", &e);
                    continue;
                }
            };

            self.report_file.tick(&base_report, now);
            self.report_file_1min.tick(&base_report, now);

            // Report generation and writing could have taken a while. If we
            // overshot the 500ms window and are in the next second window,
            // we wanna wait for the next window. Re-acquire the current
            // second to determine when the next report will be generated.
            next_at = unix_now() + 1;
        }
    }