void tcp_adapter_proxy::async_tcp_socket_read_loop()

in src/TcpAdapterProxy.cpp [772:841]


    /**
     * Starts (or continues) the asynchronous loop that moves bytes from the TCP
     * socket of one service into that connection's web socket write buffer.
     *
     * Nothing is scheduled when the socket is not open, when a read is already
     * outstanding (is_tcp_socket_reading_), or when
     * wss_has_enough_write_buffer_space() reports no room in the web socket
     * write buffer. Otherwise a single async_read_some is issued; its completion
     * handler copies the received bytes into the web socket write buffer,
     * re-arms the loop while there is still room, and requests a drain of the
     * write buffer whenever it is non-empty.
     *
     * @param tac        adapter context handed to get_tcp_connection and to the
     *                   follow-up calls. The completion handler holds it by
     *                   reference, so it must stay valid until that handler ran.
     * @param service_id identifier used to look up the connection; the
     *                   completion handler keeps its own copy.
     */
    void tcp_adapter_proxy::async_tcp_socket_read_loop(tcp_adapter_context & tac, string const & service_id)
    {
        BOOST_LOG_SEV(log, trace) << "Begin tcp socket read loop for service id : " << service_id;
        // NOTE(review): the result of get_tcp_connection is dereferenced without a
        // null check, here and in the completion handler - confirm it can never
        // return an empty pointer for this service id.
        tcp_connection::pointer connection = get_tcp_connection(tac, service_id);
        if (!connection->socket().is_open())
        {
            BOOST_LOG_SEV(log, trace) << "socket for service id : " << service_id << " is not open yet, skip reading";
            return;
        }
        if (connection->is_tcp_socket_reading_)
        {
            // A read is already outstanding; its completion handler re-arms the loop.
#ifdef DEBUG
            BOOST_LOG_SEV(log, debug) << "Not starting TCP read loop";
#endif
        }
        else if (wss_has_enough_write_buffer_space(connection->web_socket_data_write_buffer_))
        {
            //max bytes to read not to exceed either the read buffer capacity, or the available space in the web socket write buffer
            std::size_t const max_bytes_to_read = std::min(
                connection->web_socket_data_write_buffer_.max_size() - connection->web_socket_data_write_buffer_.size(),
                connection->tcp_read_buffer_.max_size());
            // Set before the read is issued so a nested call cannot schedule a second one.
            connection->is_tcp_socket_reading_ = true;
            // Captures are explicit on purpose: the handler runs after this function
            // has returned, so no stack local (e.g. `connection`) may be captured by
            // reference. The connection is looked up again inside the handler.
            connection->socket_.async_read_some(connection->tcp_read_buffer_.prepare(max_bytes_to_read),
                [this, &tac, service_id](boost::system::error_code const &ec, std::size_t const bytes_read)
                {
                    BOOST_LOG_SEV(log, trace) << "Reading from tcp socket for service id " << service_id;
                    tcp_connection::pointer socket_read_connection = get_tcp_connection(tac, service_id);
                    // The outstanding read has completed (successfully or not).
                    socket_read_connection->is_tcp_socket_reading_ = false;
                    if (ec)
                    {
                        if (socket_read_connection->on_tcp_error)
                        {
                            // A one-shot error callback takes precedence over the
                            // default handling and is cleared after it has been invoked.
                            socket_read_connection->on_tcp_error(ec);
                            socket_read_connection->on_tcp_error = nullptr;
                        }
                        else
                        {
                            tcp_socket_error(tac, ec, service_id);
                        }
                    }
                    else
                    {
                        socket_read_connection->tcp_read_buffer_.commit(bytes_read);
                        BOOST_LOG_SEV(log, trace) << "TCP socket read " << bytes_read << " bytes";
                        // Move the freshly read bytes from the TCP read buffer to the
                        // web socket write buffer.
                        std::size_t const bytes_copied = boost::asio::buffer_copy(
                            socket_read_connection->web_socket_data_write_buffer_.prepare(bytes_read),
                            socket_read_connection->tcp_read_buffer_.data(),
                            bytes_read);
                        socket_read_connection->tcp_read_buffer_.consume(bytes_read);
                        socket_read_connection->web_socket_data_write_buffer_.commit(bytes_copied);

                        if (wss_has_enough_write_buffer_space(socket_read_connection->web_socket_data_write_buffer_))
                        {
                            async_tcp_socket_read_loop(tac, service_id);
                        }
                        else
                        {
                            BOOST_LOG_SEV(log, debug) << "No more space in web socket write buffer or tcp socket is closed. Stopping tcp read loop";
                        }
                        if (socket_read_connection->web_socket_data_write_buffer_.size() > 0)
                        {
                            async_setup_web_socket_write_buffer_drain(tac, service_id);
                        }
                    }
                });
        }
        else
        {
#ifdef DEBUG
            BOOST_LOG_SEV(log, debug) << "TCP socket read loop started while web socket write buffer is already full";
#endif
        }
    }