in linux/src/async_socket_linux.c [735:853]
int async_socket_receive_async(ASYNC_SOCKET_HANDLE async_socket, ASYNC_SOCKET_BUFFER* payload, uint32_t buffer_count, ON_ASYNC_SOCKET_RECEIVE_COMPLETE on_receive_complete, void* on_receive_complete_context)
{
int result;
// Codes_SRS_ASYNC_SOCKET_LINUX_11_071: [ on_receive_complete_context shall be allowed to be NULL. ]
if (
// Codes_SRS_ASYNC_SOCKET_LINUX_11_064: [ If async_socket is NULL, async_socket_receive_async shall fail and return a non-zero value. ]
async_socket == NULL ||
// Codes_SRS_ASYNC_SOCKET_LINUX_11_065: [ If buffers is NULL, async_socket_receive_async shall fail and return a non-zero value. ]
payload == NULL ||
// Codes_SRS_ASYNC_SOCKET_LINUX_11_066: [ If buffer_count is 0, async_socket_receive_async shall fail and return a non-zero value. ]
buffer_count == 0 ||
// Codes_SRS_ASYNC_SOCKET_LINUX_11_070: [ If on_receive_complete is NULL, async_socket_receive_async shall fail and return a non-zero value. ]
on_receive_complete == NULL
)
{
LogError("Invalid arguments: ASYNC_SOCKET_HANDLE async_socket=%p, const ASYNC_SOCKET_BUFFER* payload=%p, uint32_t buffer_count=%" PRIu32 ", ON_ASYNC_SOCKET_RECEIVE_COMPLETE on_receive_complete=%p, void*, on_receive_complete_context=%p",
async_socket, payload, buffer_count, on_receive_complete, on_receive_complete_context);
result = MU_FAILURE;
}
else
{
uint32_t total_buffer_bytes = 0;
uint32_t index;
for (index = 0; index < buffer_count; index++)
{
if (
// Codes_SRS_ASYNC_SOCKET_LINUX_11_067: [ If any of the buffers in payload has buffer set to NULL, async_socket_receive_async shall fail and return a non-zero value. ]
payload[index].buffer == NULL ||
// Codes_SRS_ASYNC_SOCKET_LINUX_11_068: [ If any of the buffers in payload has length set to 0, async_socket_receive_async shall fail and return a non-zero value. ]
payload[index].length == 0
)
{
LogError("Invalid buffer %" PRIu32 ": buffer=%p, length = %" PRIu32, index, payload[index].buffer, payload[index].length);
break;
}
// Codes_SRS_ASYNC_SOCKET_LINUX_11_069: [ If the sum of buffer lengths for all the buffers in payload is greater than UINT32_MAX, async_socket_receive_async shall fail and return a non-zero value. ]
if (total_buffer_bytes + payload[index].length < total_buffer_bytes)
{
LogError("Overflow in total buffer length computation total_buffer_bytes=%" PRIu32 " + payload[i=%" PRIu32 "].length=%" PRIu32 "", total_buffer_bytes, index, payload[index].length);
break;
}
else
{
total_buffer_bytes += payload[index].length;
}
}
if (index < buffer_count)
{
LogError("Invalid buffers passed to async_socket_receive_async");
result = MU_FAILURE;
}
else
{
(void)interlocked_increment(&async_socket->pending_api_calls);
ASYNC_SOCKET_LINUX_STATE current_state;
// Codes_SRS_ASYNC_SOCKET_LINUX_11_072: [ If async_socket is not OPEN, async_socket_receive_async shall fail and return a non-zero value. ]
if ((current_state = interlocked_add(&async_socket->state, 0)) != ASYNC_SOCKET_LINUX_STATE_OPEN)
{
/* Codes_SRS_ASYNC_SOCKET_WIN32_01_098: [ If async_socket is not OPEN, async_socket_receive_async shall fail and return a non-zero value. ]*/
LogWarning("Not open, current state is %" PRI_MU_ENUM "", MU_ENUM_VALUE(ASYNC_SOCKET_LINUX_STATE, current_state));
result = MU_FAILURE;
}
else
{
// Codes_SRS_ASYNC_SOCKET_LINUX_11_074: [ The context shall also allocate enough memory to keep an array of buffer_count items. ]
ASYNC_SOCKET_IO_CONTEXT* io_context = malloc_flex(sizeof(ASYNC_SOCKET_IO_CONTEXT), buffer_count, sizeof(ASYNC_SOCKET_BUFFER));
if (io_context == NULL)
{
LogError("failure in malloc_flex(sizeof(ASYNC_SOCKET_RECV_CONTEXT)=%zu, buffer_count=%" PRIu32 ", sizeof(ASYNC_SOCKET_BUFFER)=%zu) failed",
sizeof(ASYNC_SOCKET_IO_CONTEXT), buffer_count, sizeof(ASYNC_SOCKET_BUFFER));
result = ASYNC_SOCKET_SEND_SYNC_ERROR;
}
else
{
// Codes_SRS_ASYNC_SOCKET_LINUX_11_073: [ Otherwise async_socket_receive_async shall create a context for the recv where the payload, on_receive_complete and on_receive_complete_context shall be stored. ]
io_context->io_type = ASYNC_SOCKET_IO_TYPE_RECEIVE;
io_context->on_receive_complete = on_receive_complete;
io_context->callback_context = on_receive_complete_context;
io_context->async_socket = async_socket;
io_context->data.recv_ctx.total_buffer_count = buffer_count;
for (index = 0; index < buffer_count; index++)
{
io_context->data.recv_ctx.recv_buffers[index].buffer = payload[index].buffer;
io_context->data.recv_ctx.recv_buffers[index].length = payload[index].length;
}
#ifdef ENABLE_SOCKET_LOGGING
LogVerbose("Starting receive at %lf", timer_global_get_elapsed_us());
#endif
// Codes_SRS_ASYNC_SOCKET_LINUX_11_102: [ Then the context shall then be added to the completion port system by calling completion_port_add with EPOLLIN and event_complete_callback as the callback. ]
if (completion_port_add(async_socket->completion_port, EPOLLIN | EPOLLRDHUP | EPOLLONESHOT, async_socket->socket_handle, event_complete_callback, io_context) != 0)
{
// Codes_SRS_ASYNC_SOCKET_LINUX_11_078: [ If any error occurs, async_socket_receive_async shall fail and return a non-zero value. ]
LogWarning("failure with completion_port_add");
result = MU_FAILURE;
}
else
{
// Codes_SRS_ASYNC_SOCKET_LINUX_11_077: [ On success, async_socket_receive_async shall return 0. ]
result = 0;
(void)interlocked_increment(&async_socket->added_to_completion_port);
goto all_ok;
}
free(io_context);
}
}
all_ok:
if (interlocked_decrement(&async_socket->pending_api_calls) == 0)
{
wake_by_address_single(&async_socket->pending_api_calls);
}
}
}
return result;
}