static AMQP_VALUE on_message_received()

in iothub_client/src/iothubtransportamqp_methods.c [249:441]


/*
 * Receiver callback for method request messages.
 *
 * Steps, in order:
 *   1. read the message properties and the correlation id (as a uuid),
 *   2. allocate a method handle and grow the handle array of the instance
 *      passed in as `context` by one slot,
 *   3. read the body payload in place and look up the "IoThub-methodname"
 *      entry in the application properties map,
 *   4. create the "accepted" delivery state, store the method handle in the
 *      array and invoke on_method_request_received.
 *
 * context: the IOTHUBTRANSPORT_AMQP_METHODS_HANDLE this callback works on.
 * message: the received message; NULL is treated as a "released" outcome.
 *
 * Returns the delivery state for the message: accepted, rejected (with
 * "amqp:decode-error" or "amqp:internal-error"), or released. For a released
 * outcome on_methods_error is also invoked. The value returned is whatever
 * the messaging_delivery_* call produced, so it can be NULL if that call
 * failed.
 *
 * NOTE(review): `context` is dereferenced without a NULL check (also on the
 * NULL-message path) - confirm that callers always supply a valid handle.
 */
static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE message)
{
    IOTHUBTRANSPORT_AMQP_METHODS_HANDLE methods_instance = (IOTHUBTRANSPORT_AMQP_METHODS_HANDLE)context;
    /* Explicitly initialized: some compilers cannot prove every path assigns it. */
    AMQP_VALUE result = NULL;
    MESSAGE_OUTCOME message_outcome;
    PROPERTIES_HANDLE properties;
    AMQP_VALUE correlation_id;
    AMQP_VALUE application_properties;
    AMQP_VALUE properties_map;
    AMQP_VALUE name_key;
    AMQP_VALUE name_value;
    IOTHUBTRANSPORT_AMQP_METHOD* request;
    IOTHUBTRANSPORT_AMQP_METHOD_HANDLE* grown_handles;
    size_t grown_size;
    BINARY_DATA payload;
    const char* method_name;

    if (message == NULL)
    {
        LogError("NULL message");
        message_outcome = MESSAGE_OUTCOME_RELEASED;
        goto report_outcome;
    }

    if (message_get_properties(message, &properties) != 0)
    {
        LogError("Cannot retrieve message properties");
        message_outcome = MESSAGE_OUTCOME_REJECTED;
        result = messaging_delivery_rejected("amqp:decode-error", "Cannot retrieve message properties");
        goto report_outcome;
    }

    if (properties_get_correlation_id(properties, &correlation_id) != 0)
    {
        LogError("Cannot retrieve correlation id");
        message_outcome = MESSAGE_OUTCOME_REJECTED;
        result = messaging_delivery_rejected("amqp:decode-error", "Cannot retrieve correlation id");
        goto destroy_properties;
    }

    request = (IOTHUBTRANSPORT_AMQP_METHOD*)malloc(sizeof(IOTHUBTRANSPORT_AMQP_METHOD));
    if (request == NULL)
    {
        LogError("Cannot allocate method handle");
        message_outcome = MESSAGE_OUTCOME_RELEASED;
        goto destroy_properties;
    }

    /* Make room for one more handle; SIZE_MAX from the safe math helpers means overflow. */
    grown_size = safe_multiply_size_t(safe_add_size_t(methods_instance->method_request_handle_count, 1), sizeof(IOTHUBTRANSPORT_AMQP_METHOD_HANDLE));
    if (grown_size == SIZE_MAX)
    {
        grown_handles = NULL;
    }
    else
    {
        grown_handles = (IOTHUBTRANSPORT_AMQP_METHOD_HANDLE*)realloc(methods_instance->method_request_handles, grown_size);
    }

    if (grown_handles == NULL)
    {
        free(request);
        LogError("Cannot grow method handles array, size:%zu", grown_size);
        message_outcome = MESSAGE_OUTCOME_RELEASED;
        goto destroy_properties;
    }

    /* The grown array is kept even if a later step fails; only the count tracks what is in use. */
    methods_instance->method_request_handles = grown_handles;

    if (amqpvalue_get_uuid(correlation_id, &request->correlation_id) != 0)
    {
        free(request);
        LogError("Cannot get uuid value for correlation-id");
        message_outcome = MESSAGE_OUTCOME_REJECTED;
        result = messaging_delivery_rejected("amqp:decode-error", "Cannot get uuid value for correlation-id");
        goto destroy_properties;
    }

    if (message_get_body_amqp_data_in_place(message, 0, &payload) != 0)
    {
        free(request);
        LogError("Cannot get method request message payload");
        message_outcome = MESSAGE_OUTCOME_REJECTED;
        result = messaging_delivery_rejected("amqp:decode-error", "Cannot get method request message payload");
        goto destroy_properties;
    }

    if (message_get_application_properties(message, &application_properties) != 0)
    {
        LogError("Cannot get application properties");
        free(request);
        message_outcome = MESSAGE_OUTCOME_REJECTED;
        result = messaging_delivery_rejected("amqp:decode-error", "Cannot get application properties");
        goto destroy_properties;
    }

    properties_map = amqpvalue_get_inplace_described_value(application_properties);
    if (properties_map == NULL)
    {
        LogError("Cannot get application properties map");
        free(request);
        message_outcome = MESSAGE_OUTCOME_RELEASED;
        goto destroy_application_properties;
    }

    name_key = amqpvalue_create_string("IoThub-methodname");
    if (name_key == NULL)
    {
        LogError("Cannot create the property key for method name");
        free(request);
        message_outcome = MESSAGE_OUTCOME_RELEASED;
        goto destroy_application_properties;
    }

    name_value = amqpvalue_get_map_value(properties_map, name_key);
    if (name_value == NULL)
    {
        LogError("Cannot find the IoThub-methodname property in the properties map");
        free(request);
        message_outcome = MESSAGE_OUTCOME_REJECTED;
        result = messaging_delivery_rejected("amqp:decode-error", "Cannot find the IoThub-methodname property in the properties map");
        goto destroy_name_key;
    }

    if (amqpvalue_get_string(name_value, &method_name) != 0)
    {
        LogError("Cannot read the method name from the property value");
        free(request);
        message_outcome = MESSAGE_OUTCOME_REJECTED;
        result = messaging_delivery_rejected("amqp:decode-error", "Cannot read the method name from the property value");
        goto destroy_name_value;
    }

    result = messaging_delivery_accepted();
    if (result == NULL)
    {
        LogError("Cannot allocate memory for delivery state");
        free(request);
        message_outcome = MESSAGE_OUTCOME_RELEASED;
        goto destroy_name_value;
    }

    request->iothubtransport_amqp_methods_handle = methods_instance;

    /* Store the request in the handle array before the callback is invoked. */
    methods_instance->method_request_handles[methods_instance->method_request_handle_count] = request;
    methods_instance->method_request_handle_count++;

    if (methods_instance->on_method_request_received(
            methods_instance->on_method_request_received_context,
            method_name,
            payload.bytes,
            payload.length,
            request) != 0)
    {
        LogError("Cannot execute the callback with the given data");
        /* Swap the "accepted" state for a rejection and undo the registration. */
        amqpvalue_destroy(result);
        free(request);
        methods_instance->method_request_handle_count--;
        message_outcome = MESSAGE_OUTCOME_REJECTED;
        result = messaging_delivery_rejected("amqp:internal-error", "Cannot execute the callback with the given data");
    }
    else
    {
        message_outcome = MESSAGE_OUTCOME_ACCEPTED;
    }

    /* Cleanup chain: each label releases one resource and falls through to the next. */
destroy_name_value:
    amqpvalue_destroy(name_value);

destroy_name_key:
    amqpvalue_destroy(name_key);

destroy_application_properties:
    application_properties_destroy(application_properties);

destroy_properties:
    properties_destroy(properties);

report_outcome:
    /* Accepted and rejected outcomes already carry their delivery state. */
    if (message_outcome == MESSAGE_OUTCOME_RELEASED)
    {
        result = messaging_delivery_released();

        methods_instance->on_methods_error(methods_instance->on_methods_error_context);
    }

    return result;
}