fn run()

in src/flowgger/input/redis_input.rs [96:122]


    /// Pulls records from the configured Redis queue and hands each one to
    /// `handle_record`, looping until a Redis call fails.
    ///
    /// A secondary key of the form `<queue_key>.tmp.<tid>` is used as a holding
    /// list: every record is moved there by `brpoplpush` before it is handled
    /// and removed from it with `lrem` afterwards.
    ///
    /// # Errors
    ///
    /// Returns a formatted message when `brpoplpush` or `lrem` reports an
    /// error. The main loop has no other exit, so `Ok(())` is never produced
    /// in practice.
    fn run(self) -> Result<(), String> {
        let source_key: &str = &self.config.queue_key;
        let holding_key_owned = format!("{}.tmp.{}", source_key, self.tid);
        let holding_key: &str = &holding_key_owned;
        let cnx = self.redis_cnx;
        println!(
            "Connected to Redis [{}], pulling messages from key [{}]",
            self.config.connect, source_key
        );

        // Push whatever is still sitting in the holding list back onto the
        // main queue. The loop ends on the first call that yields an error —
        // presumably the nil reply of an empty list, though any other Redis
        // error ends it as well (NOTE(review): confirm that is intended).
        loop {
            let moved_back: RedisResult<String> = cnx.rpoplpush(holding_key, source_key);
            if moved_back.is_err() {
                break;
            }
        }

        let decoder: Box<dyn Decoder> = self.decoder;
        let encoder: Box<dyn Encoder> = self.encoder;
        loop {
            // A timeout of 0 is passed to the blocking pop.
            let popped: RedisResult<String> = cnx.brpoplpush(source_key, holding_key, 0);
            let line =
                popped.map_err(|e| format!("Redis protocol error in BRPOPLPUSH: [{}]", e))?;

            // A record that cannot be handled is reported on stderr (best
            // effort) and is still removed from the holding list below.
            if let Err(e) = handle_record(&line, &self.tx, &decoder, &encoder) {
                let _ = writeln!(stderr(), "{}: [{}]", e, line.trim());
            }

            let removed: RedisResult<u8> = cnx.lrem(holding_key, 1, line);
            removed.map_err(|e| format!("Redis protocol error in LREM: [{}]", e))?;
        }
    }