void tcp_adapter_proxy::async_send_stream_start()

in src/TcpAdapterProxy.cpp [436:485]


    void tcp_adapter_proxy::async_send_stream_start(tcp_adapter_context &tac, string const & service_id)
    {
        using namespace com::amazonaws::iot::securedtunneling;
        if (!tac.is_service_ids_received)
        {
            std::shared_ptr<basic_retry_config> retry_config =
                    std::make_shared<basic_retry_config>(tac.io_ctx,
                                                         GET_SETTING(settings, TCP_CONNECTION_RETRY_COUNT),
                                                         GET_SETTING(settings, TCP_CONNECTION_RETRY_DELAY_MS),
                                                         std::bind(&tcp_adapter_proxy::async_send_stream_start, this, std::ref(tac), service_id));
            BOOST_LOG_SEV(log, error) << "No service ids received. Will retry.";
            basic_retry_execute(log, retry_config, []() { throw std::runtime_error("Fail all the retries to get service ids before stream start. Exit."); });
            return;
        }
        std::string src_listening_port = boost::lexical_cast<std::string>(tac.serviceId_to_tcp_server_map[service_id]->acceptor().local_endpoint().port());
        if (tac.adapter_config.serviceId_to_endpoint_map.find(service_id) == tac.adapter_config.serviceId_to_endpoint_map.end() ||
        tac.adapter_config.serviceId_to_endpoint_map.at(service_id) != src_listening_port)
        {
            throw std::runtime_error((boost::format("Receive incoming connection from non-configured port: %1%") % src_listening_port).str());
        }

        /**
         * Initialize stream id to 1. If a mapping exist for a certain service id, it will be overwrite to the value
         * from the serviceId_to_streamId_map.
         */
        std::int32_t new_stream_id = 1;

        if(tac.serviceId_to_streamId_map.find(service_id) != tac.serviceId_to_streamId_map.end())
        {
            std::int32_t old_stream_id = tac.serviceId_to_streamId_map[service_id];
            // Reset old stream id to 0 if it already reaches the max value of current type
            if (old_stream_id == std::numeric_limits<decltype(old_stream_id)>::max())
            {
                old_stream_id = 0;
            }
            new_stream_id = old_stream_id + 1;
        }

        // Update streamId <-> serviceId mapping for future book keeping
        tac.serviceId_to_streamId_map[service_id] = new_stream_id;

        BOOST_LOG_SEV(log, debug) << "Setting new stream ID to: " << new_stream_id << ", service id: " << service_id;

        outgoing_message.set_type(Message_Type_STREAM_START);
        outgoing_message.set_serviceid(service_id);
        outgoing_message.set_streamid(new_stream_id);
        outgoing_message.set_ignorable(false);
        outgoing_message.clear_payload();
        async_send_message(tac, outgoing_message);
    }