in src/amqp_management.c [1161:1321]
/*
 * Starts an AMQP management operation.
 *
 * A working copy of the request message is built (a new message when `message`
 * is NULL, otherwise a clone of `message`), the "operation", "type" and optional
 * "locales" entries are added to its application properties, and the message is
 * tagged with the instance's `next_message_id`. A pending-operation entry holding
 * the completion callback and its context is then added to
 * `amqp_management->pending_operations` and the message is handed to
 * `messagesender_send_async`.
 *
 * Parameters:
 *   amqp_management                      - management instance; must not be NULL and must
 *                                          not be in the IDLE or ERROR state.
 *   operation                            - value for the "operation" application property; must not be NULL.
 *   type                                 - value for the "type" application property; must not be NULL.
 *   locales                              - value for the "locales" application property; when NULL
 *                                          no "locales" entry is added.
 *   message                              - optional request message; it is cloned, so the caller's
 *                                          instance is not the one modified here.
 *   on_execute_operation_complete        - completion callback stored with the pending operation;
 *                                          must not be NULL.
 *   on_execute_operation_complete_context - opaque context stored alongside the callback.
 *
 * Returns:
 *   The async operation handle of the pending operation on success, NULL on any failure.
 *   `next_message_id` is only advanced when the send was accepted.
 */
