AzureEventGrid/main.c (148 lines of code) (raw):

/* Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT License. */ #include <applibs/log.h> #include <signal.h> #include <stdlib.h> #include <stdio.h> #include <string.h> #include "eventgrid_config.h" #include "eventloop_timer_utilities.h" #include "exitcodes.h" #include "mqtt_connection.h" #include "mqtt.h" #include "options.h" static volatile sig_atomic_t exitCode = ExitCode_Success; // Timer / polling static EventLoop *eventLoop = NULL; static EventLoopTimer *publishMessageTimer = NULL; static void PublishMessageTimerHandler(EventLoopTimer *eventLoopTimer); static void ExitCodeCallbackHandler(ExitCode ec); static void UpdateTelemetry(char *mqttMessageToPublish, size_t messageBufferSize); static void PublishCallback(void **unused, struct mqtt_response_publish *published); static bool BuildUtcDateTimeString(char *outputBuffer, size_t outputBufferSize, time_t t); static MQTT_MESSAGE mqtt_msg; static MQTT_Context mqttContext = {.hostname = NULL, .port = "8883", .publishTopic = EVENT_GRID_PUBLISH_TOPIC, .subscribeTopic = EVENT_GRID_SUBSCRIBE_TOPIC, .ca_cert = EVENT_GRID_CA_CERTIFICATE, .messageSize = 128, .messageQOS = MQTT_PUBLISH_QOS_1, .topicSize = 256 }; /// <summary> /// This function publishes a message to Azure Event Grid on configured topic /// </summary> static void PublishMessageTimerHandler(EventLoopTimer *eventLoopTimer) { if (ConsumeEventLoopTimerEvent(eventLoopTimer) != 0) { exitCode = ExitCode_PublishMessageTimer_Consume; return; } UpdateTelemetry(mqtt_msg.message, sizeof(mqtt_msg.message)); mqtt_msg.message_length = strnlen(mqtt_msg.message, sizeof(mqtt_msg.message)); SendTelemetry(mqtt_msg.message, mqtt_msg.message_length, GetPublishTopicName()); } /// <summary> /// This function is called when the device receives a new message from Azure Event Grid /// </summary> static void PublishCallback(void **unused, struct mqtt_response_publish *published) { char *message = NULL; // Print the message received message = (char *)malloc(published->application_message_size + 1); if (!message) { Log_Debug("Error: Cannot print received message. Memory allocation failed.\n"); return; } memset(message, 0x00, published->application_message_size + 1); memcpy(message, published->application_message, published->application_message_size); Log_Debug("Message Received: %s", message); free(message); } /// <summary> /// Signal handler for termination requests. This handler must be async-signal-safe. /// </summary> static void TerminationHandler(int signalNumber) { // Don't use Log_Debug here, as it is not guaranteed to be async-signal-safe. exitCode = ExitCode_TermHandler_SigTerm; } static void ExitCodeCallbackHandler(ExitCode ec) { exitCode = ec; } /// <summary> /// Add date and time and simulated temperature to the message that is to be published. /// </summary> static void UpdateTelemetry(char *mqttMessageToPublish, size_t messageBufferSize) { static float temperature = 50.f; char dateTimeBuffer[DATETIME_BUFFER_SIZE] = {0}; if (!mqttMessageToPublish || !messageBufferSize) { Log_Debug("Error: Publish message buffer is null or empty.\n"); return; } memset(mqttMessageToPublish, 0, messageBufferSize); time_t now; time(&now); if (now != -1) { BuildUtcDateTimeString(dateTimeBuffer, sizeof(dateTimeBuffer), now); } // Generate a simulated temperature. float delta = ((float)(rand() % 41)) / 20.0f - 1.0f; // between -1.0 and +1.0 temperature += delta; snprintf(mqttMessageToPublish, messageBufferSize, "%s: Temperature %f\n", dateTimeBuffer, temperature); } /// <summary> /// Helper function to build the UTC date time string. /// </summary> static bool BuildUtcDateTimeString(char *outputBuffer, size_t outputBufferSize, time_t t) { // Format string to create an ISO 8601 time. This corresponds to the DTDL datetime schema item. static const char *ISO8601Format = "%Y-%m-%dT%H:%M:%SZ"; bool result; struct tm *currentTimeTm; currentTimeTm = gmtime(&t); if (strftime(outputBuffer, outputBufferSize, ISO8601Format, currentTimeTm) == 0) { Log_Debug("ERROR: strftime: %s (%d)\n", errno, strerror(errno)); result = false; } else { result = true; } return result; } /// <summary> /// Initialize peripherals, device twins, direct methods, timers. /// </summary> static ExitCode InitPeripheralsAndHandlers(void) { struct sigaction action; memset(&action, 0, sizeof(struct sigaction)); action.sa_handler = TerminationHandler; sigaction(SIGTERM, &action, NULL); eventLoop = EventLoop_Create(); if (eventLoop == NULL) { Log_Debug("Could not create event loop.\n"); return ExitCode_Init_EventLoop; } // Publish telemetry message every one second. struct timespec publishMessagePeriod = {.tv_sec = 1, .tv_nsec = 0}; publishMessageTimer = CreateEventLoopPeriodicTimer(eventLoop, &PublishMessageTimerHandler, &publishMessagePeriod); if (publishMessageTimer == NULL) { return ExitCode_Init_PublishMessageTimer; } mqttContext.hostname = Options_GetAzureEventGridHostname(); int ret = InitializeMqtt(eventLoop, PublishCallback, ExitCodeCallbackHandler, &mqttContext); if (ret != ExitCode_Success) { return ret; } // Create timers needed for MQTT connection. CreateMqttTimers(); return ExitCode_Success; } /// <summary> /// Close peripherals and handlers. /// </summary> static void ClosePeripheralsAndHandlers(void) { DisconnectMqtt(); DisposeMqttTimers(); DisposeEventLoopTimer(publishMessageTimer); EventLoop_Close(eventLoop); } int main(int argc, char *argv[]) { Log_Debug("Azure Event Grid Application starting.\n"); exitCode = Options_ParseArgs(argc, argv); if (exitCode != ExitCode_Success) { return exitCode; } exitCode = InitPeripheralsAndHandlers(); if (exitCode == ExitCode_Success) { ConnectMqtt(); } // Main loop while (exitCode == ExitCode_Success) { EventLoop_Run_Result result = EventLoop_Run(eventLoop, -1, true); // Continue if interrupted by signal, e.g. due to breakpoint being set. if (result == EventLoop_Run_Failed && errno != EINTR) { exitCode = ExitCode_Main_EventLoopFail; } } ClosePeripheralsAndHandlers(); Log_Debug("Application exiting.\n"); return exitCode; }