in src/TcpAdapterProxy.cpp [1234:1312]
bool tcp_adapter_proxy::process_incoming_websocket_buffer(tcp_adapter_context &tac, boost::beast::multi_buffer &message_buffer)
{
using namespace com::amazonaws::iot::securedtunneling;
bool continue_reading = true;
size_t const data_length_size = GET_SETTING(settings, DATA_LENGTH_SIZE);
boost::beast::flat_buffer data_length_buffer{ data_length_size };
//is there enough data to read to know data length?
while (message_buffer.size() >= data_length_size && continue_reading)
{
boost::asio::buffer_copy(data_length_buffer.prepare(data_length_size), message_buffer.data(), data_length_size);
uint16_t data_length = boost::endian::big_to_native(*reinterpret_cast<std::uint16_t const *>(data_length_buffer.data().data()));
//is the entire message in the buffer yet?
if (message_buffer.size() >= (data_length + data_length_size))
{
//consume the length since we've already read it
message_buffer.consume(data_length_size);
bool parsed_successfully = parse_protobuf_and_consume_input(message_buffer, static_cast<size_t>(data_length), incoming_message)
&& incoming_message.IsInitialized();
if (!parsed_successfully)
{
//doesn't output actual error string unless debug protobuf library is linked to
throw proxy_exception((boost::format("Could not parse web socket binary frame into message: %1%") % incoming_message.InitializationErrorString()).str());
}
#ifdef DEBUG
//BOOST_LOG_SEV(log, trace) << "Message recieved:\n" << message.DebugString(); //re-add when linked to protobuf instead of protobuf-lite
BOOST_LOG_SEV(log, trace) << "Message parsed successfully , type :" << incoming_message.type();
#endif
if (!is_valid_stream_id(tac, incoming_message))
{
continue_reading = true;
#ifdef DEBUG
BOOST_LOG_SEV(log, trace) << "Stale message recieved. Dropping";
#endif
}
else
{
string service_id = incoming_message.serviceid();
// v1 message format does not need to validate service id. Set to the one service id stored in memory.
if (tac.adapter_config.is_v1_message_format)
{
service_id = tac.adapter_config.serviceId_to_endpoint_map.cbegin()->first;
}
tcp_connection::pointer connection = get_tcp_connection(tac, service_id);
// if per connection handler is available, trigger them.
if (incoming_message.type() != Message_Type_DATA)
{
if (connection != nullptr && connection->on_control_message != nullptr)
{
continue_reading = connection->on_control_message(incoming_message);
}
else
{
continue_reading = on_web_socket_control_message(incoming_message);
}
}
else if (incoming_message.type() == Message_Type_DATA)
{
if (connection != nullptr && connection->on_data_message != nullptr)
{
continue_reading = connection->on_data_message(incoming_message);
}
else
{
continue_reading = on_web_socket_data_message(incoming_message);
}
}
}
}
else //not enough room to read the entire msg out of our buffer so skip
{
BOOST_LOG_SEV(log, trace) << "Not enough data to process complete message. Moving on to next web socket read";
break;
}
}
return continue_reading;
}