static void ProcessMessage()

in src/utils/d2c_messaging/src/d2c_messaging.c [351:445]


/**
 * @brief Advances the processing state of a single D2C message processing context.
 *
 * If the pending message store holds a message for this context's type, that message
 * replaces the context's current message (the old one, if any, is completed with
 * ADUC_D2C_Message_Status_Replaced) and is sent right away. The swap is skipped while the
 * current message is in ADUC_D2C_Message_Status_Waiting_For_Response.
 *
 * If no new message is pending, the current message is re-sent when it is in
 * ADUC_D2C_Message_Status_In_Progress and its next-retry timestamp has been reached.
 *
 * Both s_pendingMessageStoreMutex and the context's own mutex are held for the duration of
 * the state update, including the call to the context's transport function.
 *
 * @param message_processing_context The context to process. A NULL or uninitialized context
 *        is logged and ignored.
 */
static void ProcessMessage(ADUC_D2C_Message_Processing_Context* message_processing_context)
{
    bool shouldSend = false;
    time_t now = GetTimeSinceEpochInSeconds();

    if (message_processing_context == NULL)
    {
        Log_Error("context is NULL");
        return;
    }

    if (!message_processing_context->initialized)
    {
        // Pointers must be printed with %p (and passed as void*); %x expects an unsigned int.
        Log_Warn("Message processing context (%p) is not initialized.", (void*)message_processing_context);
        return;
    }

    // Lock order: the global pending-store mutex first, then the per-context mutex.
    pthread_mutex_lock(&s_pendingMessageStoreMutex);
    pthread_mutex_lock(&message_processing_context->mutex);

    if (s_pendingMessageStore[message_processing_context->type].content != NULL)
    {
        if (message_processing_context->message.content != NULL)
        {
            if (message_processing_context->message.status == ADUC_D2C_Message_Status_Waiting_For_Response)
            {
                // Let's wait to see what the response is. The pending message stays in the store.
                goto done;
            }

            // Discard old message.
            Log_Info(
                "New D2C message content (t:%d, content:%p).",
                message_processing_context->type,
                (const void*)s_pendingMessageStore[message_processing_context->type].content);
            OnMessageProcessingCompleted(&message_processing_context->message, ADUC_D2C_Message_Status_Replaced);
        }

        // Use new message: take over the pending entry and reset the retry bookkeeping.
        memset(&message_processing_context->message, 0, sizeof(message_processing_context->message));
        message_processing_context->message = s_pendingMessageStore[message_processing_context->type];
        message_processing_context->message.attempts = 0;
        message_processing_context->retries = 0;
        message_processing_context->nextRetryTimeStampEpoch = now;

        // Empty pending message store.
        memset(&s_pendingMessageStore[message_processing_context->type], 0, sizeof(ADUC_D2C_Message));
        shouldSend = message_processing_context->message.content != NULL;

        SetMessageStatus(&message_processing_context->message, ADUC_D2C_Message_Status_In_Progress);
    }
    else if (
        (message_processing_context->message.content != NULL)
        && (message_processing_context->message.status == ADUC_D2C_Message_Status_In_Progress)
        && (now >= message_processing_context->nextRetryTimeStampEpoch))
    {
        // No new message; the current one is due for another send attempt.
        shouldSend = true;
    }

    if (shouldSend)
    {
        // NOTE(review): on failure the retry time below is advanced relative to its previous
        // value, not relative to 'now', so the next attempt may happen sooner than
        // FATAL_ERROR_WAIT_TIME_SEC from now - confirm this is intended.
        if (message_processing_context->transportFunc == NULL)
        {
            Log_Error(
                "Cannot send message. Transport function is NULL. Will retry in the next %d seconds. (t:%d)",
                FATAL_ERROR_WAIT_TIME_SEC,
                message_processing_context->type);
            message_processing_context->nextRetryTimeStampEpoch += FATAL_ERROR_WAIT_TIME_SEC;
        }
        else
        {
            message_processing_context->message.attempts++;
            Log_Debug(
                "Sending D2C message (t:%d, retries:%d).",
                message_processing_context->type,
                message_processing_context->retries);
            // A non-zero return from the transport function is treated as a send failure.
            if (message_processing_context->transportFunc(
                    message_processing_context->message.cloudServiceHandle,
                    message_processing_context,
                    DefaultIoTHubSendReportedStateCompletedCallback)
                != 0)
            {
                message_processing_context->nextRetryTimeStampEpoch += FATAL_ERROR_WAIT_TIME_SEC;
                Log_Error(
                    "Failed to send message. Will retry in the next %d seconds. (t:%d)",
                    FATAL_ERROR_WAIT_TIME_SEC,
                    message_processing_context->type);
            }
        }
    }

done:
    // Release in reverse order of acquisition.
    pthread_mutex_unlock(&message_processing_context->mutex);
    pthread_mutex_unlock(&s_pendingMessageStoreMutex);
}