in src/flowgger/splitter/capnp_splitter.rs [16:62]
fn run(
&self,
buf_reader: BufReader<T>,
tx: SyncSender<Vec<u8>>,
_decoder: Box<dyn Decoder>,
encoder: Box<dyn Encoder>,
) {
let mut buf_reader = buf_reader;
loop {
let message_reader =
match capnp::serialize::read_message(&mut buf_reader, ReaderOptions::new()) {
Err(e) => match e.kind {
capnp::ErrorKind::Failed | capnp::ErrorKind::Unimplemented => {
let _ = writeln!(stderr(), "Capnp decoding error: {}", e.description);
return;
}
capnp::ErrorKind::Overloaded => {
thread::sleep(Duration::from_millis(250));
continue;
}
capnp::ErrorKind::Disconnected => {
let _ = writeln!(
stderr(),
"Client hasn't sent any data for a while - Closing \
idle connection"
);
return;
}
},
Ok(message_reader) => message_reader,
};
let message: record_capnp::record::Reader = message_reader.get_root().unwrap();
let record = match handle_message(message) {
Err(e) => {
let _ = writeln!(stderr(), "{}", e);
continue;
}
Ok(record) => record,
};
match encoder.encode(record) {
Err(e) => {
let _ = writeln!(stderr(), "{}", e);
}
Ok(reencoded) => tx.send(reencoded).unwrap(),
};
}
}