in src/TcpAdapterProxy.cpp [1529:1568]
void tcp_adapter_proxy::async_send_message_to_web_socket(tcp_adapter_context &tac, std::shared_ptr<boost::beast::flat_buffer> const& data_to_send, std::string const & service_id)
{
BOOST_LOG_SEV(log, trace) << "Sending messages over web socket for service id: " << service_id;
BOOST_LOG_SEV(log, trace) << "Current queue size: " << tac.web_socket_outgoing_message_queue.size();
// Always add to queue and invoke the send message complete
if (data_to_send != nullptr)
{
BOOST_LOG_SEV(log, trace) << "Put data " << data_to_send->size() << " bytes into the web_socket_outgoing_message_queue for service id: " << service_id;
tcp_connection::pointer socket_connection = get_tcp_connection(tac, service_id);
data_message temp = std::make_pair(data_to_send, socket_connection->after_send_message);
tac.web_socket_outgoing_message_queue.push(temp);
// Are we already writing?
if(tac.web_socket_outgoing_message_queue.size() > 1)
return;
}
// We are not currently writing, so send this immediately
data_message message_to_send = tac.web_socket_outgoing_message_queue.front();
tac.wss->async_write(message_to_send.first->data(), [=, &tac](boost::system::error_code const &ec, std::size_t const bytes_sent)
{
if (ec)
{
throw proxy_exception("Error sending web socket message", ec);
}
BOOST_LOG_SEV(log, trace) << "Sent " << bytes_sent << " bytes over websocket for service id: " << service_id;
std::function<void()> capture_after_send_message = message_to_send.second;
tac.web_socket_outgoing_message_queue.pop();
if(capture_after_send_message)
{
capture_after_send_message();
}
if(tac.web_socket_outgoing_message_queue.empty())
{
BOOST_LOG_SEV(log, trace) << "web_socket_outgoing_message_queue is empty, no more messages to send.";
return;
}
async_send_message_to_web_socket(tac, nullptr, service_id);
});
}