AzureEventGrid/mqtt_connection.c (514 lines of code) (raw):
/* Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT License. */
#include <stdio.h>
#include <stdlib.h>
#include <applibs/networking.h>
#include <applibs/application.h>
#include <tlsutils/deviceauth.h>
#include "exitcodes.h"
#include "eventgrid_config.h"
#include "eventloop_timer_utilities.h"
#include "mqtt_connection.h"
// Device identity passed to mqtt_connect(); filled in by LoadDeviceCertPathAndDeviceID().
static char deviceId[DEVICE_ID_BUFFER_SIZE]; // Device ID is 128 bytes
// Path of the device certificate, as returned by DeviceAuth_GetCertificatePath().
static const char *deviceCertPath = NULL;
// Function to run the next time an IO event occurs.
static ExitCode (*nextHandler)(void);
// Forward declarations of the file-local connection steps, timer handlers and helpers.
static ExitCode HandleTlsHandshake(void);
static void HandleMqttConnection(void);
static void HandleSockEvent(EventLoop *el, int fd, EventLoop_IoEvents events, void *context);
static void ClientRefresherHandler(EventLoop *el, int fd, EventLoop_IoEvents events, void *context);
static ExitCode HandleWolfsslSetup(void);
static void MqttPingHandler(EventLoopTimer* eventLoopTimer);
static void MqttReconnectHandler(EventLoopTimer* eventLoopTimer);
static void MqttSetSubscriptions(const char *topic, size_t topicSize);
static void ReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr);
static void StartOneShotTimer(EventLoopTimer *timer, const struct timespec *delay);
static ExitCode LoadDeviceCertPathAndDeviceID(WOLFSSL_CTX *ctx);
static ExitCode FormatTopic(const char *topic, size_t topicSize, char *formattedTopicBuffer,
size_t formattedTopicBufferSize);
static bool IsNetworkReady(void);
static void FreeResources(void);
// Callback used to report a failure exit code to the application; set in InitializeMqtt().
static ExitCode_CallbackType failureCallbackFunction = NULL;
// Timers created in CreateMqttTimers() and released in DisposeMqttTimers().
static EventLoopTimer *mqttReconnectTimer = NULL;
static EventLoopTimer *mqttPingTimer = NULL;
// wolfSSL objects created in HandleWolfsslSetup() and released in FreeResources().
static WOLFSSL_CTX *wolfSslCtx = NULL;
static WOLFSSL *wolfSslSession = NULL;
// True once wolfSSL_Init() has succeeded, so FreeResources() knows to call wolfSSL_Cleanup().
static bool wolfSslInitialized = false;
// Socket to the broker (-1 when closed) and its event loop registration (NULL when not registered).
static int sockFd = -1;
static EventRegistration *sockReg = NULL;
// Reconnect state handed to ReconnectClient(); supplies the send/receive buffers for mqtt_reinit().
static Mqtt_Reconnect_State *mqttReconnectStatePtr = NULL;
// Set by MqttSetSubscriptions() after subscribing; cleared by DisconnectMqtt().
static bool isMqttConnected = false;
// Publish/subscribe topics after FormatTopic() has substituted the device ID.
static char formattedPublishTopicBuffer[TOPIC_BUFFER_SIZE] = {0};
static char formattedSubscribeTopicBuffer[TOPIC_BUFFER_SIZE] = {0};
// Event loop and connection settings supplied by the caller of InitializeMqtt().
static EventLoop *eventLoopRef = NULL;
static MQTT_Context *mqttClientContext = NULL;
// The single MQTT-C client instance used by this module.
static struct mqtt_client mqttClient;
/// <summary>
/// Asks the OS whether networking is ready. If the query itself fails, the error is
/// logged and ExitCode_IsNetworkingReady_Failed is reported through the failure callback.
/// </summary>
/// <returns>
/// The readiness flag written by Networking_IsNetworkingReady (initialised to false
/// before the call).
/// </returns>
bool IsNetworkReady(void)
{
    bool ready = false;

    const int queryStatus = Networking_IsNetworkingReady(&ready);
    if (queryStatus == 0) {
        return ready;
    }

    // The query failed: report it, then hand back whatever the flag currently holds.
    Log_Debug("ERROR: Networking_IsNetworkingReady: %d (%s)\n", errno, strerror(errno));
    failureCallbackFunction(ExitCode_IsNetworkingReady_Failed);
    return ready;
}
/// <summary>
/// Arms an event loop timer so that it fires once after the given delay. If arming
/// fails, the error is logged and ExitCode_Reconnect_CreateTimer is reported through
/// the failure callback.
/// </summary>
/// <param name="timer">Timer to arm</param>
/// <param name="delay">Delay before the timer fires</param>
static void StartOneShotTimer(EventLoopTimer *timer, const struct timespec *delay)
{
    if (SetEventLoopTimerOneShot(timer, delay) == -1) {
        // The message previously named a "provisioning timeout timer", which this module
        // does not have; describe what actually failed.
        Log_Debug("ERROR: Failed to start one-shot timer: %s (%d)\n", strerror(errno), errno);
        failureCallbackFunction(ExitCode_Reconnect_CreateTimer);
    }
}
/// <summary>
/// Tears down the MQTT connection: calls mqtt_disconnect on the client, marks the
/// connection as down, and releases the TLS/socket resources via FreeResources().
/// </summary>
void DisconnectMqtt(void)
{
    mqtt_disconnect(&mqttClient);

    // From here on SendTelemetry() and the ping handler must treat the link as down.
    isMqttConnected = false;

    FreeResources();
}
/// <summary>
/// Timer handler for the periodic ping timer. Consumes the timer event and, while the
/// MQTT connection is marked as up, calls mqtt_ping on the client.
/// </summary>
/// <param name="eventLoopTimer">Timer whose event triggered this call</param>
static void MqttPingHandler(EventLoopTimer *eventLoopTimer)
{
    const int consumeStatus = ConsumeEventLoopTimerEvent(eventLoopTimer);

    if (consumeStatus != 0) {
        failureCallbackFunction(ExitCode_MqttPingTimer_Consume);
    } else if (isMqttConnected) {
        mqtt_ping(&mqttClient);
    }
}
/// <summary>
/// Gives read-only access to the formatted publish topic held in this module.
/// </summary>
/// <returns>Pointer to the start of the formatted publish topic buffer</returns>
const char *GetPublishTopicName(void)
{
    const char *publishTopic = &formattedPublishTopicBuffer[0];
    return publishTopic;
}
/// <summary>
/// Publish message to Azure Event Grid.
/// Nothing is sent when the network is not ready, when the MQTT connection is not up,
/// or when the topic is null or empty (the latter is also reported through the failure
/// callback with ExitCode_SendTelemetry_NullTopic).
/// </summary>
/// <param name="data">Message to publish</param>
/// <param name="data_length">Length of message to publish</param>
/// <param name="topic">Topic to publish the message on</param>
void SendTelemetry(const void *data, size_t data_length, const char *topic)
{
    if (!IsNetworkReady()) {
        Log_Debug("Network is not ready. Cannot send telemetry.\n");
        return;
    }
    if (!isMqttConnected) {
        Log_Debug("Not connected to Azure Event Grid. Not sending telemetry.\n");
        return;
    }
    if (topic == NULL || topic[0] == '\0') {
        Log_Debug("Publish topic is null or empty. Not sending telemetry.\n");
        failureCallbackFunction(ExitCode_SendTelemetry_NullTopic);
        // Do not fall through: publishing with a null/empty topic is invalid.
        return;
    }

    // Reclaim queue space first, then publish only if the message fits in what is left.
    mqtt_mq_clean(&mqttClient.mq);
    if (mqttClient.mq.curr_sz >= data_length) {
        mqtt_publish(&mqttClient, topic, data, data_length, MQTT_MESSAGE_QOS);
    } else {
        Log_Debug("Not enough space in the MQTT message queue. Not sending telemetry.\n");
    }

    // Sync regardless, so that previously queued traffic still makes progress.
    mqtt_sync(&mqttClient);
}
/// <summary>
/// Function to replace "${client.authenticationName}" keyword (if it is present) with the actual
/// device ID.
/// </summary>
/// <param name="topic">Topic to format</param>
/// <param name="formattedTopicBuffer">Buffer to hold the topic after formatting</param>
static ExitCode FormatTopic(const char *topic, size_t topicSize, char *formattedTopicBuffer,
size_t formattedTopicBufferSize)
{
char* authName = NULL;
if (!topic || !topicSize || !strnlen(topic, topicSize)) {
Log_Debug("FormatTopic: Topic is null or empty.\n");
return ExitCode_FormatTopic_NullTopic;
}
if (!formattedTopicBuffer) {
Log_Debug("FormatTopic: formattedTopicBuffer is null.\n");
return ExitCode_FormatTopic_NullFormattedTopic;
}
memset(formattedTopicBuffer, 0, formattedTopicBufferSize);
authName = strstr(topic, AUTHENTICATION_NAME_KEYWORD);
// Keyword is not present
if (authName == NULL) {
memcpy(formattedTopicBuffer, topic, strlen(topic));
return ExitCode_Success;
}
// Keyword is present. Need to replace keyword with device ID.
if (deviceId == NULL) {
Log_Debug("FormatTopic: Device ID is null.\n");
return ExitCode_FormatTopic_DeviceID;
}
size_t authNameKeywordStartPos = (size_t)(authName - topic);
strncpy(formattedTopicBuffer, topic, authNameKeywordStartPos);
strncat(formattedTopicBuffer, deviceId, DEVICE_ID_BUFFER_SIZE);
size_t remainingTopicLen =
strnlen(topic, topicSize) - authNameKeywordStartPos -
strlen(AUTHENTICATION_NAME_KEYWORD);
if (remainingTopicLen > 0) {
const char *remainingTopicStartOffset =
topic + authNameKeywordStartPos + strlen(AUTHENTICATION_NAME_KEYWORD);
strncat(formattedTopicBuffer, remainingTopicStartOffset, remainingTopicLen);
}
// Check that formatted topic length does not exceed the size of the buffer.
if (strnlen(formattedTopicBuffer, formattedTopicBufferSize) >= formattedTopicBufferSize) {
Log_Debug("ERROR: Formatted topic for '%s' is longer than the buffer size for the formatted topic.\n",
topic);
return ExitCode_FormatTopic_Size;
}
return ExitCode_Success;
}
/// <summary>
/// Timer handler for the reconnect timer. Consumes the timer event and then starts a
/// new connection attempt through ReconnectClient().
/// </summary>
/// <param name="eventLoopTimer">Timer whose event triggered this call</param>
static void MqttReconnectHandler(EventLoopTimer *eventLoopTimer)
{
    const int consumeStatus = ConsumeEventLoopTimerEvent(eventLoopTimer);

    if (consumeStatus == 0) {
        ReconnectClient(&mqttClient, &mqttClient.reconnect_state);
    } else {
        failureCallbackFunction(ExitCode_ReconnectTimer_Consume);
    }
}
/// <summary>
/// IO handler registered on the socket once the MQTT connection is set up. While the
/// network is ready it calls mqtt_sync on the client; otherwise it arms the reconnect
/// timer to fire in one second.
/// </summary>
/// <param name="el">Event loop that dispatched the event (unused)</param>
/// <param name="fd">File descriptor the event occurred on (unused)</param>
/// <param name="events">Events that occurred (unused)</param>
/// <param name="context">Registration context (unused)</param>
static void ClientRefresherHandler(EventLoop *el, int fd, EventLoop_IoEvents events, void *context)
{
    (void)el;
    (void)fd;
    (void)events;
    (void)context;

    if (!IsNetworkReady()) {
        const struct timespec retryDelay = {.tv_sec = 1, .tv_nsec = 0};
        StartOneShotTimer(mqttReconnectTimer, &retryDelay);
        return;
    }

    mqtt_sync(&mqttClient);
}
/// <summary>
/// IO handler registered on the socket while the connection is being established.
/// It runs the step whose address is currently stored in nextHandler and forwards any
/// non-success result to the failure callback.
/// </summary>
/// <param name="el">Event loop that dispatched the event (unused)</param>
/// <param name="fd">File descriptor the event occurred on (unused)</param>
/// <param name="events">Events that occurred (unused)</param>
/// <param name="context">Registration context (unused)</param>
static void HandleSockEvent(EventLoop *el, int fd, EventLoop_IoEvents events, void *context)
{
    (void)el;
    (void)fd;
    (void)events;
    (void)context;

    const int stepResult = nextHandler();
    if (stepResult == ExitCode_Success) {
        return;
    }
    failureCallbackFunction(stepResult);
}
/// <summary>
/// Resolves the host name to an IPv4 address, opens a non-blocking AF_INET socket,
/// registers it with the event loop for output events and starts an asynchronous
/// connection to the port given by EVENT_GRID_MQTT_PORT. On success the next IO event
/// is routed to HandleWolfsslSetup.
/// </summary>
/// <param name="hostname">Host name of the server to connect to</param>
/// <returns>ExitCode_Success if the connection was started; otherwise the exit code of
/// the step that failed.</returns>
static ExitCode ConnectRawSocketToServer(const char *hostname)
{
    if (hostname == NULL) {
        Log_Debug("ConnectRawSocketToServer: Hostname is null.\n");
        return ExitCode_ConnectRaw_InvalidHostName;
    }

    // Restrict name resolution to IPv4 TCP endpoints.
    struct addrinfo hints;
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;

    struct sockaddr_in serverAddr;
    memset(&serverAddr, 0, sizeof(serverAddr));
    serverAddr.sin_family = AF_INET;

    struct addrinfo *addrList = NULL;
    if (getaddrinfo(hostname, NULL, &hints, &addrList) != 0) {
        return ExitCode_ConnectRaw_GetAddrInfo;
    }
    if (addrList == NULL) {
        return ExitCode_ConnectRaw_GetAddrInfo_Result;
    }

    // Pick the first IPv4 entry from the resolver results.
    const struct addrinfo *ipv4Entry = NULL;
    for (const struct addrinfo *it = addrList; it != NULL; it = it->ai_next) {
        if (it->ai_family == AF_INET) {
            ipv4Entry = it;
            break;
        }
    }
    if (ipv4Entry == NULL) {
        freeaddrinfo(addrList);
        return ExitCode_ConnectRaw_GetAddrInfo;
    }

    serverAddr.sin_port = htons((uint16_t)atoi(EVENT_GRID_MQTT_PORT));
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_addr = ((const struct sockaddr_in *)(ipv4Entry->ai_addr))->sin_addr;
    freeaddrinfo(addrList);

    sockFd = socket(serverAddr.sin_family, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (sockFd == -1) {
        return ExitCode_ConnectRaw_Socket;
    }

    sockReg = EventLoop_RegisterIo(eventLoopRef, sockFd, EventLoop_Output, HandleSockEvent,
                                   /* context */ NULL);
    if (sockReg == NULL) {
        return ExitCode_ConnectRaw_EventReg;
    }

    // The socket is non-blocking, so EINPROGRESS just means the connect is under way.
    const int connectStatus = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
    if (connectStatus != 0 && errno != EINPROGRESS) {
        return ExitCode_ConnectRaw_Connect;
    }

    nextHandler = HandleWolfsslSetup;
    return ExitCode_Success;
}
/// <summary>
/// Called from the event loop when socket connection has completed,
/// successfully or otherwise. If the connection was successful, then
/// uses wolfSSL to start the SSL handshake. Otherwise, set exitCode to
/// the appropriate value.
/// </summary>
static ExitCode HandleWolfsslSetup(void)
{
int ret;
// Check whether the connection succeeded.
int error;
socklen_t errSize = sizeof(error);
int r = getsockopt(sockFd, SOL_SOCKET, SO_ERROR, &error, &errSize);
if (!(r == 0 && error == 0)) {
Log_Debug("ERROR: Socket connection failed\n");
return ExitCode_HandleWolfSslSetup_Failed;
}
// Connection was made successfully, so allocate wolfSSL session and context.
r = wolfSSL_Init();
if (r != WOLFSSL_SUCCESS) {
Log_Debug("ERROR: wolfSSL_init failed\n");
return ExitCode_HandleWolfSslSetup_Init;
}
wolfSslInitialized = true;
WOLFSSL_METHOD *wolfSslMethod = wolfTLSv1_3_client_method();
if (wolfSslMethod == NULL) {
Log_Debug("ERROR: failed to create WOLFSSL METHOD\n");
return ExitCode_HandleWolfSslSetup_Method;
}
wolfSslCtx = wolfSSL_CTX_new(wolfSslMethod);
if (wolfSslCtx == NULL) {
Log_Debug("ERROR: failed to create WOLFSSL_CTX\n");
return ExitCode_HandleWolfSslSetup_Context;
}
if (deviceCertPath == NULL) {
Log_Debug("HandleWolfsslSetup: Device cert path is null.\n");
return ExitCode_HandleWolfSslSetup_DeviceCertPath;
}
// Use device certificate for authentication
if ((ret = wolfSSL_CTX_use_certificate_file(wolfSslCtx, deviceCertPath,
WOLFSSL_FILETYPE_PEM)) != WOLFSSL_SUCCESS) {
Log_Debug("ERROR: failed to use device certificate\n");
return ExitCode_HandleWolfSslSetup_CertPath;
}
// Specify the root certificate which is used to validate the Azure Event Grid.
char *certPathAbs =
Storage_GetAbsolutePathInImagePackage(mqttClientContext->ca_cert);
if (certPathAbs == NULL) {
Log_Debug("ERROR: failed to get path to CA certificate\n");
return ExitCode_HandleWolfSslSetup_CertPath;
}
if ((ret = wolfSSL_CTX_load_verify_locations(wolfSslCtx, certPathAbs, NULL)) != WOLFSSL_SUCCESS) {
Log_Debug("ERROR: failed to load ca certificate\n");
free(certPathAbs);
certPathAbs = NULL;
return ExitCode_HandleWolfSslSetup_VerifyLocations;
}
free(certPathAbs);
certPathAbs = NULL;
// Use Server Name Identification (SNI) as Azure Event Grid uses SNI.
ret = wolfSSL_CTX_UseSNI(wolfSslCtx, WOLFSSL_SNI_HOST_NAME, mqttClientContext->hostname,
(short unsigned int)strlen(mqttClientContext->hostname));
if (ret != WOLFSSL_SUCCESS)
{
// sni usage failed
Log_Debug("SNI usage failed\n");
return ExitCode_HandleWolfSslSetup_UseSni;
}
wolfSslSession = wolfSSL_new(wolfSslCtx);
if (wolfSslSession == NULL) {
Log_Debug("ERROR: Failed to open new WOlfSsl session\n");
return ExitCode_HandleWolfSslSetup_Session;
}
// Check domain name of peer certificate.
r = wolfSSL_check_domain_name(wolfSslSession, mqttClientContext->hostname);
if (r != WOLFSSL_SUCCESS) {
Log_Debug("ERROR: wolfSSL_check_domain_name %d\n", r);
return ExitCode_HandleWolfSslSetup_CheckDomainName;
}
// Associate socket with wolfSSL session.
r = wolfSSL_set_fd(wolfSslSession, sockFd);
if (r != WOLFSSL_SUCCESS) {
Log_Debug("ERROR: wolfSSL_set_fd %d\n", r);
return ExitCode_HandleWolfSslSetup_SetFd;
}
// Perform TLS handshake.
// Asynchronous handshakes require repeated calls to wolfSSL_connect, so jump to the
// handler to avoid repeating code.
ret = HandleTlsHandshake();
if (ret != ExitCode_Success) {
return ret;
}
return ExitCode_Success;
}
/// <summary>
/// This function is called to start the TLS handshake. When an IO event occurs, the event loop
/// calls this function again to check whether the handshake has completed.
/// If the handshake completes successfully, this function then initiates the MQTT.
/// connection. If a fatal error occurs, sets exitCode to the appropriate value.
/// </summary>
static ExitCode HandleTlsHandshake(void)
{
int r = EventLoop_ModifyIoEvents(eventLoopRef, sockReg, EventLoop_Input | EventLoop_Output);
if (r != 0) {
return ExitCode_TlsHandshake_ModifyEvents;
}
r = wolfSSL_connect(wolfSslSession);
if (r != WOLFSSL_SUCCESS) {
// If the handshake is in progress, exit to the event loop.
const int uniqueError = wolfSSL_get_error(wolfSslSession, r);
if (uniqueError == WOLFSSL_ERROR_WANT_READ || uniqueError == WOLFSSL_ERROR_WANT_WRITE) {
nextHandler = HandleTlsHandshake;
return ExitCode_Success;
}
// Unexpected error, so terminate.
Log_Debug("ERROR: wolfSSL_connect %d\n", uniqueError);
return ExitCode_TlsHandshake_UnexpectedError;
}
// Handshake completed, now handle mqtt connection.
HandleMqttConnection();
return ExitCode_Success;
}
/// <summary>
/// This function is called after the TLS handshake is successful, to initiate an MQTT
/// connection and set the subscriptions.
/// </summary>
static void HandleMqttConnection(void)
{
if (wolfSslSession == NULL) {
const struct timespec reconnectTime = {.tv_sec = 2, .tv_nsec = 0};
StartOneShotTimer(mqttReconnectTimer, &reconnectTime);
Log_Debug("Failed to open socket: ");
return;
}
int r = EventLoop_UnregisterIo(eventLoopRef, sockReg);
if (r != 0) {
failureCallbackFunction(ExitCode_MqttConnection_UnregisterIO);
return;
}
sockReg = EventLoop_RegisterIo(eventLoopRef, sockFd, EventLoop_Input, ClientRefresherHandler, NULL);
if (sockReg == NULL) {
failureCallbackFunction(ExitCode_MqttConnection_RegisterIO);
return;
}
// Reinitialize the client.
mqtt_reinit(&mqttClient, wolfSslSession, mqttReconnectStatePtr->sendbuf,
mqttReconnectStatePtr->sendbufsz, mqttReconnectStatePtr->recvbuf,
mqttReconnectStatePtr->recvbufsz);
// Send connection request to the broker.
uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION;
mqtt_connect(&mqttClient, deviceId, NULL, NULL, 0, deviceId, NULL, connect_flags, 30);
// Subscribe to the desired topic.
MqttSetSubscriptions(formattedSubscribeTopicBuffer, sizeof(formattedSubscribeTopicBuffer));
}
/// <summary>
/// Function to free all resources: the wolfSSL session and context, the wolfSSL library
/// state, the socket's event loop registration and the socket itself. Each resource is
/// released only if it is currently held, and its handle is reset afterwards, so the
/// function can safely be called repeatedly.
/// </summary>
static void FreeResources(void)
{
    if (wolfSslSession != NULL) {
        wolfSSL_free(wolfSslSession);
        wolfSslSession = NULL;
    }
    if (wolfSslCtx != NULL) {
        wolfSSL_CTX_free(wolfSslCtx);
        wolfSslCtx = NULL;
    }
    if (wolfSslInitialized) {
        wolfSSL_Cleanup();
        wolfSslInitialized = false;
    }
    // Remove the registration while the descriptor is still open, so the event loop
    // never has to operate on a descriptor that has already been closed.
    if (sockReg != NULL) {
        EventLoop_UnregisterIo(eventLoopRef, sockReg);
        sockReg = NULL;
    }
    if (sockFd != -1) {
        close(sockFd);
        sockFd = -1;
    }
}
/// <summary>
/// Function to dispose the timers created for the MQTT connection. The timer handles
/// are reset to NULL afterwards so that no stale handle remains in this module.
/// </summary>
void DisposeMqttTimers(void)
{
    DisposeEventLoopTimer(mqttReconnectTimer);
    mqttReconnectTimer = NULL;

    DisposeEventLoopTimer(mqttPingTimer);
    mqttPingTimer = NULL;
}
/// <summary>
/// Function to load the device certificate and device ID. Device ID is needed as the username
/// for the MQTT connection. Device certificate is needed to authenticate the device to the
/// Event Grid (MQTT broker).
/// On success, deviceCertPath holds the certificate path and deviceId holds the device ID
/// taken from the certificate's subject name.
/// </summary>
/// <param name="ctx">Unused; kept so the signature stays unchanged</param>
/// <returns>ExitCode_Success on success; ExitCode_LoadDeviceCertificate on any failure</returns>
static ExitCode LoadDeviceCertPathAndDeviceID(WOLFSSL_CTX *ctx)
{
    // Number of leading characters of the one-line subject name that are skipped to
    // reach the device ID. NOTE(review): presumably the "/CN=" prefix — confirm.
    enum { subjectPrefixLen = 4 };

    ExitCode result = ExitCode_LoadDeviceCertificate;
    WOLFSSL_X509 *deviceCert = NULL;
    WOLFSSL_X509_NAME *subjectName = NULL;
    bool isDeviceAuthReady = false;
    char localDeviceId[134] = {0};

    (void)ctx;

    if (Application_IsDeviceAuthReady(&isDeviceAuthReady) != 0) {
        Log_Debug("ERROR: Device authentication could not be checked: %d (%s)\n", errno,
                  strerror(errno));
        goto cleanupLabel;
    }
    if (!isDeviceAuthReady) {
        Log_Debug("ERROR: Device has not authenticated: %d (%s)\n", errno, strerror(errno));
        goto cleanupLabel;
    }

    deviceCertPath = DeviceAuth_GetCertificatePath();
    if (deviceCertPath == NULL) {
        Log_Debug("ERROR: DeviceAuth_GetCertificatePath: %d (%s)\n", errno, strerror(errno));
        goto cleanupLabel;
    }

    deviceCert = wolfSSL_X509_load_certificate_file(deviceCertPath, WOLFSSL_FILETYPE_PEM);
    if (deviceCert == NULL) {
        Log_Debug("wolfSSL_X509_load_certificate_file error %d (%s)\n", errno, strerror(errno));
        goto cleanupLabel;
    }

    subjectName = wolfSSL_X509_get_subject_name(deviceCert);
    if (subjectName == NULL) {
        Log_Debug("ERROR: invalid subject name: %d (%s)\n", errno, strerror(errno));
        goto cleanupLabel;
    }

    // wolfSSL_X509_NAME_oneline returns a pointer, so failure is signalled by NULL;
    // comparing the result with "< 0" can never detect it.
    if (wolfSSL_X509_NAME_oneline(subjectName, localDeviceId, (int)sizeof(localDeviceId)) ==
        NULL) {
        Log_Debug("ERROR: Failed to get device id: %d (%s)\n", errno, strerror(errno));
        goto cleanupLabel;
    }

    // Make sure something is left after the skipped prefix; otherwise the ID is empty.
    if (strlen(localDeviceId) <= (size_t)subjectPrefixLen) {
        Log_Debug("ERROR: Device certificate subject name does not contain a device id\n");
        goto cleanupLabel;
    }

    snprintf(deviceId, DEVICE_ID_BUFFER_SIZE, "%s", localDeviceId + subjectPrefixLen);
    result = ExitCode_Success;

cleanupLabel:
    // Single exit path: the certificate object is released on success and failure alike.
    if (deviceCert != NULL) {
        wolfSSL_X509_free(deviceCert);
        deviceCert = NULL;
    }
    return result;
}
/// <summary>
/// Creates an MQTT client connection to Event Grid and sets the subscriptions.
/// APIs are called in the following order:
/// - ConnectRawSocketToServer
/// - HandleWolfsslSetup
/// - HandleTlsHandshake
/// - HandleMqttConnection
/// - MqttSetSubscriptions
/// This function is also called as a callback from MQTT-C library to connect/reconnect
/// to Event Grid.
/// </summary>
static void ReconnectClient(struct mqtt_client *client, void **reconnect_state_vptr)
{
int retExitCode = -1;
mqttReconnectStatePtr = *((Mqtt_Reconnect_State **)reconnect_state_vptr);
if (!IsNetworkReady()) {
const struct timespec reconnectTime = {.tv_sec = 2, .tv_nsec = 0};
StartOneShotTimer(mqttReconnectTimer, &reconnectTime);
Log_Debug("Network not ready.\n");
return;
}
/* Perform error handling here. */
if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
Log_Debug("reconnect_client: called while client was in error state \"%s\"\n",
mqtt_error_str(client->error)
);
}
FreeResources();
retExitCode = ConnectRawSocketToServer(mqttClientContext->hostname);
if (retExitCode != ExitCode_Success) {
Log_Debug("ERROR: ConnectRawSocketToServer: exitcode= %d, %d (%s)\n", retExitCode, errno,
strerror(errno));
failureCallbackFunction(retExitCode);
return;
}
return;
}
/// <summary>
/// Subscribes the MQTT client to the given topic and marks the connection as up.
/// The connection is marked as down first; it is only marked as up again if the topic
/// is valid and the client is not in an error state. A null or empty topic is reported
/// through the failure callback with ExitCode_SetSubscription_NullTopic.
/// </summary>
/// <param name="topic">Topic to subscribe to</param>
/// <param name="topicSize">Size of the topic</param>
static void MqttSetSubscriptions(const char *topic, size_t topicSize)
{
    isMqttConnected = false;

    if (!topic || !topicSize || !strnlen(topic, topicSize)) {
        Log_Debug("Subscribe topic is null or empty.\n");
        failureCallbackFunction(ExitCode_SetSubscription_NullTopic);
        // Do not fall through: subscribing with a null/empty topic is invalid.
        return;
    }

    if (mqttClient.error == MQTT_OK) {
        mqtt_subscribe(&mqttClient, topic, MQTT_MESSAGE_QOS);
        isMqttConnected = true;
        Log_Debug("Connected to MQTT Broker\n");
    }
}
/// <summary>
/// Init MQTT connection state: stores the event loop, failure callback and connection
/// settings, loads the device certificate path and device ID, formats the publish and
/// subscribe topics, and initialises the MQTT client for (re)connection through
/// ReconnectClient. The connection itself is not started here.
/// </summary>
/// <param name="eventLoop">Event loop used for socket events and timers</param>
/// <param name="publish_callback">Callback passed to the MQTT client for received publishes</param>
/// <param name="failureCallback">Callback used by this module to report failure exit codes</param>
/// <param name="mqttContext">Connection settings and buffers for the MQTT client</param>
/// <returns>ExitCode_Success on success; otherwise the exit code of the step that failed</returns>
ExitCode InitializeMqtt(EventLoop *eventLoop,
                        void (*publish_callback)(void **unused, struct mqtt_response_publish *published),
                        ExitCode_CallbackType failureCallback,
                        MQTT_Context *mqttContext)
{
    // Static so that it outlives this call; the MQTT client keeps a pointer to it.
    static Mqtt_Reconnect_State reconnect_state = {.hostname = NULL,
                                                   .port = NULL,
                                                   .recvbuf = NULL,
                                                   .recvbufsz = 0,
                                                   .sendbuf = NULL,
                                                   .sendbufsz = 0,
                                                   .subtopic = NULL};

    eventLoopRef = eventLoop;
    failureCallbackFunction = failureCallback;
    mqttClientContext = mqttContext;

    ExitCode ret = LoadDeviceCertPathAndDeviceID(wolfSslCtx);
    if (ret != ExitCode_Success) {
        return ret;
    }

    // Format the publish and subscribe topic spaces to replace "${client.authenticationName}"
    // with the device ID.
    ret = FormatTopic(EVENT_GRID_PUBLISH_TOPIC, strlen(EVENT_GRID_PUBLISH_TOPIC),
                      formattedPublishTopicBuffer, sizeof(formattedPublishTopicBuffer));
    if (ret != ExitCode_Success) {
        return ret;
    }

    // The result must be captured here, otherwise the check below tests a stale value;
    // the size passed must be that of the subscribe topic, not the publish topic.
    ret = FormatTopic(EVENT_GRID_SUBSCRIBE_TOPIC, strlen(EVENT_GRID_SUBSCRIBE_TOPIC),
                      formattedSubscribeTopicBuffer, sizeof(formattedSubscribeTopicBuffer));
    if (ret != ExitCode_Success) {
        return ret;
    }

    // Build the reconnect_state structure which will be passed to reconnect.
    // NOTE(review): the sizeof expressions give the buffer capacity only if sendbuf and
    // recvbuf are array members of MQTT_Context; if they are pointers, sizeof yields the
    // pointer size — confirm against the MQTT_Context definition.
    reconnect_state.hostname = mqttClientContext->hostname;
    reconnect_state.port = mqttClientContext->port;
    reconnect_state.subtopic = formattedSubscribeTopicBuffer;
    reconnect_state.sendbuf = mqttClientContext->sendbuf;
    reconnect_state.sendbufsz = sizeof(mqttClientContext->sendbuf);
    reconnect_state.recvbuf = mqttClientContext->recvbuf;
    reconnect_state.recvbufsz = sizeof(mqttClientContext->recvbuf);

    mqtt_init_reconnect(&mqttClient, ReconnectClient, &reconnect_state, publish_callback);
    return ExitCode_Success;
}
void ConnectMqtt(void)
{
ReconnectClient(&mqttClient, &mqttClient.reconnect_state);
}
void CreateMqttTimers(void)
{
mqttReconnectTimer = CreateEventLoopDisarmedTimer(eventLoopRef, MqttReconnectHandler);
if (mqttReconnectTimer == NULL) {
Log_Debug("ERROR: Failed to create provisioning timeout timer: %s (%d)\n",
strerror(errno), errno);
failureCallbackFunction(ExitCode_Init_MqttPingTimer);
}
struct timespec pingTimerPeriod = {.tv_sec = 30, .tv_nsec = 0};
mqttPingTimer = CreateEventLoopPeriodicTimer(eventLoopRef, &MqttPingHandler, &pingTimerPeriod);
if (mqttPingTimer == NULL) {
Log_Debug("ERROR: Failed to create provisioning timeout timer: %s (%d)\n", strerror(errno),
errno);
failureCallbackFunction(ExitCode_Init_ReconnectTimer);
}
return;
}