fn run()

in src/flowgger/splitter/capnp_splitter.rs [16:62]


    /// Reads Cap'n Proto messages from `buf_reader` in a loop, converts each
    /// one to a record with `handle_message`, re-encodes it with `encoder`
    /// and pushes the encoded bytes to `tx`.
    ///
    /// `_decoder` is unused: the wire format is decoded here directly.
    ///
    /// The loop ends (and the function returns) when reading a message fails
    /// with `Failed`, `Unimplemented` or `Disconnected`. An `Overloaded`
    /// error is retried after a 250 ms pause. Messages whose root cannot be
    /// read, that `handle_message` rejects, or that cannot be encoded are
    /// reported on stderr and skipped.
    ///
    /// # Panics
    ///
    /// Panics if sending on `tx` fails.
    fn run(
        &self,
        buf_reader: BufReader<T>,
        tx: SyncSender<Vec<u8>>,
        _decoder: Box<dyn Decoder>,
        encoder: Box<dyn Encoder>,
    ) {
        let mut buf_reader = buf_reader;
        loop {
            let message_reader =
                match capnp::serialize::read_message(&mut buf_reader, ReaderOptions::new()) {
                    Ok(message_reader) => message_reader,
                    Err(e) => match e.kind {
                        capnp::ErrorKind::Failed | capnp::ErrorKind::Unimplemented => {
                            let _ = writeln!(stderr(), "Capnp decoding error: {}", e.description);
                            return;
                        }
                        capnp::ErrorKind::Overloaded => {
                            // Back off briefly, then try reading again.
                            thread::sleep(Duration::from_millis(250));
                            continue;
                        }
                        capnp::ErrorKind::Disconnected => {
                            let _ = writeln!(
                                stderr(),
                                "Client hasn't sent any data for a while - Closing \
                                 idle connection"
                            );
                            return;
                        }
                    },
                };
            // The payload comes from a remote peer, so an unreadable root must
            // not panic this thread. The whole frame has already been consumed
            // by `read_message`, so skipping to the next message is safe.
            let message: record_capnp::record::Reader = match message_reader.get_root() {
                Ok(message) => message,
                Err(e) => {
                    let _ = writeln!(stderr(), "Capnp decoding error: {}", e);
                    continue;
                }
            };
            let record = match handle_message(message) {
                Ok(record) => record,
                Err(e) => {
                    let _ = writeln!(stderr(), "{}", e);
                    continue;
                }
            };
            match encoder.encode(record) {
                Ok(reencoded) => tx.send(reencoded).unwrap(),
                Err(e) => {
                    let _ = writeln!(stderr(), "{}", e);
                }
            };
        }
    }