in linux/src/async_socket_linux.c [569:733]
/*
 * Sends the given buffers, in order, on the socket owned by async_socket.
 *
 * Parameters:
 *   async_socket             - socket handle; must not be NULL.
 *   buffers                  - array of buffer_count entries; every entry must have a
 *                              non-NULL buffer and a non-zero length.
 *   buffer_count             - number of entries in buffers; must not be 0.
 *   on_send_complete         - completion callback; must not be NULL.
 *   on_send_complete_context - passed through to on_send_complete; may be NULL.
 *
 * Returns:
 *   ASYNC_SOCKET_SEND_SYNC_OK       - every buffer was handed to send_data successfully
 *                                     (on_send_complete is invoked with ASYNC_SOCKET_SEND_OK
 *                                     before returning), or a send reported
 *                                     EAGAIN/EWOULDBLOCK and a send context was registered
 *                                     with the completion port for EPOLLOUT (the callback is
 *                                     not invoked by this function in that case).
 *   ASYNC_SOCKET_SEND_SYNC_NOT_OPEN - the socket state is not OPEN, or a send failed with
 *                                     ECONNRESET, ENOTCONN or EPIPE.
 *   ASYNC_SOCKET_SEND_SYNC_ERROR    - invalid arguments, total length overflow, allocation
 *                                     failure, completion port registration failure, or any
 *                                     other send error.
 *
 * on_send_complete is never invoked from this function when the return value is not
 * ASYNC_SOCKET_SEND_SYNC_OK. Processing stops at the first buffer that fails to send.
 */
