in src/flowgger/input/redis_input.rs [126:150]
/// Starts `self.threads` worker threads and blocks until every one of them has been joined.
///
/// Each thread receives its own clone of the configuration, of the sender `tx`, and of the
/// decoder/encoder pair, builds a `RedisWorker` from them and calls its `run()` method.
/// An error returned by `run()` is reported on stderr (best effort: a failed write is
/// ignored). Whenever `run()` returns at all, with or without an error, the thread
/// terminates the whole process with exit status 1.
///
/// # Panics
///
/// Panics with "Redis connection lost" if joining a worker thread returns an error.
fn accept(
    &self,
    tx: SyncSender<Vec<u8>>,
    decoder: Box<dyn Decoder + Send>,
    encoder: Box<dyn Encoder + Send>,
) {
    // Spawn every worker up front; `collect` drives the iterator eagerly, so all
    // threads are running before the first join below.
    let handles: Vec<_> = (0..self.threads)
        .map(|tid| {
            // Per-thread copies, moved into the worker closure.
            let worker_config = self.config.clone();
            let worker_encoder = encoder.clone_boxed();
            let worker_decoder = decoder.clone_boxed();
            let worker_tx = tx.clone();
            thread::spawn(move || {
                let worker = RedisWorker::new(
                    tid,
                    worker_config,
                    worker_tx,
                    worker_decoder,
                    worker_encoder,
                );
                if let Err(e) = worker.run() {
                    let _ = writeln!(stderr(), "Redis connection lost, aborting - {}", e);
                }
                // A worker that returns is treated as fatal for the whole process.
                exit(1);
            })
        })
        .collect();

    for handle in handles {
        if handle.join().is_err() {
            panic!("Redis connection lost");
        }
    }
}