ASYNC_SOCKET_SEND_SYNC_RESULT async_socket_send_async()

in linux/src/async_socket_linux.c [569:733]


ASYNC_SOCKET_SEND_SYNC_RESULT async_socket_send_async(ASYNC_SOCKET_HANDLE async_socket, const ASYNC_SOCKET_BUFFER* buffers, uint32_t buffer_count, ON_ASYNC_SOCKET_SEND_COMPLETE on_send_complete, void* on_send_complete_context)
{
    ASYNC_SOCKET_SEND_SYNC_RESULT result;
    // Codes_SRS_ASYNC_SOCKET_LINUX_11_050: [ on_send_complete_context shall be allowed to be NULL. ]
    if (
        // Codes_SRS_ASYNC_SOCKET_LINUX_11_043: [ If async_socket is NULL, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_ERROR. ]
        async_socket == NULL ||
        // Codes_SRS_ASYNC_SOCKET_LINUX_11_044: [ If buffers is NULL, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_ERROR. ]
        buffers == NULL ||
        // Codes_SRS_ASYNC_SOCKET_LINUX_11_045: [ If buffer_count is 0, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_ERROR. ]
        buffer_count == 0 ||
        // Codes_SRS_ASYNC_SOCKET_LINUX_11_049: [ If on_send_complete is NULL, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_ERROR. ]
        on_send_complete == NULL
        )
    {
        LogError("Invalid arguments: ASYNC_SOCKET_HANDLE async_socket=%p, const ASYNC_SOCKET_BUFFER* payload=%p, uint32_t buffer_count=%" PRIu32 ", ON_ASYNC_SOCKET_SEND_COMPLETE on_send_complete=%p, void*, on_send_complete_context=%p",
            async_socket, buffers, buffer_count, on_send_complete, on_send_complete_context);
        result = ASYNC_SOCKET_SEND_SYNC_ERROR;
    }
    else
    {
        uint32_t index;
        uint32_t total_buffer_bytes = 0;

        for (index = 0; index < buffer_count; index++)
        {
            if (
                // Codes_SRS_ASYNC_SOCKET_LINUX_11_046: [ If any of the buffers in payload has buffer set to NULL, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_ERROR. ]
                (buffers[index].buffer == NULL) ||
                // Codes_SRS_ASYNC_SOCKET_LINUX_11_047: [ If any of the buffers in payload has length set to 0, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_ERROR. ]
                (buffers[index].length == 0)
                )
            {
                LogError("Invalid buffer %" PRIu32 ": buffer=%p, length = %" PRIu32, index, buffers[index].buffer, buffers[index].length);
                break;
            }
            // Codes_SRS_ASYNC_SOCKET_LINUX_11_048: [ If the sum of buffer lengths for all the buffers in payload is greater than UINT32_MAX, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_ERROR. ]
            if (total_buffer_bytes + buffers[index].length < total_buffer_bytes)
            {
                LogError("Overflow in total buffer length computation (total_buffer_bytes=%" PRIu32 " + buffers[i=%" PRIu32 "].length=%" PRIu32 "", total_buffer_bytes, index, buffers[index].length);
                break;
            }
            else
            {
                total_buffer_bytes += buffers[index].length;
            }
        }

        if (index < buffer_count)
        {
            // Codes_SRS_ASYNC_SOCKET_LINUX_11_063: [ If any error occurs, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_ERROR. ]
            LogError("Invalid buffers passed to async_socket_send_async");
            result = ASYNC_SOCKET_SEND_SYNC_ERROR;
        }
        else
        {
            (void)interlocked_increment(&async_socket->pending_api_calls);

            ASYNC_SOCKET_LINUX_STATE current_state;
            // Codes_SRS_ASYNC_SOCKET_LINUX_11_051: [ If async_socket is not OPEN, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_NOT_OPEN. ]
            if ((current_state = interlocked_add(&async_socket->state, 0)) != ASYNC_SOCKET_LINUX_STATE_OPEN)
            {
                LogWarning("Not open, current state is %" PRI_MU_ENUM "", MU_ENUM_VALUE(ASYNC_SOCKET_LINUX_STATE, current_state));
                result = ASYNC_SOCKET_SEND_SYNC_NOT_OPEN;
            }
            else
            {
#ifdef ENABLE_SOCKET_LOGGING
                LogVerbose("Starting send of %" PRIu32 " bytes at %lf", total_buffer_bytes, timer_global_get_elapsed_us());
#endif

                ASYNC_SOCKET_SEND_RESULT send_result;
                for (index = 0; index < buffer_count; index++)
                {
                    int error_no;
                    ssize_t total_data_sent;
                    // Codes_SRS_ASYNC_SOCKET_LINUX_11_054: [ If the send fails to send the data, async_socket_send_async shall do the following: ]
                    if (send_data(async_socket, &buffers[index], &total_data_sent, &error_no) != 0)
                    {
                        // Codes_SRS_ASYNC_SOCKET_LINUX_11_055: [ If the errno value is EAGAIN or EWOULDBLOCK. ]
                        if (error_no == EAGAIN || error_no == EWOULDBLOCK)
                        {
                            // Codes_SRS_ASYNC_SOCKET_LINUX_11_056: [ async_socket_send_async shall create a context for the send where the payload, on_send_complete and on_send_complete_context shall be stored. ]
                            ASYNC_SOCKET_IO_CONTEXT* io_context = malloc(sizeof(ASYNC_SOCKET_IO_CONTEXT));
                            if (io_context == NULL)
                            {
                                LogError("malloc(sizeof(ASYNC_SOCKET_SEND_CONTEXT)=%zu failed", sizeof(ASYNC_SOCKET_IO_CONTEXT));
                                result = ASYNC_SOCKET_SEND_SYNC_ERROR;
                                send_result = ASYNC_SOCKET_SEND_ABANDONED;
                                break;
                            }
                            else
                            {
                                send_result = ASYNC_SOCKET_SEND_ERROR;
                                io_context->io_type = ASYNC_SOCKET_IO_TYPE_SEND;
                                io_context->data.send_ctx.total_buffer_bytes = total_buffer_bytes - total_data_sent;
                                io_context->on_send_complete = on_send_complete;
                                io_context->callback_context = on_send_complete_context;
                                io_context->async_socket = async_socket;

                                io_context->data.send_ctx.socket_buffer.buffer = buffers[index].buffer + total_data_sent;
                                io_context->data.send_ctx.socket_buffer.length = buffers[index].length - total_data_sent;

                                // Codes_SRS_ASYNC_SOCKET_LINUX_11_057: [ The context shall then be added to the completion port system by calling completion_port_add with EPOLL_CTL_MOD and `event_complete_callback` as the callback. ]
                                if (completion_port_add(async_socket->completion_port, EPOLLOUT, async_socket->socket_handle, event_complete_callback, io_context) != 0)
                                {
                                    LogError("failure with completion_port_add");
                                    result = ASYNC_SOCKET_SEND_SYNC_ERROR;
                                }
                                else
                                {
                                    (void)interlocked_increment(&async_socket->added_to_completion_port);
                                    result = ASYNC_SOCKET_SEND_SYNC_OK;
                                    goto all_ok;
                                }
                            }
                        }
                        // Codes_SRS_ASYNC_SOCKET_LINUX_11_059: [ If the errno value is ECONNRESET, ENOTCONN, or EPIPE shall fail and return ASYNC_SOCKET_SEND_SYNC_NOT_OPEN. ]
                        else if (error_no == ECONNRESET || error_no == ENOTCONN || error_no == EPIPE)
                        {
                            LOGGER_LOG_EX(LOG_LEVEL_WARNING, LOG_ERRNO(), LOG_MESSAGE("The connection was forcibly closed by the peer"));
                            send_result = ASYNC_SOCKET_SEND_ABANDONED;
                            // Socket was closed
                            result = ASYNC_SOCKET_SEND_SYNC_NOT_OPEN;
                        }
                        else
                        {
                            // Codes_SRS_ASYNC_SOCKET_LINUX_11_060: [ If any other error is encountered, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_ERROR. ]
                            LogErrorNo("failure sending socket error no");
                            result = ASYNC_SOCKET_SEND_SYNC_ERROR;
                            send_result = ASYNC_SOCKET_SEND_ERROR;
                        }
                    }
                    else
                    {
                        // Codes_SRS_ASYNC_SOCKET_LINUX_11_062: [ On success, async_socket_send_async shall return ASYNC_SOCKET_SEND_SYNC_OK. ]
                        send_result = ASYNC_SOCKET_SEND_OK;
                        result = ASYNC_SOCKET_SEND_SYNC_OK;
                    }
                }
                // Only call the callback if the call was successfully sent
                // Otherwise we're going to be returning an error
                if (send_result == ASYNC_SOCKET_SEND_OK)
                {
#ifdef ENABLE_SOCKET_LOGGING
                    LogVerbose("Send completed synchronously at %lf", timer_global_get_elapsed_us());
#endif
                    // Codes_SRS_ASYNC_SOCKET_LINUX_11_061: [ If the send is successful, async_socket_send_async shall call the on_send_complete with on_send_complete_context and ASYNC_SOCKET_SEND_SYNC_OK. ]
                    on_send_complete(on_send_complete_context, send_result);
                }
                else
                {
                    // Do nothing.  If failure happend do not call the callback
                }
            }
all_ok:
            if (interlocked_decrement(&async_socket->pending_api_calls) == 0)
            {
                wake_by_address_single(&async_socket->pending_api_calls);
            }
        }
    }

    return result;
}