ASYNC_OPERATION_HANDLE amqp_management_execute_operation_async(AMQP_MANAGEMENT_HANDLE amqp_management, const char* operation, const char* type, const char* locales, MESSAGE_HANDLE message, ON_AMQP_MANAGEMENT_EXECUTE_OPERATION_COMPLETE on_execute_operation_complete, void* on_execute_operation_complete_context)
{
    ASYNC_OPERATION_HANDLE result;

    if ((amqp_management == NULL) ||
        (operation == NULL) ||
        (type == NULL) ||
        (on_execute_operation_complete == NULL))
    {
        /* Codes_SRS_AMQP_MANAGEMENT_01_057: [ If `amqp_management`, `operation`, `type` or `on_execute_operation_complete` is NULL, `amqp_management_execute_operation_async` shall fail and return NULL. ]*/
        /* The callback is reported as NULL / not NULL text rather than with %p: it is a
           function pointer, which cannot portably be passed where %p expects an object pointer. */
        LogError("Bad arguments: amqp_management = %p, operation = %p, type = %p, on_execute_operation_complete is %s",
            amqp_management, operation, type,
            (on_execute_operation_complete == NULL) ? "NULL" : "not NULL");
        result = NULL;
    }
    /* Codes_SRS_AMQP_MANAGEMENT_01_081: [ If `amqp_management_execute_operation_async` is called when not OPEN, it shall fail and return NULL. ]*/
    else if ((amqp_management->amqp_management_state == AMQP_MANAGEMENT_STATE_IDLE) ||
        /* Codes_SRS_AMQP_MANAGEMENT_01_104: [ If `amqp_management_execute_operation_async` is called when the AMQP management is in error, it shall fail and return a non-zero value. ]*/
        (amqp_management->amqp_management_state == AMQP_MANAGEMENT_STATE_ERROR))
    {
        LogError("amqp_management_execute_operation_async called while not open or in error");
        result = NULL;
    }
    else
    {
        AMQP_VALUE application_properties;
        MESSAGE_HANDLE cloned_message;

        if (message == NULL)
        {
            /* Codes_SRS_AMQP_MANAGEMENT_01_102: [ If `message` is NULL, a new message shall be created by calling `message_create`. ]*/
            cloned_message = message_create();
            if (cloned_message == NULL)
            {
                /* Report the failure here so this path is not silent, matching the clone path below. */
                LogError("Could not create message");
            }
        }
        else
        {
            /* Codes_SRS_AMQP_MANAGEMENT_01_103: [ Otherwise the existing message shall be cloned by using `message_clone` before being modified accordingly and used for the pending operation. ]*/
            cloned_message = message_clone(message);
            if (cloned_message == NULL)
            {
                LogError("Could not clone message");
            }
        }

        if (cloned_message == NULL)
        {
            result = NULL;
        }
        else
        {
            /* Codes_SRS_AMQP_MANAGEMENT_01_055: [ `amqp_management_execute_operation_async` shall start an AMQP management operation. ]*/
            /* Codes_SRS_AMQP_MANAGEMENT_01_082: [ `amqp_management_execute_operation_async` shall obtain the application properties from the message by calling `message_get_application_properties`. ]*/
            if (message_get_application_properties(cloned_message, &application_properties) != 0)
            {
                LogError("Could not get application properties");
                result = NULL;
            }
            else
            {
                if (application_properties == NULL)
                {
                    /* Codes_SRS_AMQP_MANAGEMENT_01_083: [ If no application properties were set on the message, a new application properties instance shall be created by calling `amqpvalue_create_map`; ]*/
                    application_properties = amqpvalue_create_map();
                    if (application_properties == NULL)
                    {
                        LogError("Could not create application properties");
                    }
                }

                if (application_properties == NULL)
                {
                    result = NULL;
                }
                else
                {
                    /* Codes_SRS_AMQP_MANAGEMENT_01_084: [ For each of the arguments `operation`, `type` and `locales` an AMQP value of type string shall be created by calling `amqpvalue_create_string` in order to be used as key in the application properties map. ]*/
                    /* Codes_SRS_AMQP_MANAGEMENT_01_085: [ For each of the arguments `operation`, `type` and `locales` an AMQP value of type string containing the argument value shall be created by calling `amqpvalue_create_string` in order to be used as value in the application properties map. ]*/
                    /* Codes_SRS_AMQP_MANAGEMENT_01_058: [ Request messages have the following application-properties: ]*/
                    /* Codes_SRS_AMQP_MANAGEMENT_01_059: [ operation string Yes The management operation to be performed. ] */
                    if ((add_string_key_value_pair_to_map(application_properties, "operation", operation) != 0) ||
                        /* Codes_SRS_AMQP_MANAGEMENT_01_061: [ type string Yes The Manageable Entity Type of the Manageable Entity to be managed. ]*/
                        (add_string_key_value_pair_to_map(application_properties, "type", type) != 0) ||
                        /* Codes_SRS_AMQP_MANAGEMENT_01_093: [ If `locales` is NULL, no key/value pair shall be added for it in the application properties map. ]*/
                        /* Codes_SRS_AMQP_MANAGEMENT_01_063: [ locales string No A list of locales that the sending peer permits for incoming informational text in response messages. ]*/
                        ((locales != NULL) && (add_string_key_value_pair_to_map(application_properties, "locales", locales) != 0)))
                    {
                        result = NULL;
                    }
                    else
                    {
                        /* Codes_SRS_AMQP_MANAGEMENT_01_087: [ The application properties obtained after adding the key/value pairs shall be set on the message by calling `message_set_application_properties`. ]*/
                        if (message_set_application_properties(cloned_message, application_properties) != 0)
                        {
                            /* Codes_SRS_AMQP_MANAGEMENT_01_090: [ If any APIs used to create and set the application properties on the message fails, `amqp_management_execute_operation_async` shall fail and return NULL. ]*/
                            LogError("Could not set application properties");
                            result = NULL;
                        }
                        else if (set_message_id(cloned_message, amqp_management->next_message_id) != 0)
                        {
                            result = NULL;
                        }
                        else
                        {
                            result = CREATE_ASYNC_OPERATION(OPERATION_MESSAGE_INSTANCE, amqp_management_execute_cancel_handler);
                            if (result == NULL)
                            {
                                LogError("Failed allocating async result");
                            }
                            else
                            {
                                OPERATION_MESSAGE_INSTANCE* pending_operation_message = GET_ASYNC_OPERATION_CONTEXT(OPERATION_MESSAGE_INSTANCE, result);
                                LIST_ITEM_HANDLE added_item;

                                /* Fill in the pending-operation record before it becomes reachable through the list. */
                                pending_operation_message->callback_context = on_execute_operation_complete_context;
                                pending_operation_message->on_execute_operation_complete = on_execute_operation_complete;
                                pending_operation_message->message_id = amqp_management->next_message_id;
                                pending_operation_message->amqp_management = amqp_management;
                                pending_operation_message->message_send_confirmed = false;
                                pending_operation_message->execute_async_operation = result;

                                /* Codes_SRS_AMQP_MANAGEMENT_01_091: [ Once the request message has been sent, an entry shall be stored in the pending operations list by calling `singlylinkedlist_add`. ]*/
                                added_item = singlylinkedlist_add(amqp_management->pending_operations, pending_operation_message);
                                if (added_item == NULL)
                                {
                                    /* Codes_SRS_AMQP_MANAGEMENT_01_092: [ If `singlylinkedlist_add` fails then `amqp_management_execute_operation_async` shall fail and return a non-zero value. ]*/
                                    LogError("Could not add the operation to the pending operations list.");
                                    async_operation_destroy(result);
                                    result = NULL;
                                }
                                else
                                {
                                    /* Codes_SRS_AMQP_MANAGEMENT_01_088: [ `amqp_management_execute_operation_async` shall send the message by calling `messagesender_send_async`. ]*/
                                    /* Codes_SRS_AMQP_MANAGEMENT_01_166: [ The `on_message_send_complete` callback shall be passed to the `messagesender_send_async` call. ]*/
                                    /* The list item (not the record) is the send callback's context. */
                                    pending_operation_message->send_async_context = messagesender_send_async(amqp_management->message_sender, cloned_message, on_message_send_complete, added_item, 0);
                                    if (pending_operation_message->send_async_context == NULL)
                                    {
                                        /* Codes_SRS_AMQP_MANAGEMENT_01_089: [ If `messagesender_send_async` fails, `amqp_management_execute_operation_async` shall fail and return NULL. ]*/
                                        LogError("Could not send request message");
                                        /* Undo the registration: unlink the entry first, then free the operation that backs it. */
                                        (void)singlylinkedlist_remove(amqp_management->pending_operations, added_item);
                                        async_operation_destroy(result);
                                        result = NULL;
                                    }
                                    else
                                    {
                                        /* Codes_SRS_AMQP_MANAGEMENT_01_107: [ The message Id set on the message properties shall be incremented with each operation. ]*/
                                        /* Advanced only after the send was accepted, so a failed attempt does not consume an id. */
                                        amqp_management->next_message_id++;
                                    }
                                }
                            }
                        }
                    }

                    /* Codes_SRS_AMQP_MANAGEMENT_01_101: [ After setting the application properties, the application properties instance shall be freed by `amqpvalue_destroy`. ]*/
                    amqpvalue_destroy(application_properties);
                }
            }

            /* The local working copy is released on every path, success included.
               NOTE(review): this presumes messagesender_send_async retains whatever it needs of the
               message beyond the call - confirm against the message sender implementation. */
            message_destroy(cloned_message);
        }
    }

    return result;
}