fn connect()

in edge-modules/api-proxy-module/rust-sdk/azure-iot-mqtt/src/io.rs [83:278]


    fn connect(&mut self) -> Self::Future {
        use futures_util::FutureExt;

        #[allow(clippy::identity_op)]
        const DEFAULT_MAX_TOKEN_VALID_DURATION: std::time::Duration =
            std::time::Duration::from_secs(1 * 60 * 60);

        let iothub_hostname = self.iothub_hostname.clone();
        let timeout = self.timeout;
        let extra = self.extra.clone();

        let authentication = match &self.authentication {
            crate::Authentication::SasKey {
                device_id,
                key,
                max_token_valid_duration,
                server_root_certificate,
            } => match prepare_sas_token_request(
                &*iothub_hostname,
                device_id,
                None,
                *max_token_valid_duration,
            ) {
                Ok((signature_data, make_sas_token)) => {
                    use hmac::Mac;

                    let mut mac = hmac::Hmac::<sha2::Sha256>::new_from_slice(key)
                        .expect("HMAC can have invalid key length");
                    mac.update(signature_data.as_bytes());
                    let signature = mac.finalize();
                    let signature = base64::encode(signature.into_bytes());

                    let sas_token = make_sas_token(&signature);

                    futures_util::future::Either::Left(futures_util::future::ok((
                        Some(sas_token),
                        None,
                        server_root_certificate.clone(),
                    )))
                }

                Err(err) => futures_util::future::Either::Left(futures_util::future::err(err)),
            },

            crate::Authentication::SasToken {
                token,
                server_root_certificate,
            } => futures_util::future::Either::Left(futures_util::future::ok((
                Some(token.clone()),
                None,
                server_root_certificate.clone(),
            ))),

            crate::Authentication::Certificate {
                der,
                password,
                server_root_certificate,
            } => match native_tls::Identity::from_pkcs12(der, password) {
                Ok(identity) => futures_util::future::Either::Left(futures_util::future::ok((
                    None,
                    Some(identity),
                    server_root_certificate.clone(),
                ))),
                Err(err) => futures_util::future::Either::Left(futures_util::future::err(
                    std::io::Error::new(
                        std::io::ErrorKind::Other,
                        format!("could not parse client certificate: {}", err),
                    ),
                )),
            },

            crate::Authentication::IotEdge {
                device_id,
                module_id,
                generation_id,
                iothub_hostname,
                workload_url,
            } => match crate::iotedge_client::Client::new(workload_url) {
                Ok(iotedge_client) => {
                    match prepare_sas_token_request(
                        iothub_hostname,
                        device_id,
                        None,
                        DEFAULT_MAX_TOKEN_VALID_DURATION,
                    ) {
                        Ok((signature_data, make_sas_token)) => {
                            let signature = iotedge_client.hmac_sha256(
                                module_id,
                                generation_id,
                                &signature_data,
                            );

                            let server_root_certificate =
                                iotedge_client.get_server_root_certificate();

                            futures_util::future::Either::Right(
                                futures_util::future::try_join(signature, server_root_certificate)
                                    .map(move |result| match result {
                                        Ok((signature, server_root_certificate)) => {
                                            let sas_token = make_sas_token(&signature);
                                            Ok((Some(sas_token), None, server_root_certificate))
                                        }

                                        Err(err) => {
                                            Err(std::io::Error::new(std::io::ErrorKind::Other, err))
                                        }
                                    }),
                            )
                        }

                        Err(err) => {
                            futures_util::future::Either::Left(futures_util::future::err(err))
                        }
                    }
                }

                Err(err) => futures_util::future::Either::Left(futures_util::future::err(
                    std::io::Error::new(
                        std::io::ErrorKind::Other,
                        format!("could not initialize iotedge client: {}", err),
                    ),
                )),
            },
        };

        let iothub_host = self.iothub_host;

        Box::pin(async move {
            let stream = async {
                let stream =
                    tokio::time::timeout(timeout, tokio::net::TcpStream::connect(&iothub_host))
                        .await
                        .map_err(|_| std::io::ErrorKind::TimedOut)?;
                Ok(stream)
            };

            let ((password, identity, server_root_certificate), stream) =
                futures_util::future::try_join(authentication, stream).await?;

            let stream = stream?;
            stream.set_nodelay(true)?;

            let mut stream = tokio_io_timeout::TimeoutStream::new(stream);
            stream.set_read_timeout(Some(timeout));

            let mut tls_connector_builder = native_tls::TlsConnector::builder();
            if let Some(identity) = identity {
                tls_connector_builder.identity(identity);
            }
            for certificate in server_root_certificate {
                tls_connector_builder.add_root_certificate(certificate);
            }

            let connector = tls_connector_builder.build().map_err(|err| {
                std::io::Error::new(
                    std::io::ErrorKind::Other,
                    format!("could not create TLS connector: {}", err),
                )
            })?;
            let connector: tokio_native_tls::TlsConnector = connector.into();

            let stream = connector
                .connect(&iothub_hostname, Box::pin(stream))
                .await
                .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;

            match extra {
                IoSourceExtra::Raw => Ok((Io::Raw(stream), password)),

                IoSourceExtra::WebSocket { uri } => {
                    let request = http::Request::get(uri)
                        .header("sec-websocket-protocol", "mqtt")
                        .body(())
                        .expect("building client handshake request cannot fail");

                    let handshake = tungstenite::ClientHandshake::start(
                        TokioToStd {
                            tokio: stream,
                            cx: std::ptr::null_mut(),
                        },
                        request,
                        None,
                    )
                    .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
                    let stream = WsConnect::Handshake(handshake).await?;
                    Ok((
                        Io::WebSocket {
                            inner: stream,
                            pending_read: std::io::Cursor::new(vec![]),
                        },
                        password,
                    ))
                }
            }
        })
    }