in src/flowgger/decoder/ltsv_decoder.rs [89:219]
fn decode(&self, line: &str) -> Result<Record, &'static str> {
let mut sd = StructuredData::new(None);
let mut ts = None;
let mut hostname = None;
let mut msg = None;
let mut severity = None;
for part in line.split('\t') {
let mut pair = part.splitn(2, ':');
let k = pair.next();
let v = pair.next();
match (k, v) {
(Some(name), None) => println!("Missing value for name '{}'", name),
(None, None) => println!("Missing name and value for a LTSV record"),
(None, Some(value)) => println!("Missing name for value '{}'", value),
(Some(name), Some(value)) => {
match name {
"time" => {
let ts_s = if value.starts_with('[') && value.ends_with(']') {
&value[1..(value.len() - 1)]
} else {
value
};
ts = Some(parse_ts(ts_s)?);
}
"host" => hostname = Some(value.to_owned()),
"message" => msg = Some(value.to_owned()),
"level" => {
let severity_given: u8 =
value.parse().or(Err("Invalid severity level"))?;
if severity_given > 7 {
return Err("Severity level should be <= 7");
}
severity = Some(severity_given);
}
name => {
let (final_name, value): (String, SDValue) = if let Some(ref schema) =
self.schema
{
match schema.get(name) {
None | Some(&SDValueType::String) => {
(format!("_{}", name), SDValue::String(value.to_owned()))
}
Some(&SDValueType::Bool) => {
let final_name = match self.suffixes.s_bool {
Some(ref suffix) if !name.ends_with(suffix) => {
format!("_{}{}", name, suffix)
}
_ => format!("_{}", name),
};
(
final_name,
SDValue::Bool(
value
.parse::<bool>()
.or(Err("Type error; boolean was expected"))?,
),
)
}
Some(&SDValueType::F64) => {
let final_name = match self.suffixes.s_f64 {
Some(ref suffix) if !name.ends_with(suffix) => {
format!("_{}{}", name, suffix)
}
_ => format!("_{}", name),
};
(
final_name,
SDValue::F64(
value
.parse::<f64>()
.or(Err("Type error; f64 was expected"))?,
),
)
}
Some(&SDValueType::I64) => {
let final_name = match self.suffixes.s_i64 {
Some(ref suffix) if !name.ends_with(suffix) => {
format!("_{}{}", name, suffix)
}
_ => format!("_{}", name),
};
(
final_name,
SDValue::I64(
value
.parse::<i64>()
.or(Err("Type error; i64 was expected"))?,
),
)
}
Some(&SDValueType::U64) => {
let final_name = match self.suffixes.s_u64 {
Some(ref suffix) if !name.ends_with(suffix) => {
format!("_{}{}", name, suffix)
}
_ => format!("_{}", name),
};
(
final_name,
SDValue::U64(
value
.parse::<u64>()
.or(Err("Type error; u64 was expected"))?,
),
)
}
}
} else {
(format!("_{}", name), SDValue::String(value.to_owned()))
};
sd.pairs.push((final_name, value));
}
};
}
};
}
let record = Record {
ts: ts.ok_or("Missing timestamp")?,
hostname: hostname.ok_or("Missing hostname")?,
facility: None,
severity,
appname: None,
procid: None,
msgid: None,
sd: if sd.pairs.is_empty() { None } else { Some(vec![sd]) },
msg,
full_msg: Some(line.to_owned()),
};
Ok(record)
}