in src/flowgger/input/redis_input.rs [96:122]
/// Pulls records from the configured Redis list and hands each one to
/// `handle_record`, looping until a Redis command fails.
///
/// Every record is moved with `BRPOPLPUSH` from the main queue
/// (`config.queue_key`) into a temporary list named
/// `<queue_key>.tmp.<tid>`, processed, and then deleted from that temporary
/// list with `LREM`. Before the main loop starts, entries already present in
/// the temporary list are pushed back onto the main queue with `RPOPLPUSH`.
///
/// # Errors
///
/// Returns a descriptive message when `BRPOPLPUSH` or `LREM` reports an
/// error. A failure inside `handle_record` is only logged to stderr and does
/// not stop the loop.
fn run(self) -> Result<(), String> {
    let main_key: &str = &self.config.queue_key;
    let pending_key: &str = &format!("{}.tmp.{}", main_key, self.tid);
    let cnx = self.redis_cnx;
    println!(
        "Connected to Redis [{}], pulling messages from key [{}]",
        self.config.connect, main_key
    );

    // Requeue entries found in the temporary list. The loop stops at the
    // first reply that cannot be read as a `String`, which covers both an
    // empty temporary list and any Redis error.
    loop {
        let requeued: RedisResult<String> = cnx.rpoplpush(pending_key, main_key);
        if requeued.is_err() {
            break;
        }
    }

    let decoder: Box<dyn Decoder> = self.decoder;
    let encoder: Box<dyn Encoder> = self.encoder;
    loop {
        // Timeout 0: block until a record becomes available.
        let fetched: RedisResult<String> = cnx.brpoplpush(main_key, pending_key, 0);
        let record =
            fetched.map_err(|e| format!("Redis protocol error in BRPOPLPUSH: [{}]", e))?;

        // A record that fails processing is reported and then dropped like
        // any other; it is still removed from the temporary list below.
        if let Err(reason) = handle_record(&record, &self.tx, &decoder, &encoder) {
            let _ = writeln!(stderr(), "{}: [{}]", reason, record.trim());
        }

        // Remove a single occurrence of the processed record.
        let removed: RedisResult<u8> = cnx.lrem(pending_key, 1, record);
        removed.map_err(|e| format!("Redis protocol error in LREM: [{}]", e))?;
    }
}