in edge-modules/api-proxy-module/rust-sdk/azure-iot-mqtt/src/io.rs [83:278]
fn connect(&mut self) -> Self::Future {
use futures_util::FutureExt;
#[allow(clippy::identity_op)]
const DEFAULT_MAX_TOKEN_VALID_DURATION: std::time::Duration =
std::time::Duration::from_secs(1 * 60 * 60);
let iothub_hostname = self.iothub_hostname.clone();
let timeout = self.timeout;
let extra = self.extra.clone();
let authentication = match &self.authentication {
crate::Authentication::SasKey {
device_id,
key,
max_token_valid_duration,
server_root_certificate,
} => match prepare_sas_token_request(
&*iothub_hostname,
device_id,
None,
*max_token_valid_duration,
) {
Ok((signature_data, make_sas_token)) => {
use hmac::Mac;
let mut mac = hmac::Hmac::<sha2::Sha256>::new_from_slice(key)
.expect("HMAC can have invalid key length");
mac.update(signature_data.as_bytes());
let signature = mac.finalize();
let signature = base64::encode(signature.into_bytes());
let sas_token = make_sas_token(&signature);
futures_util::future::Either::Left(futures_util::future::ok((
Some(sas_token),
None,
server_root_certificate.clone(),
)))
}
Err(err) => futures_util::future::Either::Left(futures_util::future::err(err)),
},
crate::Authentication::SasToken {
token,
server_root_certificate,
} => futures_util::future::Either::Left(futures_util::future::ok((
Some(token.clone()),
None,
server_root_certificate.clone(),
))),
crate::Authentication::Certificate {
der,
password,
server_root_certificate,
} => match native_tls::Identity::from_pkcs12(der, password) {
Ok(identity) => futures_util::future::Either::Left(futures_util::future::ok((
None,
Some(identity),
server_root_certificate.clone(),
))),
Err(err) => futures_util::future::Either::Left(futures_util::future::err(
std::io::Error::new(
std::io::ErrorKind::Other,
format!("could not parse client certificate: {}", err),
),
)),
},
crate::Authentication::IotEdge {
device_id,
module_id,
generation_id,
iothub_hostname,
workload_url,
} => match crate::iotedge_client::Client::new(workload_url) {
Ok(iotedge_client) => {
match prepare_sas_token_request(
iothub_hostname,
device_id,
None,
DEFAULT_MAX_TOKEN_VALID_DURATION,
) {
Ok((signature_data, make_sas_token)) => {
let signature = iotedge_client.hmac_sha256(
module_id,
generation_id,
&signature_data,
);
let server_root_certificate =
iotedge_client.get_server_root_certificate();
futures_util::future::Either::Right(
futures_util::future::try_join(signature, server_root_certificate)
.map(move |result| match result {
Ok((signature, server_root_certificate)) => {
let sas_token = make_sas_token(&signature);
Ok((Some(sas_token), None, server_root_certificate))
}
Err(err) => {
Err(std::io::Error::new(std::io::ErrorKind::Other, err))
}
}),
)
}
Err(err) => {
futures_util::future::Either::Left(futures_util::future::err(err))
}
}
}
Err(err) => futures_util::future::Either::Left(futures_util::future::err(
std::io::Error::new(
std::io::ErrorKind::Other,
format!("could not initialize iotedge client: {}", err),
),
)),
},
};
let iothub_host = self.iothub_host;
Box::pin(async move {
let stream = async {
let stream =
tokio::time::timeout(timeout, tokio::net::TcpStream::connect(&iothub_host))
.await
.map_err(|_| std::io::ErrorKind::TimedOut)?;
Ok(stream)
};
let ((password, identity, server_root_certificate), stream) =
futures_util::future::try_join(authentication, stream).await?;
let stream = stream?;
stream.set_nodelay(true)?;
let mut stream = tokio_io_timeout::TimeoutStream::new(stream);
stream.set_read_timeout(Some(timeout));
let mut tls_connector_builder = native_tls::TlsConnector::builder();
if let Some(identity) = identity {
tls_connector_builder.identity(identity);
}
for certificate in server_root_certificate {
tls_connector_builder.add_root_certificate(certificate);
}
let connector = tls_connector_builder.build().map_err(|err| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("could not create TLS connector: {}", err),
)
})?;
let connector: tokio_native_tls::TlsConnector = connector.into();
let stream = connector
.connect(&iothub_hostname, Box::pin(stream))
.await
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
match extra {
IoSourceExtra::Raw => Ok((Io::Raw(stream), password)),
IoSourceExtra::WebSocket { uri } => {
let request = http::Request::get(uri)
.header("sec-websocket-protocol", "mqtt")
.body(())
.expect("building client handshake request cannot fail");
let handshake = tungstenite::ClientHandshake::start(
TokioToStd {
tokio: stream,
cx: std::ptr::null_mut(),
},
request,
None,
)
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
let stream = WsConnect::Handshake(handshake).await?;
Ok((
Io::WebSocket {
inner: stream,
pending_read: std::io::Cursor::new(vec![]),
},
password,
))
}
}
})
}