in below/src/main.rs [1113:1176]
/// Runs the interactive view in live mode against a remote host.
///
/// The function:
/// 1. builds a remote advance (via `new_advance_remote`) starting at
///    "now minus `LIVE_REMOTE_MAX_LATENCY_SEC` seconds", initializes it and
///    takes its latest sample as the first model shown by the view;
/// 2. spawns the `live_remote_collector` thread, which once per `interval`
///    asks the UI to advance one step forward and refresh the view state, and
///    which asks the UI to quit as soon as an error arrives on `errs` or the
///    `errs` channel is disconnected;
/// 3. switches the log target to `TargetLog::File` and runs the view on the
///    calling thread until it returns.
///
/// # Errors
///
/// Returns an error if `new_advance_remote` fails, if no sample is available
/// after initialization, if the collector thread cannot be spawned, or if
/// `view.run()` returns an error.
///
/// # Panics
///
/// Panics if the start timestamp cannot be represented. The UI callback
/// panics if the view's user data (`ViewState`) has not been set.
fn live_remote(
    logger: slog::Logger,
    errs: Receiver<Error>,
    interval: Duration,
    host: String,
    port: Option<u16>,
) -> Result<()> {
    // Start a little in the past so there is a latency allowance for the
    // remote side (see LIVE_REMOTE_MAX_LATENCY_SEC).
    let timestamp = SystemTime::now()
        .checked_sub(Duration::from_secs(LIVE_REMOTE_MAX_LATENCY_SEC))
        .expect("Fail to construct timestamp with latency allowance in live remote.");
    let mut advance = new_advance_remote(logger.clone(), host, port, timestamp)?;
    advance.initialize();

    let mut view = match advance.get_latest_sample() {
        Some(model) => view::View::new_with_advance(
            model,
            view::ViewMode::Live(Rc::new(RefCell::new(advance))),
        ),
        None => return Err(anyhow!("No data could be found!")),
    };

    let sink = view.cb_sink().clone();
    thread::Builder::new()
        .name("live_remote_collector".to_owned())
        .spawn(move || {
            loop {
                // Rely on timeout to guarantee interval between samples
                let should_quit = match errs.recv_timeout(interval) {
                    Ok(e) => {
                        error!(logger, "{:#}", e);
                        true
                    }
                    Err(RecvTimeoutError::Disconnected) => {
                        error!(logger, "error channel disconnected");
                        true
                    }
                    Err(RecvTimeoutError::Timeout) => false,
                };
                if should_quit {
                    // Best effort: if the send fails the view has already
                    // shut down, so there is nothing left to stop and no
                    // reason to panic in this thread.
                    let _ = sink.send(Box::new(|c| c.quit()));
                    return;
                }

                // Executed on the UI thread: step the live advance forward
                // and, when a new sample is available, refresh the view state.
                let data_plane = Box::new(move |s: &mut Cursive| {
                    let view_state = s.user_data::<ViewState>().expect("user data not set");
                    if let view::ViewMode::Live(adv) = view_state.mode.clone() {
                        if let Some(data) = adv.borrow_mut().advance(store::Direction::Forward) {
                            view_state.update(data);
                        }
                    }
                });
                // A failed send means the view is gone; stop collecting.
                if sink.send(data_plane).is_err() {
                    return;
                }
            }
        })
        // Spawning can fail for OS-level reasons; report it to the caller
        // instead of panicking.
        .map_err(|e| anyhow!("Failed to spawn thread: {}", e))?;

    logutil::set_current_log_target(logutil::TargetLog::File);
    view.run()
}