fn config_parse()

in src/flowgger/output/tls_output.rs [221:360]


fn config_parse(config: &Config) -> (TlsConfig, u32) {
    let threads = config
        .lookup("output.tls_threads")
        .map_or(TLS_DEFAULT_THREADS, |x| {
            x.as_integer()
                .expect("output.tls_threads must be a 32-bit integer") as u32
        });
    let connect = config
        .lookup("output.connect")
        .expect("output.connect is required")
        .as_array()
        .expect("output.connect must be a list");
    let mut connect: Vec<String> = connect
        .iter()
        .map(|x| {
            x.as_str()
                .expect("output.connect must be a list of strings")
                .to_owned()
        })
        .collect();
    let cert: Option<PathBuf> = config.lookup("output.tls_cert").and_then(|x| {
        Some(PathBuf::from(
            x.as_str()
                .expect("output.tls_cert must be a path to a .pem file"),
        ))
    });
    let key: Option<PathBuf> = config.lookup("output.tls_key").and_then(|x| {
        Some(PathBuf::from(
            x.as_str()
                .expect("output.tls_key must be a path to a .pem file"),
        ))
    });
    let ciphers = config
        .lookup("output.tls_ciphers")
        .map_or(DEFAULT_CIPHERS, |x| {
            x.as_str()
                .expect("output.tls_ciphers must be a string with a cipher suite")
        })
        .to_owned();
    let verify_peer = config
        .lookup("output.tls_verify_peer")
        .map_or(DEFAULT_VERIFY_PEER, |x| {
            x.as_bool()
                .expect("output.tls_verify_peer must be a boolean")
        });
    let ca_file: Option<PathBuf> = config.lookup("output.tls_ca_file").and_then(|x| {
        Some(PathBuf::from(
            x.as_str()
                .expect("output.tls_ca_file must be a path to a file"),
        ))
    });
    let compression = config
        .lookup("output.tls_compression")
        .map_or(DEFAULT_COMPRESSION, |x| {
            x.as_bool()
                .expect("output.tls_compression must be a boolean")
        });
    let timeout = config
        .lookup("output.timeout")
        .map_or(DEFAULT_TIMEOUT, |x| {
            x.as_integer().expect("output.timeout must be an integer") as u64
        });
    let async_ = config
        .lookup("output.tls_async")
        .map_or(DEFAULT_ASYNC, |x| {
            x.as_bool().expect("output.tls_async must be a boolean")
        });
    let recovery_delay_init =
        config
            .lookup("output.tls_recovery_delay_init")
            .map_or(DEFAULT_RECOVERY_DELAY_INIT, |x| {
                x.as_integer()
                    .expect("output.tls_recovery_delay_init must be an integer")
                    as u32
            });
    let recovery_delay_max =
        config
            .lookup("output.tls_recovery_delay_max")
            .map_or(DEFAULT_RECOVERY_DELAY_MAX, |x| {
                x.as_integer()
                    .expect("output.tls_recovery_delay_max must be an integer")
                    as u32
            });
    let recovery_probe_time =
        config
            .lookup("output.tls_recovery_probe_time")
            .map_or(DEFAULT_RECOVERY_PROBE_TIME, |x| {
                x.as_integer()
                    .expect("output.tls_recovery_probe_time must be an integer")
                    as u32
            });
    if recovery_delay_max < recovery_delay_init {
        panic!("output.tls_recovery_delay_max cannot be less than output.tls_recovery_delay_init");
    }
    let mut connector_builder = SslConnector::builder(SslMethod::tls()).unwrap();
    {
        let mut ctx = &mut connector_builder;
        if !verify_peer {
            ctx.set_verify(SslVerifyMode::NONE);
        } else {
            ctx.set_verify_depth(TLS_VERIFY_DEPTH);
            ctx.set_verify(SslVerifyMode::PEER | SslVerifyMode::FAIL_IF_NO_PEER_CERT);
            if let Some(ca_file) = ca_file {
                ctx.set_ca_file(&ca_file)
                    .expect("Unable to read the trusted CA file");
            }
        }
        let mut opts = SslOptions::CIPHER_SERVER_PREFERENCE
            | SslOptions::NO_SESSION_RESUMPTION_ON_RENEGOTIATION;
        if !compression {
            opts |= SslOptions::NO_COMPRESSION;
        }
        ctx.set_options(opts);
        set_fs(&mut ctx);
        if let Some(cert) = cert {
            ctx.set_certificate_file(&Path::new(&cert), SslFiletype::PEM)
                .expect("Unable to read the TLS certificate");
        }
        if let Some(key) = key {
            ctx.set_private_key_file(&Path::new(&key), SslFiletype::PEM)
                .expect("Unable to read the TLS key");
        }
        ctx.set_cipher_list(&ciphers)
            .expect("Unsupported cipher suite");
    }
    let connector = connector_builder.build();
    rand::thread_rng().shuffle(&mut connect);
    let cluster = Cluster { connect, idx: 0 };
    let mx_cluster = Arc::new(Mutex::new(cluster));
    let tls_config = TlsConfig {
        mx_cluster,
        timeout: Some(Duration::from_secs(timeout)),
        connector,
        async_,
        recovery_delay_init,
        recovery_delay_max,
        recovery_probe_time,
    };
    (tls_config, threads)
}