bool tcp_adapter_proxy::process_incoming_websocket_buffer()

in src/TcpAdapterProxy.cpp [1234:1312]


        bool tcp_adapter_proxy::process_incoming_websocket_buffer(tcp_adapter_context &tac, boost::beast::multi_buffer &message_buffer)
        {
            using namespace com::amazonaws::iot::securedtunneling;
            bool continue_reading = true;

            size_t const data_length_size = GET_SETTING(settings, DATA_LENGTH_SIZE);
            boost::beast::flat_buffer data_length_buffer{ data_length_size };
            //is there enough data to read to know data length?
            while (message_buffer.size() >= data_length_size && continue_reading)
            {
                boost::asio::buffer_copy(data_length_buffer.prepare(data_length_size), message_buffer.data(), data_length_size);
                uint16_t data_length = boost::endian::big_to_native(*reinterpret_cast<std::uint16_t const *>(data_length_buffer.data().data()));
                //is the entire message in the buffer yet?
                if (message_buffer.size() >= (data_length + data_length_size))
                {
                    //consume the length since we've already read it
                    message_buffer.consume(data_length_size);
                    bool parsed_successfully = parse_protobuf_and_consume_input(message_buffer, static_cast<size_t>(data_length), incoming_message)
                                                && incoming_message.IsInitialized();
                    if (!parsed_successfully)
                    {
                        //doesn't output actual error string unless debug protobuf library is linked to
                        throw proxy_exception((boost::format("Could not parse web socket binary frame into message: %1%") % incoming_message.InitializationErrorString()).str());
                    }
    #ifdef DEBUG
                    //BOOST_LOG_SEV(log, trace) << "Message recieved:\n" << message.DebugString(); //re-add when linked to protobuf instead of protobuf-lite
                    BOOST_LOG_SEV(log, trace) << "Message parsed successfully , type :" << incoming_message.type();
    #endif
                    if (!is_valid_stream_id(tac, incoming_message))
                    {
                        continue_reading = true;
    #ifdef DEBUG
                        BOOST_LOG_SEV(log, trace) << "Stale message recieved. Dropping";
    #endif
                    }
                    else
                    {
                        string service_id = incoming_message.serviceid();
                        // v1 message format does not need to validate service id. Set to the one service id stored in memory.
                        if (tac.adapter_config.is_v1_message_format)
                        {
                            service_id = tac.adapter_config.serviceId_to_endpoint_map.cbegin()->first;
                        }
                        tcp_connection::pointer connection = get_tcp_connection(tac, service_id);
                        // if per connection handler is available, trigger them.
                        if (incoming_message.type() != Message_Type_DATA)
                        {
                            if (connection != nullptr && connection->on_control_message != nullptr)
                            {
                                continue_reading = connection->on_control_message(incoming_message);
                            }
                            else
                            {
                                continue_reading = on_web_socket_control_message(incoming_message);
                            }
                        }
                        else if (incoming_message.type() == Message_Type_DATA)
                        {
                            if (connection != nullptr && connection->on_data_message != nullptr)
                            {
                                continue_reading = connection->on_data_message(incoming_message);
                            }
                            else
                            {
                                continue_reading = on_web_socket_data_message(incoming_message);
                            }

                        }
                    }
                }
                else    //not enough room to read the entire msg out of our buffer so skip
                {
                    BOOST_LOG_SEV(log, trace) << "Not enough data to process complete message. Moving on to next web socket read";
                    break;
                }
            }

            return continue_reading;
        }