ASYNC_SOCKET_SEND_SYNC_RESULT async_socket_send_async(ASYNC_SOCKET_HANDLE async_socket, const ASYNC_SOCKET_BUFFER* buffers, uint32_t buffer_count, ON_ASYNC_SOCKET_SEND_COMPLETE on_send_complete, void* on_send_complete_context)
{
    ASYNC_SOCKET_SEND_SYNC_RESULT result;
    // Codes_SRS_ASYNC_SOCKET_LINUX_11_050: [ on_send_complete_context shall be allowed to be NULL. ]
    if (
        // Codes_SRS_ASYNC_SOCKET_LINUX_11_043: [ If async_socket is NULL, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_ERROR. ]
        async_socket == NULL ||
        // Codes_SRS_ASYNC_SOCKET_LINUX_11_044: [ If buffers is NULL, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_ERROR. ]
        buffers == NULL ||
        // Codes_SRS_ASYNC_SOCKET_LINUX_11_045: [ If buffer_count is 0, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_ERROR. ]
        buffer_count == 0 ||
        // Codes_SRS_ASYNC_SOCKET_LINUX_11_049: [ If on_send_complete is NULL, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_ERROR. ]
        on_send_complete == NULL
        )
    {
        LogError("Invalid arguments: ASYNC_SOCKET_HANDLE async_socket=%p, const ASYNC_SOCKET_BUFFER* payload=%p, uint32_t buffer_count=%" PRIu32 ", ON_ASYNC_SOCKET_SEND_COMPLETE on_send_complete=%p, void*, on_send_complete_context=%p",
            async_socket, buffers, buffer_count, on_send_complete, on_send_complete_context);
        result = ASYNC_SOCKET_SEND_SYNC_ERROR;
    }
    else
    {
        // Validation pass: the loop exits early (index < buffer_count) on the first bad buffer
        // or when the running total would wrap around.
        uint32_t index;
        uint32_t total_buffer_bytes = 0;
        for (index = 0; index < buffer_count; index++)
        {
            if (
                // Codes_SRS_ASYNC_SOCKET_LINUX_11_046: [ If any of the buffers in payload has buffer set to NULL, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_ERROR. ]
                (buffers[index].buffer == NULL) ||
                // Codes_SRS_ASYNC_SOCKET_LINUX_11_047: [ If any of the buffers in payload has length set to 0, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_ERROR. ]
                (buffers[index].length == 0)
                )
            {
                LogError("Invalid buffer %" PRIu32 ": buffer=%p, length = %" PRIu32, index, buffers[index].buffer, buffers[index].length);
                break;
            }

            // Codes_SRS_ASYNC_SOCKET_LINUX_11_048: [ If the sum of buffer lengths for all the buffers in payload is greater than UINT32_MAX, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_ERROR. ]
            if (total_buffer_bytes + buffers[index].length < total_buffer_bytes)
            {
                LogError("Overflow in total buffer length computation (total_buffer_bytes=%" PRIu32 " + buffers[i=%" PRIu32 "].length=%" PRIu32 "", total_buffer_bytes, index, buffers[index].length);
                break;
            }
            else
            {
                total_buffer_bytes += buffers[index].length;
            }
        }

        if (index < buffer_count)
        {
            // Codes_SRS_ASYNC_SOCKET_LINUX_11_063: [ If any error occurs, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_ERROR. ]
            LogError("Invalid buffers passed to async_socket_send_async");
            result = ASYNC_SOCKET_SEND_SYNC_ERROR;
        }
        else
        {
            // Count this call as in flight; the matching decrement is at the all_ok label below.
            (void)interlocked_increment(&async_socket->pending_api_calls);

            ASYNC_SOCKET_LINUX_STATE current_state;
            // Codes_SRS_ASYNC_SOCKET_LINUX_11_051: [ If async_socket is not OPEN, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_NOT_OPEN. ]
            // Adding 0 is used here to read the current state value through the interlocked API.
            if ((current_state = interlocked_add(&async_socket->state, 0)) != ASYNC_SOCKET_LINUX_STATE_OPEN)
            {
                LogWarning("Not open, current state is %" PRI_MU_ENUM "", MU_ENUM_VALUE(ASYNC_SOCKET_LINUX_STATE, current_state));
                result = ASYNC_SOCKET_SEND_SYNC_NOT_OPEN;
            }
            else
            {
#ifdef ENABLE_SOCKET_LOGGING
                LogVerbose("Starting send of %" PRIu32 " bytes at %lf", total_buffer_bytes, timer_global_get_elapsed_us());
#endif
                // Initialized defensively; every path through the loop below assigns it.
                ASYNC_SOCKET_SEND_RESULT send_result = ASYNC_SOCKET_SEND_ERROR;
                for (index = 0; index < buffer_count; index++)
                {
                    int error_no;
                    ssize_t total_data_sent;

                    // Codes_SRS_ASYNC_SOCKET_LINUX_11_054: [ If the send fails to send the data, async_socket_send_async shall do the following: ]
                    if (send_data(async_socket, &buffers[index], &total_data_sent, &error_no) != 0)
                    {
                        // Codes_SRS_ASYNC_SOCKET_LINUX_11_055: [ If the errno value is EAGAIN or EWOULDBLOCK. ]
                        if (error_no == EAGAIN || error_no == EWOULDBLOCK)
                        {
                            // Codes_SRS_ASYNC_SOCKET_LINUX_11_056: [ async_socket_send_async shall create a context for the send where the payload, on_send_complete and on_send_complete_context shall be stored. ]
                            ASYNC_SOCKET_IO_CONTEXT* io_context = malloc(sizeof(ASYNC_SOCKET_IO_CONTEXT));
                            if (io_context == NULL)
                            {
                                LogError("malloc(sizeof(ASYNC_SOCKET_SEND_CONTEXT)=%zu failed", sizeof(ASYNC_SOCKET_IO_CONTEXT));
                                result = ASYNC_SOCKET_SEND_SYNC_ERROR;
                                send_result = ASYNC_SOCKET_SEND_ABANDONED;
                                break;
                            }
                            else
                            {
                                send_result = ASYNC_SOCKET_SEND_ERROR;

                                // NOTE(review): only the unsent remainder of the current buffer is stored in
                                // the context; buffers after `index` are not referenced from here on.
                                // Confirm whether multi-buffer sends that hit EAGAIN are expected to be
                                // completed elsewhere.
                                io_context->io_type = ASYNC_SOCKET_IO_TYPE_SEND;
                                io_context->data.send_ctx.total_buffer_bytes = total_buffer_bytes - total_data_sent;
                                io_context->on_send_complete = on_send_complete;
                                io_context->callback_context = on_send_complete_context;
                                io_context->async_socket = async_socket;
                                io_context->data.send_ctx.socket_buffer.buffer = buffers[index].buffer + total_data_sent;
                                io_context->data.send_ctx.socket_buffer.length = buffers[index].length - total_data_sent;

                                // Codes_SRS_ASYNC_SOCKET_LINUX_11_057: [ The context shall then be added to the completion port system by calling completion_port_add with EPOLL_CTL_MOD and `event_complete_callback` as the callback. ]
                                if (completion_port_add(async_socket->completion_port, EPOLLOUT, async_socket->socket_handle, event_complete_callback, io_context) != 0)
                                {
                                    LogError("failure with completion_port_add");
                                    // The context was never handed off, so release it here instead of leaking it.
                                    // NOTE(review): assumes completion_port_add does not retain io_context on failure.
                                    free(io_context);
                                    result = ASYNC_SOCKET_SEND_SYNC_ERROR;
                                    // Do not try the remaining buffers after a failure.
                                    break;
                                }
                                else
                                {
                                    (void)interlocked_increment(&async_socket->added_to_completion_port);
                                    result = ASYNC_SOCKET_SEND_SYNC_OK;
                                    // Skip the synchronous-completion callback below; the registered
                                    // event_complete_callback now holds the context.
                                    goto all_ok;
                                }
                            }
                        }
                        // Codes_SRS_ASYNC_SOCKET_LINUX_11_059: [ If the errno value is ECONNRESET, ENOTCONN, or EPIPE shall fail and return ASYNC_SOCKET_SEND_SYNC_NOT_OPEN. ]
                        else if (error_no == ECONNRESET || error_no == ENOTCONN || error_no == EPIPE)
                        {
                            LOGGER_LOG_EX(LOG_LEVEL_WARNING, LOG_ERRNO(), LOG_MESSAGE("The connection was forcibly closed by the peer"));
                            send_result = ASYNC_SOCKET_SEND_ABANDONED;
                            // Socket was closed
                            result = ASYNC_SOCKET_SEND_SYNC_NOT_OPEN;
                            // Stop here: a later buffer must not overwrite this failure with a success.
                            break;
                        }
                        else
                        {
                            // Codes_SRS_ASYNC_SOCKET_LINUX_11_060: [ If any other error is encountered, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_ERROR. ]
                            LogErrorNo("failure sending socket error no");
                            result = ASYNC_SOCKET_SEND_SYNC_ERROR;
                            send_result = ASYNC_SOCKET_SEND_ERROR;
                            // Stop here: a later buffer must not overwrite this failure with a success.
                            break;
                        }
                    }
                    else
                    {
                        // Codes_SRS_ASYNC_SOCKET_LINUX_11_062: [ On success, async_socket_send_async shall return ASYNC_SOCKET_SEND_SYNC_OK. ]
                        send_result = ASYNC_SOCKET_SEND_OK;
                        result = ASYNC_SOCKET_SEND_SYNC_OK;
                    }
                }

                // Only call the callback if the call was successfully sent
                // Otherwise we're going to be returning an error
                if (send_result == ASYNC_SOCKET_SEND_OK)
                {
#ifdef ENABLE_SOCKET_LOGGING
                    LogVerbose("Send completed synchronously at %lf", timer_global_get_elapsed_us());
#endif
                    // Codes_SRS_ASYNC_SOCKET_LINUX_11_061: [ If the send is successful, async_socket_send_async shall call the on_send_complete with on_send_complete_context and ASYNC_SOCKET_SEND_SYNC_OK. ]
                    on_send_complete(on_send_complete_context, send_result);
                }
                else
                {
                    // Do nothing. If a failure happened, do not call the callback
                }
            }

all_ok:
            // Balance the increment above; when the in-flight count drops to 0, wake one waiter on it
            // (presumably a close that is waiting for pending calls to drain - verify against the close path).
            if (interlocked_decrement(&async_socket->pending_api_calls) == 0)
            {
                wake_by_address_single(&async_socket->pending_api_calls);
            }
        }
    }
    return result;
}