int async_socket_receive_async()

in linux/src/async_socket_linux.c [735:853]


/*
 * Registers an asynchronous receive on async_socket.
 *
 * Parameters:
 *   async_socket                - socket to receive on; must not be NULL and must be in the OPEN state.
 *   payload                     - array of buffer_count buffer descriptors; must not be NULL. Every entry
 *                                 must have a non-NULL buffer and a non-zero length, and the sum of all
 *                                 lengths must fit in a uint32_t.
 *   buffer_count                - number of entries in payload; must not be 0.
 *   on_receive_complete         - callback stored in the I/O context; must not be NULL.
 *   on_receive_complete_context - opaque pointer stored in the I/O context alongside the callback; may be NULL.
 *
 * Returns 0 when the receive context was successfully added to the completion port,
 * and a non-zero value (MU_FAILURE) on any argument, state, allocation or registration error.
 *
 * The buffer descriptors (pointer + length) are copied into the context, the memory they point to is not.
 * On success the allocated context is not freed here; presumably event_complete_callback takes
 * ownership of it - NOTE(review): confirm against event_complete_callback.
 */
int async_socket_receive_async(ASYNC_SOCKET_HANDLE async_socket, ASYNC_SOCKET_BUFFER* payload, uint32_t buffer_count, ON_ASYNC_SOCKET_RECEIVE_COMPLETE on_receive_complete, void* on_receive_complete_context)
{
    int result;
    // Codes_SRS_ASYNC_SOCKET_LINUX_11_071: [ on_receive_complete_context shall be allowed to be NULL. ]
    if (
        // Codes_SRS_ASYNC_SOCKET_LINUX_11_064: [ If async_socket is NULL, async_socket_receive_async shall fail and return a non-zero value. ]
        async_socket == NULL ||
        // Codes_SRS_ASYNC_SOCKET_LINUX_11_065: [ If buffers is NULL, async_socket_receive_async shall fail and return a non-zero value. ]
        payload == NULL ||
        // Codes_SRS_ASYNC_SOCKET_LINUX_11_066: [ If buffer_count is 0, async_socket_receive_async shall fail and return a non-zero value. ]
        buffer_count == 0 ||
        // Codes_SRS_ASYNC_SOCKET_LINUX_11_070: [ If on_receive_complete is NULL, async_socket_receive_async shall fail and return a non-zero value. ]
        on_receive_complete == NULL
        )
    {
        LogError("Invalid arguments: ASYNC_SOCKET_HANDLE async_socket=%p, const ASYNC_SOCKET_BUFFER* payload=%p, uint32_t buffer_count=%" PRIu32 ", ON_ASYNC_SOCKET_RECEIVE_COMPLETE on_receive_complete=%p, void*, on_receive_complete_context=%p",
            async_socket, payload, buffer_count, on_receive_complete, on_receive_complete_context);
        result = MU_FAILURE;
    }
    else
    {
        // total_buffer_bytes is only used to detect that the summed lengths would not fit in a uint32_t
        uint32_t total_buffer_bytes = 0;
        uint32_t index;
        for (index = 0; index < buffer_count; index++)
        {
            if (
                // Codes_SRS_ASYNC_SOCKET_LINUX_11_067: [ If any of the buffers in payload has buffer set to NULL, async_socket_receive_async shall fail and return a non-zero value. ]
                payload[index].buffer == NULL ||
                // Codes_SRS_ASYNC_SOCKET_LINUX_11_068: [ If any of the buffers in payload has length set to 0, async_socket_receive_async shall fail and return a non-zero value. ]
                payload[index].length == 0
                )
            {
                LogError("Invalid buffer %" PRIu32 ": buffer=%p, length = %" PRIu32, index, payload[index].buffer, payload[index].length);
                break;
            }

            // Codes_SRS_ASYNC_SOCKET_LINUX_11_069: [ If the sum of buffer lengths for all the buffers in payload is greater than UINT32_MAX, async_socket_receive_async shall fail and return a non-zero value. ]
            // unsigned addition wraps around, so a sum smaller than the previous total means overflow
            if (total_buffer_bytes + payload[index].length < total_buffer_bytes)
            {
                LogError("Overflow in total buffer length computation total_buffer_bytes=%" PRIu32 " + payload[i=%" PRIu32 "].length=%" PRIu32 "", total_buffer_bytes, index, payload[index].length);
                break;
            }
            else
            {
                total_buffer_bytes += payload[index].length;
            }
        }

        // the validation loop only runs to completion when every buffer was accepted
        if (index < buffer_count)
        {
            LogError("Invalid buffers passed to async_socket_receive_async");
            result = MU_FAILURE;
        }
        else
        {
            // count this call as in flight; the matching decrement is at the end of this block
            (void)interlocked_increment(&async_socket->pending_api_calls);

            ASYNC_SOCKET_LINUX_STATE current_state;
            // Codes_SRS_ASYNC_SOCKET_LINUX_11_072: [ If async_socket is not OPEN, async_socket_receive_async shall fail and return a non-zero value. ]
            // adding 0 reads the current state through the interlocked API without modifying it
            if ((current_state = interlocked_add(&async_socket->state, 0)) != ASYNC_SOCKET_LINUX_STATE_OPEN)
            {
                LogWarning("Not open, current state is %" PRI_MU_ENUM "", MU_ENUM_VALUE(ASYNC_SOCKET_LINUX_STATE, current_state));
                result = MU_FAILURE;
            }
            else
            {
                // Codes_SRS_ASYNC_SOCKET_LINUX_11_074: [ The context shall also allocate enough memory to keep an array of buffer_count items. ]
                ASYNC_SOCKET_IO_CONTEXT* io_context = malloc_flex(sizeof(ASYNC_SOCKET_IO_CONTEXT), buffer_count, sizeof(ASYNC_SOCKET_BUFFER));
                if (io_context == NULL)
                {
                    LogError("failure in malloc_flex(sizeof(ASYNC_SOCKET_IO_CONTEXT)=%zu, buffer_count=%" PRIu32 ", sizeof(ASYNC_SOCKET_BUFFER)=%zu)",
                        sizeof(ASYNC_SOCKET_IO_CONTEXT), buffer_count, sizeof(ASYNC_SOCKET_BUFFER));
                    // this function returns int, so report the same generic failure code as every other error path
                    result = MU_FAILURE;
                }
                else
                {
                    // Codes_SRS_ASYNC_SOCKET_LINUX_11_073: [ Otherwise async_socket_receive_async shall create a context for the recv where the payload, on_receive_complete and on_receive_complete_context shall be stored. ]
                    io_context->io_type = ASYNC_SOCKET_IO_TYPE_RECEIVE;
                    io_context->on_receive_complete = on_receive_complete;
                    io_context->callback_context = on_receive_complete_context;
                    io_context->async_socket = async_socket;
                    io_context->data.recv_ctx.total_buffer_count = buffer_count;

                    // copy the buffer descriptors only; the caller's buffers themselves are referenced, not duplicated
                    for (index = 0; index < buffer_count; index++)
                    {
                        io_context->data.recv_ctx.recv_buffers[index].buffer = payload[index].buffer;
                        io_context->data.recv_ctx.recv_buffers[index].length = payload[index].length;
                    }

#ifdef ENABLE_SOCKET_LOGGING
                    LogVerbose("Starting receive at %lf", timer_global_get_elapsed_us());
#endif

                    // Codes_SRS_ASYNC_SOCKET_LINUX_11_102: [ Then the context shall then be added to the completion port system by calling completion_port_add with EPOLLIN and event_complete_callback as the callback. ]
                    if (completion_port_add(async_socket->completion_port, EPOLLIN | EPOLLRDHUP | EPOLLONESHOT, async_socket->socket_handle, event_complete_callback, io_context) != 0)
                    {
                        // Codes_SRS_ASYNC_SOCKET_LINUX_11_078: [ If any error occurs, async_socket_receive_async shall fail and return a non-zero value. ]
                        LogWarning("failure with completion_port_add");
                        // the context was never handed to the completion port, so it is released here
                        free(io_context);
                        result = MU_FAILURE;
                    }
                    else
                    {
                        // the context is intentionally not freed on this path
                        (void)interlocked_increment(&async_socket->added_to_completion_port);
                        // Codes_SRS_ASYNC_SOCKET_LINUX_11_077: [ On success, async_socket_receive_async shall return 0. ]
                        result = 0;
                    }
                }
            }

            // this call is no longer in flight; wake a waiter on pending_api_calls when the count reaches 0
            if (interlocked_decrement(&async_socket->pending_api_calls) == 0)
            {
                wake_by_address_single(&async_socket->pending_api_calls);
            }
        }
    }
    return result;
}