in src/TcpAdapterProxy.cpp [772:841]
void tcp_adapter_proxy::async_tcp_socket_read_loop(tcp_adapter_context & tac, string const & service_id)
{
BOOST_LOG_SEV(log, trace) << "Begin tcp socket read loop for service id : " << service_id;
tcp_connection::pointer connection = get_tcp_connection(tac, service_id);
if (!connection->socket().is_open())
{
BOOST_LOG_SEV(log, trace) << "socket for service id : " << service_id << " is not open yet, skip reading";
return;
}
if (connection->is_tcp_socket_reading_)
{
#ifdef DEBUG
BOOST_LOG_SEV(log, debug) << "Not starting TCP read loop";
#endif
}
else if (wss_has_enough_write_buffer_space(connection->web_socket_data_write_buffer_))
{
//max bytes to read not to exceed either the read buffer capacity, or the available space in the web socket write buffer
std::size_t max_bytes_to_read = std::min(connection->web_socket_data_write_buffer_.max_size() - connection->web_socket_data_write_buffer_.size(), connection->tcp_read_buffer_.max_size());
connection->is_tcp_socket_reading_ = true;
connection->socket_.async_read_some(connection->tcp_read_buffer_.prepare(max_bytes_to_read),
[&, service_id](boost::system::error_code const &ec, std::size_t const bytes_read)
{
BOOST_LOG_SEV(log, trace) << "Reading from tcp socket for service id " << service_id;
tcp_connection::pointer socket_read_connection = get_tcp_connection(tac, service_id);
socket_read_connection->is_tcp_socket_reading_ = false;
if (ec)
{
if (socket_read_connection->on_tcp_error)
{
socket_read_connection->on_tcp_error(ec);
socket_read_connection->on_tcp_error = nullptr;
}
else
{
tcp_socket_error(tac, ec, service_id);
}
}
else
{
socket_read_connection->tcp_read_buffer_.commit(bytes_read);
#ifdef DEBUG
BOOST_LOG_SEV(log, trace) << "TCP socket read " << bytes_read << " bytes";
#endif
BOOST_LOG_SEV(log, trace) << "TCP socket read " << bytes_read << " bytes";
std::size_t bytes_copied = boost::asio::buffer_copy(socket_read_connection->web_socket_data_write_buffer_.prepare(bytes_read), socket_read_connection->tcp_read_buffer_.data(), bytes_read);
socket_read_connection->tcp_read_buffer_.consume(bytes_read);
socket_read_connection->web_socket_data_write_buffer_.commit(bytes_copied);
if (wss_has_enough_write_buffer_space(socket_read_connection->web_socket_data_write_buffer_))
{
async_tcp_socket_read_loop(tac, service_id);
}
else
{
BOOST_LOG_SEV(log, debug) << "No more space in web socket write buffer or tcp socket is closed. Stopping tcp read loop";
}
if (socket_read_connection->web_socket_data_write_buffer_.size() > 0) {
async_setup_web_socket_write_buffer_drain(tac, service_id);
}
}
});
}
else
{
#ifdef DEBUG
BOOST_LOG_SEV(log, debug) << "TCP socket read loop started while web socket write buffer is already full";
#endif
}
}