in src/flowgger/output/tls_output.rs [221:360]
fn config_parse(config: &Config) -> (TlsConfig, u32) {
let threads = config
.lookup("output.tls_threads")
.map_or(TLS_DEFAULT_THREADS, |x| {
x.as_integer()
.expect("output.tls_threads must be a 32-bit integer") as u32
});
let connect = config
.lookup("output.connect")
.expect("output.connect is required")
.as_array()
.expect("output.connect must be a list");
let mut connect: Vec<String> = connect
.iter()
.map(|x| {
x.as_str()
.expect("output.connect must be a list of strings")
.to_owned()
})
.collect();
let cert: Option<PathBuf> = config.lookup("output.tls_cert").and_then(|x| {
Some(PathBuf::from(
x.as_str()
.expect("output.tls_cert must be a path to a .pem file"),
))
});
let key: Option<PathBuf> = config.lookup("output.tls_key").and_then(|x| {
Some(PathBuf::from(
x.as_str()
.expect("output.tls_key must be a path to a .pem file"),
))
});
let ciphers = config
.lookup("output.tls_ciphers")
.map_or(DEFAULT_CIPHERS, |x| {
x.as_str()
.expect("output.tls_ciphers must be a string with a cipher suite")
})
.to_owned();
let verify_peer = config
.lookup("output.tls_verify_peer")
.map_or(DEFAULT_VERIFY_PEER, |x| {
x.as_bool()
.expect("output.tls_verify_peer must be a boolean")
});
let ca_file: Option<PathBuf> = config.lookup("output.tls_ca_file").and_then(|x| {
Some(PathBuf::from(
x.as_str()
.expect("output.tls_ca_file must be a path to a file"),
))
});
let compression = config
.lookup("output.tls_compression")
.map_or(DEFAULT_COMPRESSION, |x| {
x.as_bool()
.expect("output.tls_compression must be a boolean")
});
let timeout = config
.lookup("output.timeout")
.map_or(DEFAULT_TIMEOUT, |x| {
x.as_integer().expect("output.timeout must be an integer") as u64
});
let async_ = config
.lookup("output.tls_async")
.map_or(DEFAULT_ASYNC, |x| {
x.as_bool().expect("output.tls_async must be a boolean")
});
let recovery_delay_init =
config
.lookup("output.tls_recovery_delay_init")
.map_or(DEFAULT_RECOVERY_DELAY_INIT, |x| {
x.as_integer()
.expect("output.tls_recovery_delay_init must be an integer")
as u32
});
let recovery_delay_max =
config
.lookup("output.tls_recovery_delay_max")
.map_or(DEFAULT_RECOVERY_DELAY_MAX, |x| {
x.as_integer()
.expect("output.tls_recovery_delay_max must be an integer")
as u32
});
let recovery_probe_time =
config
.lookup("output.tls_recovery_probe_time")
.map_or(DEFAULT_RECOVERY_PROBE_TIME, |x| {
x.as_integer()
.expect("output.tls_recovery_probe_time must be an integer")
as u32
});
if recovery_delay_max < recovery_delay_init {
panic!("output.tls_recovery_delay_max cannot be less than output.tls_recovery_delay_init");
}
let mut connector_builder = SslConnector::builder(SslMethod::tls()).unwrap();
{
let mut ctx = &mut connector_builder;
if !verify_peer {
ctx.set_verify(SslVerifyMode::NONE);
} else {
ctx.set_verify_depth(TLS_VERIFY_DEPTH);
ctx.set_verify(SslVerifyMode::PEER | SslVerifyMode::FAIL_IF_NO_PEER_CERT);
if let Some(ca_file) = ca_file {
ctx.set_ca_file(&ca_file)
.expect("Unable to read the trusted CA file");
}
}
let mut opts = SslOptions::CIPHER_SERVER_PREFERENCE
| SslOptions::NO_SESSION_RESUMPTION_ON_RENEGOTIATION;
if !compression {
opts |= SslOptions::NO_COMPRESSION;
}
ctx.set_options(opts);
set_fs(&mut ctx);
if let Some(cert) = cert {
ctx.set_certificate_file(&Path::new(&cert), SslFiletype::PEM)
.expect("Unable to read the TLS certificate");
}
if let Some(key) = key {
ctx.set_private_key_file(&Path::new(&key), SslFiletype::PEM)
.expect("Unable to read the TLS key");
}
ctx.set_cipher_list(&ciphers)
.expect("Unsupported cipher suite");
}
let connector = connector_builder.build();
rand::thread_rng().shuffle(&mut connect);
let cluster = Cluster { connect, idx: 0 };
let mx_cluster = Arc::new(Mutex::new(cluster));
let tls_config = TlsConfig {
mx_cluster,
timeout: Some(Duration::from_secs(timeout)),
connector,
async_,
recovery_delay_init,
recovery_delay_max,
recovery_probe_time,
};
(tls_config, threads)
}