in src/flowgger/output/tls_output.rs [124:174]
/// Connects to the configured endpoints forever, reconnecting with a randomized back-off.
///
/// Each iteration:
/// 1. advances the shared cluster index, reshuffling the endpoint list and restarting at
///    index 0 whenever the index runs past the end of the list;
/// 2. hands the chosen endpoint to `handle_connection` and reports any error on stderr;
/// 3. resets the back-off delay to `recovery_delay_init` if the attempt lasted longer than
///    `recovery_probe_time` milliseconds, otherwise grows it by a random amount (at most
///    doubling it) without exceeding `recovery_delay_max`;
/// 4. sleeps for the resulting delay (in milliseconds) and loops.
///
/// The loop has no exit condition, so this method does not return.
///
/// # Panics
///
/// Panics if the cluster mutex is poisoned, or if the endpoint list is empty (the chosen
/// endpoint is obtained by direct indexing).
fn run(self) {
    let tls_config = &self.tls_config;
    let mut rng = rand::thread_rng();

    // These settings are only read here, so convert them once.
    let delay_init = f64::from(tls_config.recovery_delay_init);
    let delay_max = f64::from(tls_config.recovery_delay_max);
    let probe_window =
        chrono::Duration::milliseconds(i64::from(tls_config.recovery_probe_time));

    let mut recovery_delay = delay_init;
    loop {
        let attempt_started = chrono::offset::Utc::now();

        // Keep the lock only long enough to pick the next endpoint.
        let connect_chosen = {
            let mut cluster = tls_config.mx_cluster.lock().unwrap();
            cluster.idx += 1;
            if cluster.idx >= cluster.connect.len() {
                // Went through the whole list: reshuffle before starting over.
                rng.shuffle(&mut cluster.connect);
                cluster.idx = 0;
            }
            cluster.connect[cluster.idx].clone()
        };

        if let Err(e) = self.handle_connection(&connect_chosen) {
            // Failures to write to stderr are deliberately ignored.
            match e.kind() {
                ErrorKind::ConnectionRefused => {
                    let _ = writeln!(stderr(), "Connection to {} refused", connect_chosen);
                }
                ErrorKind::ConnectionAborted | ErrorKind::ConnectionReset => {
                    let _ = writeln!(
                        stderr(),
                        "Connection to {} aborted by the server",
                        connect_chosen
                    );
                }
                _ => {
                    let _ = writeln!(
                        stderr(),
                        "Error while communicating with {} - {}",
                        connect_chosen,
                        e
                    );
                }
            }
        }

        let attempt_duration = chrono::offset::Utc::now().signed_duration_since(attempt_started);
        if attempt_duration > probe_window {
            // The attempt lasted longer than the probe window: restart the back-off.
            recovery_delay = delay_init;
        } else if recovery_delay < delay_max {
            // `gen_range` panics on an empty range, which would happen with a zero delay
            // (e.g. `recovery_delay_init = 0`); use a 1 ms range in that case so the
            // back-off can still grow.
            let jitter_cap = if recovery_delay > 0.0 {
                recovery_delay
            } else {
                1.0
            };
            // Never let a growth step overshoot the configured maximum.
            recovery_delay = (recovery_delay + rng.gen_range(0.0, jitter_cap)).min(delay_max);
        }

        // Float-to-integer `as` casts saturate, so an out-of-range delay cannot wrap around.
        thread::sleep(Duration::from_millis(recovery_delay.round() as u64));
        let _ = writeln!(stderr(), "Attempting to reconnect");
    }
}