in linux/src/async_socket_linux.c [180:382]
static void event_complete_callback(void* context, COMPLETION_PORT_EPOLL_ACTION action)
{
// Codes_SRS_ASYNC_SOCKET_LINUX_11_079: [ If context is NULL, event_complete_callback shall do nothing. ]
if (context == NULL)
{
LogCritical("Invalid arguement event_complete_callback void* context, COMPLETION_PORT_EPOLL_EPOLL_ACTION epoll_action, COMPLETION_PORT_EPOLL_EPOLL_RESULT result, int32_t amount_transfered");
}
else
{
ASYNC_SOCKET_IO_CONTEXT* io_context = (ASYNC_SOCKET_IO_CONTEXT*)context;
switch (action)
{
// Codes_SRS_ASYNC_SOCKET_LINUX_11_080: [ If COMPLETION_PORT_EPOLL_ACTION is COMPLETION_PORT_EPOLL_EPOLLRDHUP or COMPLETION_PORT_EPOLL_ABANDONED, event_complete_callback shall do the following: ]
case COMPLETION_PORT_EPOLL_ABANDONED:
case COMPLETION_PORT_EPOLL_EPOLLRDHUP:
{
// Codes_SRS_ASYNC_SOCKET_LINUX_11_081: [ event_complete_callback shall call either the send or recv complete callback with an ABANDONED flag when the IO type is either ASYNC_SOCKET_IO_TYPE_SEND or ASYNC_SOCKET_IO_TYPE_RECEIVE respectively. ]
if (io_context->io_type == ASYNC_SOCKET_IO_TYPE_RECEIVE)
{
LogError("Receive socket has encountered %" PRI_MU_ENUM "", MU_ENUM_VALUE(COMPLETION_PORT_EPOLL_ACTION, action));
io_context->on_receive_complete(io_context->callback_context, ASYNC_SOCKET_RECEIVE_ABANDONED, 0);
}
else if (io_context->io_type == ASYNC_SOCKET_IO_TYPE_SEND)
{
LogError("Send socket has encountered %" PRI_MU_ENUM "", MU_ENUM_VALUE(COMPLETION_PORT_EPOLL_ACTION, action));
io_context->on_send_complete(io_context->callback_context, ASYNC_SOCKET_SEND_ABANDONED);
}
else
{
LogError("Notify socket has encountered %" PRI_MU_ENUM "", MU_ENUM_VALUE(COMPLETION_PORT_EPOLL_ACTION, action));
// Codes_SRS_ASYNC_SOCKET_LINUX_04_008: [ event_complete_callback shall call the notify complete callback with an ABANDONED flag when the IO type is ASYNC_SOCKET_IO_TYPE_NOTIFY. ]
io_context->on_notify_io_complete(io_context->callback_context, ASYNC_SOCKET_NOTIFY_IO_RESULT_ABANDONED);
}
// Codes_SRS_ASYNC_SOCKET_LINUX_11_084: [ Then event_complete_callback shall free the context memory. ]
free(io_context);
break;
}
// Codes_SRS_ASYNC_SOCKET_LINUX_11_082: [ If COMPLETION_PORT_EPOLL_ACTION is COMPLETION_PORT_EPOLL_EPOLLIN, event_complete_callback shall do the following: ]
case COMPLETION_PORT_EPOLL_EPOLLIN:
{
if (io_context->io_type == ASYNC_SOCKET_IO_TYPE_NOTIFY)
{
// Codes_SRS_ASYNC_SOCKET_LINUX_04_009: [ If the IO type is ASYNC_SOCKET_IO_TYPE_NOTIFY then event_complete_callback shall call the notify complete callback with an IN flag. ]
io_context->on_notify_io_complete(io_context->callback_context, ASYNC_SOCKET_NOTIFY_IO_RESULT_IN);
}
else
{
ASYNC_SOCKET_RECEIVE_RESULT receive_result;
uint32_t index = 0;
uint32_t total_recv_size = 0;
do
{
// Codes_SRS_ASYNC_SOCKET_LINUX_11_083: [ Otherwise event_complete_callback shall call the on_recv callback with the recv_buffer buffer and length and do the following: ]
ssize_t recv_size = io_context->async_socket->on_recv(io_context->async_socket->on_recv_context, io_context->async_socket, io_context->data.recv_ctx.recv_buffers[index].buffer, io_context->data.recv_ctx.recv_buffers[index].length);
// Codes_SRS_ASYNC_SOCKET_LINUX_11_088: [ If the recv size < 0, then: ]
if (recv_size < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
// Codes_SRS_ASYNC_SOCKET_LINUX_11_089: [ If errno is EAGAIN or EWOULDBLOCK, then no data is available and event_complete_callback will break out of the function. ]
receive_result = ASYNC_SOCKET_RECEIVE_OK;
break;
}
else
{
total_recv_size = 0;
recv_size = 0;
if (errno == ECONNRESET)
{
// Codes_SRS_ASYNC_SOCKET_LINUX_11_090: [ If errno is ECONNRESET, then thread_worker_func shall call the on_receive_complete callback with the on_receive_complete_context and ASYNC_SOCKET_RECEIVE_ABANDONED. ]
receive_result = ASYNC_SOCKET_RECEIVE_ABANDONED;
LogError("A reset on the recv socket has been encountered");
}
else
{
// Codes_SRS_ASYNC_SOCKET_LINUX_11_095: [ If errno is any other error, then event_complete_callback shall call the on_receive_complete callback with the on_receive_complete_context and ASYNC_SOCKET_RECEIVE_ERROR. ]
receive_result = ASYNC_SOCKET_RECEIVE_ERROR;
LogErrorNo("failure recv data");
}
break;
}
}
else if (recv_size == 0)
{
// Codes_SRS_ASYNC_SOCKET_LINUX_11_091: [ If the recv size equals 0, then event_complete_callback shall call on_receive_complete callback with the on_receive_complete_context and ASYNC_SOCKET_RECEIVE_ABANDONED. ]
LogError("Socket received 0 bytes, assuming socket is closed");
receive_result = ASYNC_SOCKET_RECEIVE_ABANDONED;
break;
}
else
{
if (recv_size > UINT32_MAX ||
UINT32_MAX - total_recv_size < recv_size)
{
// Handle unlikely overflow
LogError("Overflow in computing receive size (total_recv_size=%" PRIu32 " + recv_size=%zi > UINT32_MAX=%" PRIu32 ")",
total_recv_size, recv_size, UINT32_MAX);
receive_result = ASYNC_SOCKET_RECEIVE_ERROR;
}
else
{
// Codes_SRS_ASYNC_SOCKET_LINUX_11_092: [ If the recv size > 0, if we have another buffer to fill then we will attempt another read, otherwise we shall call on_receive_complete callback with the on_receive_complete_context and ASYNC_SOCKET_RECEIVE_OK ]
total_recv_size += recv_size;
if (index + 1 >= io_context->data.recv_ctx.total_buffer_count || recv_size <= io_context->data.recv_ctx.recv_buffers[index].length)
{
#ifdef ENABLE_SOCKET_LOGGING
LogVerbose("Asynchronous receive of %" PRIu32 " bytes completed at %lf", bytes_received, timer_global_get_elapsed_us());
#endif
receive_result = ASYNC_SOCKET_RECEIVE_OK;
break;
}
else
{
index++;
}
}
}
} while (true);
// Call the callback
io_context->on_receive_complete(io_context->callback_context, receive_result, total_recv_size);
}
// Codes_SRS_ASYNC_SOCKET_LINUX_11_093: [ event_complete_callback shall then free the io_context memory. ]
free(io_context);
break;
}
// Codes_SRS_ASYNC_SOCKET_LINUX_11_094: [ If the events value contains COMPLETION_PORT_EPOLL_EPOLLOUT, event_complete_callback shall the following: ]
case COMPLETION_PORT_EPOLL_EPOLLOUT:
{
if (io_context->io_type == ASYNC_SOCKET_IO_TYPE_NOTIFY)
{
// Codes_SRS_ASYNC_SOCKET_LINUX_04_010: [ If the IO type is ASYNC_SOCKET_IO_TYPE_NOTIFY then event_complete_callback shall call the notify complete callback with an OUT flag. ]
io_context->on_notify_io_complete(io_context->callback_context, ASYNC_SOCKET_NOTIFY_IO_RESULT_OUT);
}
else
{
ASYNC_SOCKET_SEND_RESULT send_result;
int error_no;
ssize_t total_data_sent;
// Codes_SRS_ASYNC_SOCKET_LINUX_11_096: [ event_complete_callback shall call send on the data in the ASYNC_SOCKET_SEND_CONTEXT buffer. ]
if (send_data(io_context->async_socket, &io_context->data.send_ctx.socket_buffer, &total_data_sent, &error_no) != 0)
{
// Codes_SRS_ASYNC_SOCKET_LINUX_11_097: [ If send returns value is < 0 event_complete_callback shall do the following: ]
if (error_no == ECONNRESET)
{
// Codes_SRS_ASYNC_SOCKET_LINUX_11_098: [ if errno is ECONNRESET, then on_send_complete shall be called with ASYNC_SOCKET_SEND_ABANDONED. ]
send_result = ASYNC_SOCKET_SEND_ABANDONED;
LogError("A reset on the send socket has been encountered");
}
else
{
// Codes_SRS_ASYNC_SOCKET_LINUX_11_099: [ if errno is anything else, then on_send_complete shall be called with ASYNC_SOCKET_SEND_ERROR. ]
send_result = ASYNC_SOCKET_SEND_ERROR;
LogErrorNo("failure sending data length: %" PRIu32 "", io_context->data.send_ctx.socket_buffer.length);
}
}
else
{
#ifdef ENABLE_SOCKET_LOGGING
LogVerbose("Asynchronous send of %" PRIu32 " bytes completed at %lf", bytes_sent, timer_global_get_elapsed_us());
#endif
send_result = ASYNC_SOCKET_SEND_OK;
}
io_context->on_send_complete(io_context->callback_context, send_result);
}
// Codes_SRS_ASYNC_SOCKET_LINUX_11_100: [ Then event_complete_callback shall free the io_context memory ]
free(io_context);
break;
}
// Codes_SRS_ASYNC_SOCKET_LINUX_11_085: [ If the events value contains COMPLETION_PORT_EPOLL_ERROR, event_complete_callback shall the following: ]
case COMPLETION_PORT_EPOLL_ERROR:
default:
{
// Codes_SRS_ASYNC_SOCKET_LINUX_11_086: [ Otherwise event_complete_callback shall call either the send or recv complete callback with an ERROR flag. ]
if (io_context->io_type == ASYNC_SOCKET_IO_TYPE_RECEIVE)
{
LogError("Receive socket has encountered COMPLETION_PORT_EPOLL_ERROR");
io_context->on_receive_complete(io_context->callback_context, ASYNC_SOCKET_RECEIVE_ERROR, 0);
}
else if (io_context->io_type == ASYNC_SOCKET_IO_TYPE_SEND)
{
LogError("Send socket has encountered COMPLETION_PORT_EPOLL_ERROR");
io_context->on_send_complete(io_context->callback_context, ASYNC_SOCKET_SEND_ERROR);
}
else
{
LogError("Notify socket has encountered COMPLETION_PORT_EPOLL_ERROR");
// Codes_SRS_ASYNC_SOCKET_LINUX_04_011: [ If the IO type is ASYNC_SOCKET_IO_TYPE_NOTIFY then event_complete_callback shall call the notify complete callback with an ERROR flag. ]
io_context->on_notify_io_complete(io_context->callback_context, ASYNC_SOCKET_NOTIFY_IO_RESULT_ERROR);
}
// Codes_SRS_ASYNC_SOCKET_LINUX_11_087: [ Then event_complete_callback shall and free the io_context memory. ]
free(io_context);
break;
}
}
}
}