in rust/azure_iot_operations_mqtt/src/rumqttc_adapter.rs [364:457]
/// Builds `rumqttc` v5 `MqttOptions` from an `MqttConnectionSettings`.
///
/// Settings are applied in this order: client ID / host / port, keep alive, receive maximum,
/// max packet size, session expiry, connection timeout, clean start, credentials, TLS
/// transport, and SAT authentication data.
///
/// # Errors
/// Returns a `ConnectionSettingsAdapterError` when:
/// - the session expiry is shorter than 5 seconds, or its whole-second count does not fit in
///   a `u32` (field: `SessionExpiry`)
/// - a username is set and the configured password file cannot be read as UTF-8 text
///   (field: `PasswordFile`)
/// - TLS is enabled and `tls_config` fails (field: `UseTls`)
/// - the SAT auth file cannot be read (field: `SatAuthFile`)
fn try_from(value: MqttConnectionSettings) -> Result<Self, Self::Error> {
    // Client ID, Host Name, TCP Port
    // `value` is owned, so its fields can be moved out directly without cloning.
    let mut mqtt_options =
        rumqttc::v5::MqttOptions::new(value.client_id, value.hostname, value.tcp_port);

    // Keep Alive
    mqtt_options.set_keep_alive(value.keep_alive);

    // Receive Maximum
    mqtt_options.set_receive_maximum(Some(value.receive_max));

    // Max Packet Size
    // NOTE: due to a bug in rumqttc, we need to set None to u32::MAX, since rumqttc overrides
    // None values with an arbitrary default that can't be changed. This may or may not be
    // exactly the same thing, but it is in most circumstances.
    mqtt_options.set_max_packet_size(value.receive_packet_size_max.or(Some(u32::MAX)));

    // Session Expiry
    // Must fit in a u32 number of seconds and be at least 5 seconds (5 itself is accepted).
    let session_expiry_secs = match u32::try_from(value.session_expiry.as_secs()) {
        Ok(secs) if secs >= 5 => secs,
        Ok(_) => {
            return Err(ConnectionSettingsAdapterError {
                msg: "require >= 5 seconds".to_string(),
                field: ConnectionSettingsField::SessionExpiry(value.session_expiry),
                source: None,
            });
        }
        Err(e) => {
            return Err(ConnectionSettingsAdapterError {
                msg: "cannot convert to u32".to_string(),
                field: ConnectionSettingsField::SessionExpiry(value.session_expiry),
                source: Some(Box::new(e)),
            });
        }
    };
    mqtt_options.set_session_expiry_interval(Some(session_expiry_secs));

    // Connection Timeout
    // NOTE: `as_secs` keeps only whole seconds; any sub-second part is dropped.
    mqtt_options.set_connection_timeout(value.connection_timeout.as_secs());

    // Clean Start
    mqtt_options.set_clean_start(value.clean_start);

    // Username, Password, Password File
    // Credentials are only set when a username is present. A password file takes precedence
    // over an inline password; with neither, the password is the empty string.
    if let Some(username) = value.username {
        let password = match value.password_file {
            Some(password_file) => fs::read_to_string(&password_file).map_err(|e| {
                ConnectionSettingsAdapterError {
                    msg: "cannot read password file".to_string(),
                    field: ConnectionSettingsField::PasswordFile(password_file),
                    source: Some(Box::new(e)),
                }
            })?,
            None => value.password.unwrap_or_default(),
        };
        mqtt_options.set_credentials(username, password);
    }

    // Use TLS, CA File, CA Require Revocation Check, Cert File, Key File, Key File Password
    if value.use_tls {
        let transport = tls_config(
            value.ca_file,
            value.cert_file,
            value.key_file,
            value.key_password_file,
        )
        .map_err(|e| ConnectionSettingsAdapterError {
            msg: "tls config error".to_string(),
            field: ConnectionSettingsField::UseTls(true),
            source: Some(Box::new(TlsError {
                msg: e.to_string(),
                source: Some(e),
            })),
        })?;
        mqtt_options.set_transport(transport);
    }

    // SAT Auth File
    // The raw file bytes are sent as authentication data under the "K8S-SAT" method.
    if let Some(sat_file) = value.sat_file {
        mqtt_options.set_authentication_method(Some("K8S-SAT".to_string()));
        // Borrow the path for the read so it can be moved into the error without a clone.
        let sat_auth = fs::read(&sat_file).map_err(|e| ConnectionSettingsAdapterError {
            msg: "cannot read sat auth file".to_string(),
            field: ConnectionSettingsField::SatAuthFile(sat_file),
            source: Some(Box::new(e)),
        })?;
        mqtt_options.set_authentication_data(Some(sat_auth.into()));
    }

    // NOTE: MqttOptions has a field called "request_channel_capacity" which currently does nothing.
    // We do not set it.
    Ok(mqtt_options)
}