in src/TcpAdapterProxy.cpp [436:485]
/**
 * Build and send a STREAM_START message for the given service id.
 *
 * If the service ids have not been received yet (tac.is_service_ids_received is false),
 * a retry of this same call is handed to basic_retry_execute and the function returns
 * without sending anything.
 *
 * Otherwise the listening port of the TCP server registered for service_id is checked
 * against the endpoint configured for that service id, the next stream id is computed
 * and stored in tac.serviceId_to_streamId_map, and the message is sent through
 * async_send_message.
 *
 * @param tac        adapter context holding the per-service maps and configuration
 * @param service_id service id the new stream belongs to
 * @throws std::runtime_error if no TCP server is registered for service_id, or if its
 *         listening port is not the one configured for service_id
 */
void tcp_adapter_proxy::async_send_stream_start(tcp_adapter_context &tac, string const & service_id)
{
    using namespace com::amazonaws::iot::securedtunneling;

    if (!tac.is_service_ids_received)
    {
        // Re-enter this function on retry with the same context and service id.
        std::shared_ptr<basic_retry_config> retry_config =
            std::make_shared<basic_retry_config>(tac.io_ctx,
                GET_SETTING(settings, TCP_CONNECTION_RETRY_COUNT),
                GET_SETTING(settings, TCP_CONNECTION_RETRY_DELAY_MS),
                std::bind(&tcp_adapter_proxy::async_send_stream_start, this, std::ref(tac), service_id));
        BOOST_LOG_SEV(log, error) << "No service ids received. Will retry.";
        basic_retry_execute(log, retry_config, []() { throw std::runtime_error("Fail all the retries to get service ids before stream start. Exit."); });
        return;
    }

    // Look the server up with find(): operator[] would default-insert an empty pointer for an
    // unknown service id and the dereference below would then be undefined behaviour.
    auto const server_it = tac.serviceId_to_tcp_server_map.find(service_id);
    if (server_it == tac.serviceId_to_tcp_server_map.end() || !server_it->second)
    {
        throw std::runtime_error((boost::format("No TCP server found for service id: %1%") % service_id).str());
    }
    std::string const src_listening_port =
        boost::lexical_cast<std::string>(server_it->second->acceptor().local_endpoint().port());

    // The port we are listening on must be the one configured for this service id.
    auto const endpoint_it = tac.adapter_config.serviceId_to_endpoint_map.find(service_id);
    if (endpoint_it == tac.adapter_config.serviceId_to_endpoint_map.end() ||
        endpoint_it->second != src_listening_port)
    {
        throw std::runtime_error((boost::format("Receive incoming connection from non-configured port: %1%") % src_listening_port).str());
    }

    /**
     * Stream ids start at 1. If a stream id is already recorded for this service id, the new
     * one is the recorded value plus one, wrapping back to 1 once the maximum of the type has
     * been reached (this also avoids signed overflow).
     */
    std::int32_t new_stream_id = 1;
    auto const stream_it = tac.serviceId_to_streamId_map.find(service_id);
    if (stream_it != tac.serviceId_to_streamId_map.end())
    {
        std::int32_t const old_stream_id = stream_it->second;
        if (old_stream_id != std::numeric_limits<std::int32_t>::max())
        {
            new_stream_id = old_stream_id + 1;
        }
    }

    // Update streamId <-> serviceId mapping for future book keeping
    tac.serviceId_to_streamId_map[service_id] = new_stream_id;
    BOOST_LOG_SEV(log, debug) << "Setting new stream ID to: " << new_stream_id << ", service id: " << service_id;

    outgoing_message.set_type(Message_Type_STREAM_START);
    outgoing_message.set_serviceid(service_id);
    outgoing_message.set_streamid(new_stream_id);
    outgoing_message.set_ignorable(false);
    outgoing_message.clear_payload();
    async_send_message(tac, outgoing_message);
}