in src/message_sender.c [892:996]
/*
 * Submits a message for sending through the given message sender.
 *
 * Parameters:
 *   message_sender            - sender to use; must not be NULL.
 *   message                   - message to send; must not be NULL. It is cloned
 *                               whenever it has to be kept in the pending list
 *                               (sender not OPEN, or send_one_message reported BUSY).
 *   on_message_send_complete  - stored in the pending-send context together with
 *                               callback_context; may be NULL as far as this
 *                               function is concerned.
 *   callback_context          - opaque pointer stored alongside the callback.
 *   timeout                   - stored in the pending-send context.
 *
 * Returns the async operation handle tracking this send, or NULL when:
 *   - message_sender or message is NULL,
 *   - the sender is in MESSAGE_SENDER_STATE_ERROR,
 *   - allocating the operation or growing the pending list fails,
 *   - cloning the message fails,
 *   - send_one_message reports an error.
 */
ASYNC_OPERATION_HANDLE messagesender_send_async(MESSAGE_SENDER_HANDLE message_sender, MESSAGE_HANDLE message, ON_MESSAGE_SEND_COMPLETE on_message_send_complete, void* callback_context, tickcounter_ms_t timeout)
{
    ASYNC_OPERATION_HANDLE result;

    if ((message_sender == NULL) ||
        (message == NULL))
    {
        LogError("Bad parameters: message_sender=%p, message=%p, on_message_send_complete=%p, callback_context=%p, timeout=%" PRIu64, message_sender, message, on_message_send_complete, callback_context, (uint64_t)timeout);
        result = NULL;
    }
    else if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_ERROR)
    {
        LogError("Message sender in ERROR state");
        result = NULL;
    }
    else
    {
        result = CREATE_ASYNC_OPERATION(MESSAGE_WITH_CALLBACK, messagesender_send_cancel_handler);
        if (result == NULL)
        {
            LogError("Failed allocating context for send");
        }
        else
        {
            ASYNC_OPERATION_HANDLE* new_messages;
            MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, result);

            /* Size for (message_count + 1) handles; the safe_* helpers are expected
               to yield SIZE_MAX on overflow, which is rejected below. */
            size_t realloc_size = safe_add_size_t(message_sender->message_count, 1);
            realloc_size = safe_multiply_size_t(realloc_size, sizeof(ASYNC_OPERATION_HANDLE));

            if (realloc_size == SIZE_MAX ||
                (new_messages = (ASYNC_OPERATION_HANDLE*)realloc(message_sender->messages, realloc_size)) == NULL)
            {
                /* On realloc failure the existing messages array is left untouched. */
                LogError("Failed allocating memory for pending sends, size:%zu", realloc_size);
                async_operation_destroy(result);
                result = NULL;
            }
            else
            {
                message_with_callback->timeout = timeout;
                message_sender->messages = new_messages;

                if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_OPEN)
                {
                    /* Sender is not open yet: keep a private copy of the message so it
                       can be sent later. */
                    message_with_callback->message = message_clone(message);
                    if (message_with_callback->message == NULL)
                    {
                        LogError("Cannot clone message for placing it in the pending sends list");
                        /* message_with_callback was obtained from result, so it must not
                           be written through once result has been destroyed. */
                        async_operation_destroy(result);
                        result = NULL;
                    }
                    else
                    {
                        message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
                    }
                }
                else
                {
                    message_with_callback->message = NULL;
                    message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
                }

                if (result != NULL)
                {
                    message_with_callback->on_message_send_complete = on_message_send_complete;
                    message_with_callback->context = callback_context;
                    message_with_callback->message_sender = message_sender;

                    /* Append to the pending list before attempting the send. */
                    message_sender->messages[message_sender->message_count] = result;
                    message_sender->message_count++;

                    if (message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN)
                    {
                        switch (send_one_message(message_sender, result, message))
                        {
                        default:
                        case SEND_ONE_MESSAGE_ERROR:
                            LogError("Error sending message");
                            remove_pending_message_by_index(message_sender, message_sender->message_count - 1);
                            result = NULL;
                            break;

                        case SEND_ONE_MESSAGE_BUSY:
                            /* Could not send right now: keep a copy so the message can be
                               sent later. */
                            message_with_callback->message = message_clone(message);
                            if (message_with_callback->message == NULL)
                            {
                                LogError("Error cloning message for placing it in the pending sends list");
                                /* result is already stored and counted in the pending list,
                                   so it has to be taken out of the list as well; destroying
                                   it in place would leave a dangling handle in messages[].
                                   Uses the same cleanup as the SEND_ONE_MESSAGE_ERROR path.
                                   NOTE(review): presumably the helper also destroys the
                                   async operation, as the error path relies on - confirm. */
                                remove_pending_message_by_index(message_sender, message_sender->message_count - 1);
                                result = NULL;
                            }
                            else
                            {
                                message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
                            }
                            break;

                        case SEND_ONE_MESSAGE_OK:
                            break;
                        }
                    }
                }
            }
        }
    }

    return result;
}