void socketio_dowork()

in adapters/socketio_berkeley.c [1024:1150]


/*
 * Performs one pass of work on a socket IO instance.
 *
 * socket_io: handle to the SOCKET_IO_INSTANCE to service. NULL is a no-op.
 *
 * IO_STATE_OPEN:
 *   1. Walks pending_io_list from the head, calling send() for each queued buffer.
 *      - fully sent: on_send_complete (if set) is invoked with IO_SEND_OK and the entry is released;
 *      - partially sent: the unsent tail is moved to the front of the buffer and the pass stops;
 *      - EAGAIN / ENOBUFS: the entry is left untouched and the pass stops;
 *      - any other send error: the entry is dropped, indicate_error() is called and the pass stops.
 *   2. If the instance is still IO_STATE_OPEN, calls recv() repeatedly and forwards every
 *      non-empty chunk to on_bytes_received (if set) until recv() returns <= 0 or the state changes.
 *
 * IO_STATE_OPENING:
 *   Calls lookup_address(); if that succeeds and the instance is then IO_STATE_OPEN, calls
 *   initiate_socket_connection() followed by wait_for_socket_connection().
 *
 * The function returns nothing; every failure is reported through indicate_error().
 */
void socketio_dowork(CONCRETE_IO_HANDLE socket_io)
{
    if (socket_io != NULL)
    {
        SOCKET_IO_INSTANCE* socket_io_instance = (SOCKET_IO_INSTANCE*)socket_io;

        /* Ignore SIGPIPE process-wide so a write to a closed peer surfaces as an error code instead of a signal. */
        signal(SIGPIPE, SIG_IGN);

        if (socket_io_instance->io_state == IO_STATE_OPEN)
        {
            LIST_ITEM_HANDLE first_pending_io = singlylinkedlist_get_head_item(socket_io_instance->pending_io_list);
            while (first_pending_io != NULL)
            {
                PENDING_SOCKET_IO* pending_socket_io = (PENDING_SOCKET_IO*)singlylinkedlist_item_get_value(first_pending_io);
                if (pending_socket_io == NULL)
                {
                    indicate_error(socket_io_instance);
                    LogError("Failure: retrieving socket from list");
                    break;
                }

                ssize_t send_result = send(socket_io_instance->socket, pending_socket_io->bytes, pending_socket_io->size, MSG_NOSIGNAL);
                if (send_result < 0)
                {
                    /* Capture errno immediately: free(), the list removal and logging below may overwrite it. */
                    int send_errno = errno;

                    if (send_errno == EAGAIN || send_errno == ENOBUFS) /*send says "come back later" with EAGAIN, ENOBUFS - likely the socket buffer cannot accept more data*/
                    {
                        /*do nothing until next dowork */
                        break;
                    }

                    free(pending_socket_io->bytes);
                    free(pending_socket_io);
                    (void)singlylinkedlist_remove(socket_io_instance->pending_io_list, first_pending_io);

                    LogError("Failure: sending Socket information. errno=%d (%s).", send_errno, strerror(send_errno));
                    indicate_error(socket_io_instance);

                    /* Stop here: the socket just failed, and if the removal above did not succeed the
                       head item would still reference the entry that was freed. */
                    break;
                }
                else if ((size_t)send_result != pending_socket_io->size)
                {
                    /* Partial send: keep only the unsent tail and simply wait until next dowork */
                    (void)memmove(pending_socket_io->bytes, pending_socket_io->bytes + send_result, pending_socket_io->size - send_result);
                    pending_socket_io->size -= send_result;
                    break;
                }
                else
                {
                    if (pending_socket_io->on_send_complete != NULL)
                    {
                        pending_socket_io->on_send_complete(pending_socket_io->callback_context, IO_SEND_OK);
                    }

                    free(pending_socket_io->bytes);
                    free(pending_socket_io);
                    if (singlylinkedlist_remove(socket_io_instance->pending_io_list, first_pending_io) != 0)
                    {
                        indicate_error(socket_io_instance);
                        LogError("Failure: unable to remove socket from list");

                        /* The entry is freed but still linked; fetching the head again would read freed memory. */
                        break;
                    }
                }

                first_pending_io = singlylinkedlist_get_head_item(socket_io_instance->pending_io_list);
            }

            /* Re-check the state before reading: the send pass may have reported an error.
               NOTE(review): presumably indicate_error() moves the instance out of IO_STATE_OPEN - confirm. */
            if (socket_io_instance->io_state == IO_STATE_OPEN)
            {
                ssize_t received = 0;
                do
                {
                    received = recv(socket_io_instance->socket, socket_io_instance->recv_bytes, XIO_RECEIVE_BUFFER_SIZE, MSG_NOSIGNAL);
                    if (received > 0)
                    {
                        if (socket_io_instance->on_bytes_received != NULL)
                        {
                            /* Explicitly ignoring here the result of the callback */
                            (void)socket_io_instance->on_bytes_received(socket_io_instance->on_bytes_received_context, socket_io_instance->recv_bytes, received);
                        }
                    }
                    else if (received == 0)
                    {
                        // Do not log error here due to this is probably the socket being closed on the other end
                        indicate_error(socket_io_instance);
                    }
                    else
                    {
                        int recv_errno = errno;

                        /* EAGAIN only means there is nothing to read right now; anything else is a failure. */
                        if (recv_errno != EAGAIN)
                        {
                            LogError("Socketio_Failure: Receiving data from endpoint: errno=%d.", recv_errno);
                            indicate_error(socket_io_instance);
                        }
                    }

                } while (received > 0 && socket_io_instance->io_state == IO_STATE_OPEN);
            }
        }
        else if (socket_io_instance->io_state == IO_STATE_OPENING)
        {
            if (lookup_address(socket_io_instance) != 0)
            {
                LogError("Socketio_Failure: lookup address failed");
                indicate_error(socket_io_instance);
            }
            /* NOTE(review): this branch is only reachable if lookup_address() itself advances io_state
               to IO_STATE_OPEN - confirm against its implementation. */
            else if (socket_io_instance->io_state == IO_STATE_OPEN)
            {
                if (initiate_socket_connection(socket_io_instance) != 0)
                {
                    LogError("Socketio_Failure: initiate_socket_connection failed");
                    indicate_error(socket_io_instance);
                }
                else if (wait_for_socket_connection(socket_io_instance) != 0)
                {
                    LogError("Socketio_Failure: wait_for_socket_connection failed");
                    indicate_error(socket_io_instance);
                }
            }
        }
    }
}