static AMQP_VALUE IoTHubMessaging_LL_FeedbackMessageReceived()

in iothub_service_client/src/iothub_messaging_ll.c [757:922]


static AMQP_VALUE IoTHubMessaging_LL_FeedbackMessageReceived(const void* context, MESSAGE_HANDLE message)
{
    AMQP_VALUE result;

    if (context == NULL)
    {
        result = messaging_delivery_accepted();
    }
    else
    {
        IOTHUB_MESSAGING* messagingData = (IOTHUB_MESSAGING*)context;

        BINARY_DATA binary_data;
        binary_data.bytes = NULL;
        binary_data.length = 0;

        JSON_Value* root_value = NULL;
        JSON_Object* feedback_object = NULL;
        JSON_Array* feedback_array = NULL;

        if (message_get_body_amqp_data_in_place(message, 0, &binary_data) != 0)
        {
            LogError("Cannot get message data");
            result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "Failed reading message body");
        }
        else if ((root_value = json_parse_string((const char*)binary_data.bytes)) == NULL)
        {
            LogError("json_parse_string failed");
            result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "Failed parsing json root");
        }
        else if ((feedback_array = json_value_get_array(root_value)) == NULL)
        {
            LogError("json_parse_string failed");
            result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "Failed parsing json array");
        }
        else if (json_array_get_count(feedback_array) == 0)
        {
            LogError("json_array_get_count failed");
            result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "json_array_get_count failed");
        }
        else
        {
            IOTHUB_SERVICE_FEEDBACK_BATCH* feedbackBatch;

            if ((feedbackBatch = (IOTHUB_SERVICE_FEEDBACK_BATCH*)malloc(sizeof(IOTHUB_SERVICE_FEEDBACK_BATCH))) == NULL)
            {
                LogError("Failed allocating IOTHUB_SERVICE_FEEDBACK_BATCH");
                result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "Failed to allocate memory for feedback batch");
            }
            else
            {
                size_t array_count = 0;
                if ((array_count = json_array_get_count(feedback_array)) <= 0)
                {
                    LogError("json_array_get_count failed");
                    free(feedbackBatch);
                    result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "json_array_get_count failed");
                }
                else if ((feedbackBatch->feedbackRecordList = singlylinkedlist_create()) == NULL)
                {
                    LogError("singlylinkedlist_create failed");
                    free(feedbackBatch);
                    result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "singlylinkedlist_create failed");
                }
                else
                {
                    bool isLoopFailed = false;
                    for (size_t i = 0; i < array_count; i++)
                    {
                        if ((feedback_object = json_array_get_object(feedback_array, i)) == NULL)
                        {
                            isLoopFailed = true;
                            break;
                        }
                        else
                        {
                            IOTHUB_SERVICE_FEEDBACK_RECORD* feedbackRecord;
                            if ((feedbackRecord = (IOTHUB_SERVICE_FEEDBACK_RECORD*)malloc(sizeof(IOTHUB_SERVICE_FEEDBACK_RECORD))) == NULL)
                            {
                                isLoopFailed = true;
                                break;
                            }
                            else
                            {
                                feedbackRecord->deviceId = (char*)json_object_get_string(feedback_object, FEEDBACK_RECORD_KEY_DEVICE_ID);
                                feedbackRecord->generationId = (char*)json_object_get_string(feedback_object, FEEDBACK_RECORD_KEY_DEVICE_GENERATION_ID);
                                feedbackRecord->description = (char*)json_object_get_string(feedback_object, FEEDBACK_RECORD_KEY_DESCRIPTION);
                                feedbackRecord->enqueuedTimeUtc = (char*)json_object_get_string(feedback_object, FEEDBACK_RECORD_KEY_ENQUED_TIME_UTC);
                                feedbackRecord->originalMessageId = (char*)json_object_get_string(feedback_object, FEEDBACK_RECORD_KEY_ORIGINAL_MESSAGE_ID);
                                feedbackRecord->correlationId = "";

                                if (feedbackRecord->description == NULL)
                                {
                                    feedbackRecord->statusCode = IOTHUB_FEEDBACK_STATUS_CODE_UNKNOWN;
                                }
                                else
                                {
                                    size_t j;
                                    for (j = 0; feedbackRecord->description[j]; j++)
                                    {
                                        feedbackRecord->description[j] = (char)tolower(feedbackRecord->description[j]);
                                    }

                                    if (strcmp(feedbackRecord->description, "success") == 0)
                                    {
                                        feedbackRecord->statusCode = IOTHUB_FEEDBACK_STATUS_CODE_SUCCESS;
                                    }
                                    else if (strcmp(feedbackRecord->description, "expired") == 0)
                                    {
                                        feedbackRecord->statusCode = IOTHUB_FEEDBACK_STATUS_CODE_EXPIRED;
                                    }
                                    else if (strcmp(feedbackRecord->description, "deliverycountexceeded") == 0)
                                    {
                                        feedbackRecord->statusCode = IOTHUB_FEEDBACK_STATUS_CODE_DELIVER_COUNT_EXCEEDED;
                                    }
                                    else if (strcmp(feedbackRecord->description, "rejected") == 0)
                                    {
                                        feedbackRecord->statusCode = IOTHUB_FEEDBACK_STATUS_CODE_REJECTED;
                                    }
                                    else
                                    {
                                        feedbackRecord->statusCode = IOTHUB_FEEDBACK_STATUS_CODE_UNKNOWN;
                                    }
                                }
                                if (singlylinkedlist_add(feedbackBatch->feedbackRecordList, feedbackRecord) == NULL)
                                {
                                    LogError("singlylinkedlist_add failed");
                                    free(feedbackRecord);
                                }
                            }
                        }
                    }
                    feedbackBatch->lockToken = "";
                    feedbackBatch->userId = "";

                    if (isLoopFailed)
                    {
                        LogError("Failed to read feedback records");
                        result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "Failed to read feedback records");
                    }
                    else
                    {
                        if (messagingData->callback_data.feedbackMessageCallback != NULL)
                        {
                            (messagingData->callback_data.feedbackMessageCallback)(messagingData->callback_data.feedbackUserContext, feedbackBatch);
                        }
                        result = messaging_delivery_accepted();
                    }

                    LIST_ITEM_HANDLE feedbackRecord = singlylinkedlist_get_head_item(feedbackBatch->feedbackRecordList);
                    while (feedbackRecord != NULL)
                    {
                        IOTHUB_SERVICE_FEEDBACK_RECORD* feedback = (IOTHUB_SERVICE_FEEDBACK_RECORD*)singlylinkedlist_item_get_value(feedbackRecord);
                        feedbackRecord = singlylinkedlist_get_next_item(feedbackRecord);
                        free(feedback);
                    }
                    singlylinkedlist_destroy(feedbackBatch->feedbackRecordList);
                    free(feedbackBatch);
                }
            }
        }
        json_array_clear(feedback_array);
        json_value_free(root_value);
    }
    return result;
}