in src/utils/d2c_messaging/src/d2c_messaging.c [351:445]
/**
 * @brief Advances the processing of one D2C message context by a single step.
 *
 * If a new message is waiting in the pending message store slot that matches the
 * context's type, it replaces the context's current message (unless the current
 * message is still waiting for a response) and is sent right away. Otherwise, if
 * the current message is in progress and its retry time has been reached, it is
 * sent again.
 *
 * The pending message store mutex is acquired before the context mutex, and both
 * are held for the whole operation, including the call to the transport function.
 *
 * @param message_processing_context The context to process. A NULL or uninitialized
 *        context is logged and ignored.
 */
static void ProcessMessage(ADUC_D2C_Message_Processing_Context* message_processing_context)
{
    bool shouldSend = false;
    time_t now = GetTimeSinceEpochInSeconds();

    if (message_processing_context == NULL)
    {
        Log_Error("context is NULL");
        return;
    }

    if (!message_processing_context->initialized)
    {
        // Pointers must be printed with %p; passing one to %x is undefined behavior
        // and drops the upper half of the address on 64-bit targets.
        Log_Warn("Message processing context (%p) is not initialized.", (void*)message_processing_context);
        return;
    }

    // Lock order: pending message store first, then the context.
    pthread_mutex_lock(&s_pendingMessageStoreMutex);
    pthread_mutex_lock(&message_processing_context->mutex);

    if (s_pendingMessageStore[message_processing_context->type].content != NULL)
    {
        if (message_processing_context->message.content != NULL)
        {
            if (message_processing_context->message.status == ADUC_D2C_Message_Status_Waiting_For_Response)
            {
                // Let's wait to see what the response is. The pending message stays in
                // the store and is picked up by a later call.
                goto done;
            }

            // Discard old message.
            Log_Info(
                "New D2C message content (t:%d, content:%p).",
                message_processing_context->type,
                (const void*)s_pendingMessageStore[message_processing_context->type].content);
            OnMessageProcessingCompleted(&message_processing_context->message, ADUC_D2C_Message_Status_Replaced);
        }

        // Use new message, starting with fresh attempt/retry bookkeeping.
        memset(&message_processing_context->message, 0, sizeof(message_processing_context->message));
        message_processing_context->message = s_pendingMessageStore[message_processing_context->type];
        message_processing_context->message.attempts = 0;
        message_processing_context->retries = 0;
        message_processing_context->nextRetryTimeStampEpoch = now;

        // Empty pending message store.
        memset(&s_pendingMessageStore[message_processing_context->type], 0, sizeof(ADUC_D2C_Message));

        shouldSend = message_processing_context->message.content != NULL;
        SetMessageStatus(&message_processing_context->message, ADUC_D2C_Message_Status_In_Progress);
    }
    else if (
        (message_processing_context->message.content != NULL)
        && (message_processing_context->message.status == ADUC_D2C_Message_Status_In_Progress)
        && (now >= message_processing_context->nextRetryTimeStampEpoch))
    {
        // No new message; the current one is due for another send attempt.
        shouldSend = true;
    }

    if (shouldSend)
    {
        if (message_processing_context->transportFunc == NULL)
        {
            Log_Error(
                "Cannot send message. Transport function is NULL. Will retry in the next %d seconds. (t:%d)",
                FATAL_ERROR_WAIT_TIME_SEC,
                message_processing_context->type);
            // Schedule relative to 'now': the stored timestamp may already be in the past,
            // and adding to it could make the retry fire earlier than the logged delay.
            message_processing_context->nextRetryTimeStampEpoch = now + FATAL_ERROR_WAIT_TIME_SEC;
        }
        else
        {
            message_processing_context->message.attempts++;
            Log_Debug(
                "Sending D2C message (t:%d, retries:%d).",
                message_processing_context->type,
                message_processing_context->retries);

            // A non-zero return from the transport function is treated as a send failure.
            if (message_processing_context->transportFunc(
                    message_processing_context->message.cloudServiceHandle,
                    message_processing_context,
                    DefaultIoTHubSendReportedStateCompletedCallback)
                != 0)
            {
                // Same reasoning as above: base the delay on 'now', not on a stale timestamp.
                message_processing_context->nextRetryTimeStampEpoch = now + FATAL_ERROR_WAIT_TIME_SEC;
                Log_Error(
                    "Failed to send message. Will retry in the next %d seconds. (t:%d)",
                    FATAL_ERROR_WAIT_TIME_SEC,
                    message_processing_context->type);
            }
        }
    }

done:
    // Release in reverse order of acquisition.
    pthread_mutex_unlock(&message_processing_context->mutex);
    pthread_mutex_unlock(&s_pendingMessageStoreMutex);
}