in iothub_service_client/src/iothub_messaging_ll.c [757:922]
static AMQP_VALUE IoTHubMessaging_LL_FeedbackMessageReceived(const void* context, MESSAGE_HANDLE message)
{
AMQP_VALUE result;
if (context == NULL)
{
result = messaging_delivery_accepted();
}
else
{
IOTHUB_MESSAGING* messagingData = (IOTHUB_MESSAGING*)context;
BINARY_DATA binary_data;
binary_data.bytes = NULL;
binary_data.length = 0;
JSON_Value* root_value = NULL;
JSON_Object* feedback_object = NULL;
JSON_Array* feedback_array = NULL;
if (message_get_body_amqp_data_in_place(message, 0, &binary_data) != 0)
{
LogError("Cannot get message data");
result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "Failed reading message body");
}
else if ((root_value = json_parse_string((const char*)binary_data.bytes)) == NULL)
{
LogError("json_parse_string failed");
result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "Failed parsing json root");
}
else if ((feedback_array = json_value_get_array(root_value)) == NULL)
{
LogError("json_parse_string failed");
result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "Failed parsing json array");
}
else if (json_array_get_count(feedback_array) == 0)
{
LogError("json_array_get_count failed");
result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "json_array_get_count failed");
}
else
{
IOTHUB_SERVICE_FEEDBACK_BATCH* feedbackBatch;
if ((feedbackBatch = (IOTHUB_SERVICE_FEEDBACK_BATCH*)malloc(sizeof(IOTHUB_SERVICE_FEEDBACK_BATCH))) == NULL)
{
LogError("Failed allocating IOTHUB_SERVICE_FEEDBACK_BATCH");
result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "Failed to allocate memory for feedback batch");
}
else
{
size_t array_count = 0;
if ((array_count = json_array_get_count(feedback_array)) <= 0)
{
LogError("json_array_get_count failed");
free(feedbackBatch);
result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "json_array_get_count failed");
}
else if ((feedbackBatch->feedbackRecordList = singlylinkedlist_create()) == NULL)
{
LogError("singlylinkedlist_create failed");
free(feedbackBatch);
result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "singlylinkedlist_create failed");
}
else
{
bool isLoopFailed = false;
for (size_t i = 0; i < array_count; i++)
{
if ((feedback_object = json_array_get_object(feedback_array, i)) == NULL)
{
isLoopFailed = true;
break;
}
else
{
IOTHUB_SERVICE_FEEDBACK_RECORD* feedbackRecord;
if ((feedbackRecord = (IOTHUB_SERVICE_FEEDBACK_RECORD*)malloc(sizeof(IOTHUB_SERVICE_FEEDBACK_RECORD))) == NULL)
{
isLoopFailed = true;
break;
}
else
{
feedbackRecord->deviceId = (char*)json_object_get_string(feedback_object, FEEDBACK_RECORD_KEY_DEVICE_ID);
feedbackRecord->generationId = (char*)json_object_get_string(feedback_object, FEEDBACK_RECORD_KEY_DEVICE_GENERATION_ID);
feedbackRecord->description = (char*)json_object_get_string(feedback_object, FEEDBACK_RECORD_KEY_DESCRIPTION);
feedbackRecord->enqueuedTimeUtc = (char*)json_object_get_string(feedback_object, FEEDBACK_RECORD_KEY_ENQUED_TIME_UTC);
feedbackRecord->originalMessageId = (char*)json_object_get_string(feedback_object, FEEDBACK_RECORD_KEY_ORIGINAL_MESSAGE_ID);
feedbackRecord->correlationId = "";
if (feedbackRecord->description == NULL)
{
feedbackRecord->statusCode = IOTHUB_FEEDBACK_STATUS_CODE_UNKNOWN;
}
else
{
size_t j;
for (j = 0; feedbackRecord->description[j]; j++)
{
feedbackRecord->description[j] = (char)tolower(feedbackRecord->description[j]);
}
if (strcmp(feedbackRecord->description, "success") == 0)
{
feedbackRecord->statusCode = IOTHUB_FEEDBACK_STATUS_CODE_SUCCESS;
}
else if (strcmp(feedbackRecord->description, "expired") == 0)
{
feedbackRecord->statusCode = IOTHUB_FEEDBACK_STATUS_CODE_EXPIRED;
}
else if (strcmp(feedbackRecord->description, "deliverycountexceeded") == 0)
{
feedbackRecord->statusCode = IOTHUB_FEEDBACK_STATUS_CODE_DELIVER_COUNT_EXCEEDED;
}
else if (strcmp(feedbackRecord->description, "rejected") == 0)
{
feedbackRecord->statusCode = IOTHUB_FEEDBACK_STATUS_CODE_REJECTED;
}
else
{
feedbackRecord->statusCode = IOTHUB_FEEDBACK_STATUS_CODE_UNKNOWN;
}
}
if (singlylinkedlist_add(feedbackBatch->feedbackRecordList, feedbackRecord) == NULL)
{
LogError("singlylinkedlist_add failed");
free(feedbackRecord);
}
}
}
}
feedbackBatch->lockToken = "";
feedbackBatch->userId = "";
if (isLoopFailed)
{
LogError("Failed to read feedback records");
result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "Failed to read feedback records");
}
else
{
if (messagingData->callback_data.feedbackMessageCallback != NULL)
{
(messagingData->callback_data.feedbackMessageCallback)(messagingData->callback_data.feedbackUserContext, feedbackBatch);
}
result = messaging_delivery_accepted();
}
LIST_ITEM_HANDLE feedbackRecord = singlylinkedlist_get_head_item(feedbackBatch->feedbackRecordList);
while (feedbackRecord != NULL)
{
IOTHUB_SERVICE_FEEDBACK_RECORD* feedback = (IOTHUB_SERVICE_FEEDBACK_RECORD*)singlylinkedlist_item_get_value(feedbackRecord);
feedbackRecord = singlylinkedlist_get_next_item(feedbackRecord);
free(feedback);
}
singlylinkedlist_destroy(feedbackBatch->feedbackRecordList);
free(feedbackBatch);
}
}
}
json_array_clear(feedback_array);
json_value_free(root_value);
}
return result;
}