in src/amqp_management.c [66:352]
static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE message)
{
AMQP_VALUE result;
if (context == NULL)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_108: [ When `on_message_received` is called with a NULL context, it shall do nothing. ]*/
LogError("NULL context in on_message_received");
result = NULL;
}
else
{
AMQP_MANAGEMENT_HANDLE amqp_management = (AMQP_MANAGEMENT_HANDLE)context;
AMQP_VALUE application_properties;
/* Codes_SRS_AMQP_MANAGEMENT_01_109: [ `on_message_received` shall obtain the application properties from the message by calling `message_get_application_properties`. ]*/
if (message_get_application_properties(message, &application_properties) != 0)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_113: [ If obtaining the application properties or message properties fails, an error shall be indicated by calling `on_amqp_management_error` and passing the `on_amqp_management_error_context` to it. ]*/
LogError("Could not retrieve application properties");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_136: [ When `on_message_received` fails due to errors in parsing the response message `on_message_received` shall call `messaging_delivery_rejected` and return the created delivery AMQP value. ]*/
result = messaging_delivery_rejected("amqp:internal-error", "Could not get application properties on AMQP management response.");
}
else
{
PROPERTIES_HANDLE response_properties;
/* Codes_SRS_AMQP_MANAGEMENT_01_110: [ `on_message_received` shall obtain the message properties from the message by calling `message_get_properties`. ]*/
if (message_get_properties(message, &response_properties) != 0)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_113: [ If obtaining the application properties or message properties fails, an error shall be indicated by calling `on_amqp_management_error` and passing the `on_amqp_management_error_context` to it. ]*/
LogError("Could not retrieve message properties");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_136: [ When `on_message_received` fails due to errors in parsing the response message `on_message_received` shall call `messaging_delivery_rejected` and return the created delivery AMQP value. ]*/
result = messaging_delivery_rejected("amqp:internal-error", "Could not get message properties on AMQP management response.");
}
else
{
AMQP_VALUE key;
AMQP_VALUE value;
AMQP_VALUE desc_key;
AMQP_VALUE desc_value;
AMQP_VALUE map;
AMQP_VALUE correlation_id_value;
uint64_t correlation_id;
/* Codes_SRS_AMQP_MANAGEMENT_01_111: [ `on_message_received` shall obtain the correlation Id from the message properties by using `properties_get_correlation_id`. ]*/
if (properties_get_correlation_id(response_properties, &correlation_id_value) != 0)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_114: [ If obtaining the correlation Id fails, an error shall be indicated by calling `on_amqp_management_error` and passing the `on_amqp_management_error_context` to it. ] */
LogError("Could not retrieve correlation Id");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_136: [ When `on_message_received` fails due to errors in parsing the response message `on_message_received` shall call `messaging_delivery_rejected` and return the created delivery AMQP value. ]*/
result = messaging_delivery_rejected("amqp:internal-error", "Could not get correlation Id from AMQP management response.");
}
else
{
if (amqpvalue_get_ulong(correlation_id_value, &correlation_id) != 0)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_132: [ If any functions manipulating AMQP values, application properties, etc., fail, an error shall be indicated to the consumer by calling the `on_amqp_management_error` and passing the `on_amqp_management_error_context` to it. ]*/
LogError("Could not retrieve correlation Id ulong value");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_136: [ When `on_message_received` fails due to errors in parsing the response message `on_message_received` shall call `messaging_delivery_rejected` and return the created delivery AMQP value. ]*/
result = messaging_delivery_rejected("amqp:internal-error", "Could not get correlation Id from AMQP management response.");
}
else
{
/* Codes_SRS_AMQP_MANAGEMENT_01_119: [ `on_message_received` shall obtain the application properties map by calling `amqpvalue_get_inplace_described_value`. ]*/
/* Codes_SRS_AMQP_MANAGEMENT_01_070: [ Response messages have the following application-properties: ]*/
map = amqpvalue_get_inplace_described_value(application_properties);
if (map == NULL)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_132: [ If any functions manipulating AMQP values, application properties, etc., fail, an error shall be indicated to the consumer by calling the `on_amqp_management_error` and passing the `on_amqp_management_error_context` to it. ]*/
LogError("Could not retrieve application property map");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_136: [ When `on_message_received` fails due to errors in parsing the response message `on_message_received` shall call `messaging_delivery_rejected` and return the created delivery AMQP value. ]*/
result = messaging_delivery_rejected("amqp:internal-error", "Could not get application property map from the application properties in the AMQP management response.");
}
else
{
/* Codes_SRS_AMQP_MANAGEMENT_01_120: [ An AMQP value used to lookup the status code shall be created by calling `amqpvalue_create_string` with the status code key name (`statusCode`) as argument. ]*/
/* Codes_SRS_AMQP_MANAGEMENT_01_071: [ statusCode integer Yes HTTP response code [RFC2616] ]*/
key = amqpvalue_create_string(amqp_management->status_code_key_name);
if (key == NULL)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_132: [ If any functions manipulating AMQP values, application properties, etc., fail, an error shall be indicated to the consumer by calling the `on_amqp_management_error` and passing the `on_amqp_management_error_context` to it. ]*/
LogError("Could not create status-code amqp value");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_135: [ When an error occurs in creating AMQP values (for status code, etc.) `on_message_received` shall call `messaging_delivery_released` and return the created delivery AMQP value. ]*/
result = messaging_delivery_released();
}
else
{
/* Codes_SRS_AMQP_MANAGEMENT_01_121: [ The status code shall be looked up in the application properties by using `amqpvalue_get_map_value`. ]*/
value = amqpvalue_get_map_value(map, key);
if (value == NULL)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_122: [ If status code is not found an error shall be indicated to the consumer by calling the `on_amqp_management_error` and passing the `on_amqp_management_error_context` to it. ]*/
LogError("Could not retrieve status code from application properties");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_136: [ When `on_message_received` fails due to errors in parsing the response message `on_message_received` shall call `messaging_delivery_rejected` and return the created delivery AMQP value. ]*/
result = messaging_delivery_rejected("amqp:internal-error", "Could not retrieve status code from the application properties in the AMQP management response.");
}
else
{
int32_t status_code;
/* Codes_SRS_AMQP_MANAGEMENT_01_133: [ The status code value shall be extracted from the value found in the map by using `amqpvalue_get_int`. ]*/
if (amqpvalue_get_int(value, &status_code) != 0)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_132: [ If any functions manipulating AMQP values, application properties, etc., fail, an error shall be indicated to the consumer by calling the `on_amqp_management_error` and passing the `on_amqp_management_error_context` to it. ]*/
LogError("Could not retrieve status code int value");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_136: [ When `on_message_received` fails due to errors in parsing the response message `on_message_received` shall call `messaging_delivery_rejected` and return the created delivery AMQP value. ]*/
result = messaging_delivery_rejected("amqp:internal-error", "Could not retrieve status code value from the application properties in the AMQP management response.");
}
else
{
/* Codes_SRS_AMQP_MANAGEMENT_01_123: [ An AMQP value used to lookup the status description shall be created by calling `amqpvalue_create_string` with the status description key name (`statusDescription`) as argument. ]*/
/* Codes_SRS_AMQP_MANAGEMENT_01_072: [ statusDescription string No Description of the status. ]*/
desc_key = amqpvalue_create_string(amqp_management->status_description_key_name);
if (desc_key == NULL)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_132: [ If any functions manipulating AMQP values, application properties, etc., fail, an error shall be indicated to the consumer by calling the `on_amqp_management_error` and passing the `on_amqp_management_error_context` to it. ]*/
LogError("Could not create status-description amqp value");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_135: [ When an error occurs in creating AMQP values (for status code, etc.) `on_message_received` shall call `messaging_delivery_released` and return the created delivery AMQP value. ]*/
result = messaging_delivery_released();
}
else
{
const char* status_description = NULL;
LIST_ITEM_HANDLE list_item_handle;
bool found = false;
bool is_error = false;
/* Codes_SRS_AMQP_MANAGEMENT_01_124: [ The status description shall be looked up in the application properties by using `amqpvalue_get_map_value`. ]*/
desc_value = amqpvalue_get_map_value(map, desc_key);
if (desc_value != NULL)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_134: [ The status description value shall be extracted from the value found in the map by using `amqpvalue_get_string`. ]*/
if (amqpvalue_get_string(desc_value, &status_description) != 0)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_125: [ If status description is not found, NULL shall be passed to the user callback as `status_description` argument. ]*/
status_description = NULL;
}
}
else
{
/* Codes_SRS_AMQP_MANAGEMENT_01_125: [ If status description is not found, NULL shall be passed to the user callback as `status_description` argument. ]*/
status_description = NULL;
}
list_item_handle = singlylinkedlist_get_head_item(amqp_management->pending_operations);
while (list_item_handle != NULL)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_116: [ Each pending operation item value shall be obtained by calling `singlylinkedlist_item_get_value`. ]*/
OPERATION_MESSAGE_INSTANCE* operation_message = (OPERATION_MESSAGE_INSTANCE*)singlylinkedlist_item_get_value(list_item_handle);
if (operation_message == NULL)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_117: [ If iterating through the pending operations list fails, an error shall be indicated by calling `on_amqp_management_error` and passing the `on_amqp_management_error_context` to it. ]*/
LogError("Could not create status-description amqp value");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_135: [ When an error occurs in creating AMQP values (for status code, etc.) `on_message_received` shall call `messaging_delivery_released` and return the created delivery AMQP value. ]*/
messaging_delivery_released();
break;
}
else
{
AMQP_MANAGEMENT_EXECUTE_OPERATION_RESULT execute_operation_result;
/* Codes_SRS_AMQP_MANAGEMENT_01_112: [ `on_message_received` shall check if the correlation Id matches the stored message Id of any pending operation. ]*/
/* Codes_SRS_AMQP_MANAGEMENT_01_068: [ The correlation-id of the response message MUST be the correlation-id from the request message (if present) ]*/
/* Codes_SRS_AMQP_MANAGEMENT_01_069: [ else the message-id from the request message. ]*/
if (correlation_id == operation_message->message_id)
{
if (!operation_message->message_send_confirmed)
{
LogError("Did not receive send confirmation for pending operation");
execute_operation_result = AMQP_MANAGEMENT_EXECUTE_OPERATION_FAILED_BAD_STATUS;
if (async_operation_cancel(operation_message->send_async_context) != 0)
{
LogError("Failed cancelling pending send operation");
is_error = true;
}
}
/* Codes_SRS_AMQP_MANAGEMENT_01_074: [ Successful operations MUST result in a statusCode in the 2xx range as defined in Section 10.2 of [RFC2616]. ]*/
else if ((status_code < 200) || (status_code > 299))
{
/* Codes_SRS_AMQP_MANAGEMENT_01_128: [ If the status indicates that the operation failed, the result callback argument shall be `AMQP_MANAGEMENT_EXECUTE_OPERATION_FAILED_BAD_STATUS`. ]*/
/* Codes_SRS_AMQP_MANAGEMENT_01_075: [ Unsuccessful operations MUST NOT result in a statusCode in the 2xx range as defined in Section 10.2 of [RFC2616]. ]*/
execute_operation_result = AMQP_MANAGEMENT_EXECUTE_OPERATION_FAILED_BAD_STATUS;
}
else
{
/* Codes_SRS_AMQP_MANAGEMENT_01_127: [ If the operation succeeded the result callback argument shall be `AMQP_MANAGEMENT_EXECUTE_OPERATION_OK`. ]*/
execute_operation_result = AMQP_MANAGEMENT_EXECUTE_OPERATION_OK;
}
/* Codes_SRS_AMQP_MANAGEMENT_01_126: [ If a corresponding correlation Id is found in the pending operations list, the callback associated with the pending operation shall be called. ]*/
/* Codes_SRS_AMQP_MANAGEMENT_01_166: [ The `message` shall be passed as argument to the callback. ]*/
if (operation_message->on_execute_operation_complete != NULL)
{
// Check for NULL in case operation has been cancelled.
operation_message->on_execute_operation_complete(operation_message->callback_context, execute_operation_result, status_code, status_description, message);
}
async_operation_destroy(operation_message->execute_async_operation);
/* Codes_SRS_AMQP_MANAGEMENT_01_129: [ After calling the callback, the pending operation shall be removed from the pending operations list by calling `singlylinkedlist_remove`. ]*/
if (singlylinkedlist_remove(amqp_management->pending_operations, list_item_handle) != 0)
{
LogError("Cannot remove pending operation");
is_error = true;
}
else
{
found = true;
}
break;
}
}
/* Codes_SRS_AMQP_MANAGEMENT_01_115: [ Iterating through the pending operations shall be done by using `singlylinkedlist_get_head_item` and `singlylinkedlist_get_next_item` until the enm of the pending operations singly linked list is reached. ]*/
/* Codes_SRS_AMQP_MANAGEMENT_01_117: [ If iterating through the pending operations list fails, an error shall be indicated by calling `on_amqp_management_error` and passing the `on_amqp_management_error_context` to it. ]*/
list_item_handle = singlylinkedlist_get_next_item(list_item_handle);
}
if (is_error)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_117: [ If iterating through the pending operations list fails, an error shall be indicated by calling `on_amqp_management_error` and passing the `on_amqp_management_error_context` to it. ]*/
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_135: [ When an error occurs in creating AMQP values (for status code, etc.) `on_message_received` shall call `messaging_delivery_released` and return the created delivery AMQP value. ]*/
result = messaging_delivery_released();
}
else
{
if (!found)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_118: [ If no pending operation is found matching the correlation Id, an error shall be indicated by calling `on_amqp_management_error` and passing the `on_amqp_management_error_context` to it. ]*/
LogError("Could not match AMQP management response to request");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_135: [ When an error occurs in creating AMQP values (for status code, etc.) `on_message_received` shall call `messaging_delivery_released` and return the created delivery AMQP value. ]*/
result = messaging_delivery_rejected("amqp:internal-error", "Could not match AMQP management response to request");
}
else
{
/* Codes_SRS_AMQP_MANAGEMENT_01_130: [ The `on_message_received` shall call `messaging_delivery_accepted` and return the created delivery AMQP value. ]*/
result = messaging_delivery_accepted();
}
}
if (desc_value != NULL)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_131: [ All temporary values like AMQP values used as keys shall be freed before exiting the callback. ]*/
amqpvalue_destroy(desc_value);
}
/* Codes_SRS_AMQP_MANAGEMENT_01_131: [ All temporary values like AMQP values used as keys shall be freed before exiting the callback. ]*/
amqpvalue_destroy(desc_key);
}
}
/* Codes_SRS_AMQP_MANAGEMENT_01_131: [ All temporary values like AMQP values used as keys shall be freed before exiting the callback. ]*/
amqpvalue_destroy(value);
}
/* Codes_SRS_AMQP_MANAGEMENT_01_131: [ All temporary values like AMQP values used as keys shall be freed before exiting the callback. ]*/
amqpvalue_destroy(key);
}
}
}
}
/* Codes_SRS_AMQP_MANAGEMENT_01_131: [ All temporary values like AMQP values used as keys shall be freed before exiting the callback. ]*/
properties_destroy(response_properties);
}
/* Codes_SRS_AMQP_MANAGEMENT_01_131: [ All temporary values like AMQP values used as keys shall be freed before exiting the callback. ]*/
application_properties_destroy(application_properties);
}
}
return result;
}