bool tcp_adapter_proxy::handle_control_message_service_ids()

in src/TcpAdapterProxy.cpp [966:1103]


        /**
         * Handle the control message that is expected to carry the service ids of the tunnel and
         * make sure tac.adapter_config.serviceId_to_endpoint_map has an entry for each of them.
         *
         * Steps performed, in order:
         *  1. Collect the distinct service ids from the message (duplicates are logged and dropped).
         *  2. Validate them with validate_service_ids_from_configuration().
         *  3. Flag v1 message format when exactly one service id was received.
         *  4. If the serviceId <-> endpoint mapping is empty, build it from the configuration files
         *     and/or by assigning port "0" (source mode only). Otherwise, if the existing mapping is the
         *     v1 fallback and exactly one id was received, re-key the fallback entry to that id.
         *  5. Invoke the after_get_service_ids callback when one is set.
         *
         * @param tac      adapter context; its is_service_ids_received flag and adapter_config are updated.
         * @param message  control message received from the tunnel.
         * @return false when the message is neither STREAM_START nor SERVICE_IDS and is therefore ignored;
         *         true once the service ids have been processed.
         * @throws proxy_exception     when STREAM_START arrives instead of the service ids.
         * @throws std::runtime_error  when the service ids fail validation, or when ports would have to be
         *                             picked by the local proxy while it is not running in source mode.
         */
        bool tcp_adapter_proxy::handle_control_message_service_ids(tcp_adapter_context &tac, message const & message)
        {
            using namespace com::amazonaws::iot::securedtunneling;
            using namespace aws::iot::securedtunneling::config_file;
            // NOTE(review): the flag is raised before the message type is checked, so it stays true even
            // when the message is ignored below - confirm callers rely on this ordering before moving it.
            tac.is_service_ids_received = true;
            std::unordered_set<std::string> service_id_list;
            // Cannot start the stream before receiving service ids.
            if (message.type() == Message_Type_STREAM_START)
            {
                throw proxy_exception("Receive stream start before receiving service ids. Cannot forward data.");
            }
            else if (message.type() != Message_Type_SERVICE_IDS)
            {
                BOOST_LOG_SEV(log, debug) << "Expect:Message_Type_SERVICE_IDS. Ignore message type: " << message.type();
                return false;
            }
            BOOST_LOG_SEV(log, debug) << "Extracting service Ids from control message " << message.type();
            for (int i = 0; i < message.availableserviceids_size(); i++)
            {
                const std::string &id = message.availableserviceids(i);
                // insert() reports whether the id was new, so no separate lookup is needed.
                if (!service_id_list.insert(id).second)
                {
                    BOOST_LOG_SEV(log, warning) << "Duplicate service Id received, ignore: "<< id;
                }
            }
            BOOST_LOG_SEV(log, trace) << "Service id received: ";
            for (const auto &s : service_id_list)
            {
                BOOST_LOG_SEV(log, trace) << s;
            }
            if (!tcp_adapter_proxy::validate_service_ids_from_configuration(tac, service_id_list))
            {
                throw std::runtime_error("Wrong configurations detected in local proxy. Please starts local proxy with right sets of service ids.");
            }

            /**
             * Set flag to mark local proxy will communicate using local proxy v1 message format.
             * local proxy v1 message format: 1 service id. It can be a empty string when open tunnel with no service in destination config.
             */
            if (service_id_list.size() == 1)
            {
                tac.adapter_config.is_v1_message_format = true;
            }
            /**
             * Build serviceId <-> endpoint mapping if not done yet.
             * Case1: Configuration is provided through configuration files. Upon receiving service ids, search through
             * the configuration directory and find the service ids provided in those files.
             * Case 2: Configuration is NOT provided from both files or CLI. Local proxy need to randomly pick up ports
             * to use if running in source mode.
             * Case 3: If not enough service ids are found through configuration files, local proxy helps to pick random
             * available ports, if starts in source mode.
             * If serviceId <-> endpoint mapping already exists, validate the mapping provided through CLI.
             */
            auto &endpoint_map = tac.adapter_config.serviceId_to_endpoint_map;

            if (endpoint_map.empty())
            {
                BOOST_LOG_SEV(log, trace) << "Build serviceId <-> endpoint mapping upon receiving service ids";

                // Scan configuration files to find port mappings
                if (!tac.adapter_config.config_files.empty())
                {
                    BOOST_LOG_SEV(log, info) << "Scan configuration files to find the service ids";
                    read_service_ids_from_config_files(tac.adapter_config.config_files, service_id_list, endpoint_map);

                    // Collect the received ids that have no endpoint after the scan. Membership is tested
                    // directly against the map: std::set_difference requires sorted ranges and therefore
                    // must not be applied to unordered containers.
                    std::unordered_set<std::string> unfound_service_ids;
                    for (const auto &service_id : service_id_list)
                    {
                        if (endpoint_map.find(service_id) == endpoint_map.end())
                        {
                            unfound_service_ids.insert(service_id);
                        }
                    }

                    if (!unfound_service_ids.empty())
                    {
                        BOOST_LOG_SEV(log, trace) << "Receive number of service ids: " << service_id_list.size() <<
                        " .But only found " << endpoint_map.size() << " in configuration files";
                        if (tac.adapter_config.mode != proxy_mode::SOURCE)
                        {
                            throw std::runtime_error("Not enough the service ids are found in the configuration files. Fail to start.");
                        }

                        BOOST_LOG_SEV(log, trace) << "Not all the service ids are found in the configuration files. Local proxy will help to pick up " << unfound_service_ids.size() << " ports.";
                        // initialize the port to be 0 in the service id <-> endpoint mapping, so that local proxy will help picking available ports when establish tcp connection with client's APP
                        for (const auto &service_id : unfound_service_ids)
                        {
                            endpoint_map[service_id] = "0";
                        }
                        tac.adapter_config.on_listen_port_assigned = std::bind(&tcp_adapter_proxy::handle_listen_port_assigned, this, std::placeholders::_1, std::placeholders::_2, std::ref(tac));
                    }
                }
                // If configuration files not provided, initialize the port to be 0 if in source mode.
                else
                {
                    if (tac.adapter_config.mode != proxy_mode::SOURCE)
                    {
                        throw std::runtime_error("No port mapping exists. Fail to start local proxy in destination mode.");
                    }
                    for (const auto &service_id : service_id_list)
                    {
                        endpoint_map[service_id] = "0";
                    }
                    tac.adapter_config.on_listen_port_assigned = std::bind(&tcp_adapter_proxy::handle_listen_port_assigned, this, std::placeholders::_1, std::placeholders::_2, std::ref(tac));
                }

                // Report the mapping that will be used from now on
                BOOST_LOG_SEV(log, info) << "Use port mapping:";
                BOOST_LOG_SEV(log, info) << "---------------------------------";
                for (const auto &m : endpoint_map)
                {
                    BOOST_LOG_SEV(log, info) << m.first << " = " << m.second;
                }
                BOOST_LOG_SEV(log, info) << "---------------------------------";
            }
            else if (tcp_adapter_proxy::fall_back_to_v1_message_format(endpoint_map) && service_id_list.size() == 1)
            {
                // v1 format service id is an empty string in the map
                std::string endpoint = endpoint_map[""];
                std::string service_id = *service_id_list.begin();

                // Remove empty string map and put new mapping
                endpoint_map.erase("");
                endpoint_map[service_id] = endpoint;
                BOOST_LOG_SEV(log, info) << "Updated port mapping for v1 format: ";
                for (const auto &m : endpoint_map)
                {
                    BOOST_LOG_SEV(log, info) << m.first << " = " << m.second;
                }
            }
            if (after_get_service_ids)
            {
                after_get_service_ids();
            }
            return true;
        }