void tcp_adapter_proxy::async_setup_source_tcp_socket_retry()

in src/TcpAdapterProxy.cpp [1570:1661]


    void tcp_adapter_proxy::async_setup_source_tcp_socket_retry(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, string service_id)
    {
        tcp_server::pointer server = tac.serviceId_to_tcp_server_map[service_id];
        tcp_socket_ensure_closed(server->connection_->socket());
        server->acceptor_.close();

        static boost::asio::socket_base::reuse_address reuse_addr_option(true);

        tac.bind_address_actual = tac.adapter_config.bind_address.get_value_or(GET_SETTING(settings, DEFAULT_BIND_ADDRESS));
        BOOST_LOG_SEV(log, debug) << "Resolving bind address host: " << tac.bind_address_actual;

        std::string endpoint =  tac.adapter_config.serviceId_to_endpoint_map[service_id];
        tuple<string, string> endpoint_to_connect = get_host_and_port(endpoint, tac.bind_address_actual);
        std::string src_port = std::get<1>(endpoint_to_connect);
        std::uint16_t port_to_connect = boost::lexical_cast<std::uint16_t>(src_port);
        BOOST_LOG_SEV(log, debug) << "Port to connect " << port_to_connect;
        server->resolver_.async_resolve(tac.bind_address_actual, src_port,
            boost::asio::ip::resolver_base::passive, 
            [=, &tac](boost::system::error_code const &ec, tcp::resolver::results_type results)
            {
                if (ec)
                {
                    BOOST_LOG_SEV(log, error) << (boost::format("Could not resolve bind address: %1% -- %2%") % tac.bind_address_actual % ec.message()).str();
                    basic_retry_execute(log, retry_config,
                        [&tac, &ec]() { throw proxy_exception((boost::format("Failed to resolve bind address for: %1%") % tac.bind_address_actual).str(), ec); });
                }
                else
                {
                    BOOST_LOG_SEV(log, debug) << "Resolved bind IP: " << results->endpoint().address().to_string();
                    boost::system::error_code bind_ec;
                    server->acceptor_.open(results->endpoint().protocol());
                    if (port_to_connect)
                    {   //if data port is 0 (means pick an empheral port), then don't set this option
                        server->acceptor_.set_option(reuse_addr_option);
                    }
                    server->acceptor_.bind(results->endpoint(), bind_ec);
                    if (bind_ec)
                    {
                        BOOST_LOG_SEV(log, error) << (boost::format("Could not bind to address: %1%:%2% -- %3%") % results->endpoint().address().to_string() % results->endpoint().port() % bind_ec.message()).str();
                        basic_retry_execute(log, retry_config,
                            []() { throw proxy_exception(SOURCE_LOCAL_PROXY_PORT_BIND_EXCEPTION); });
                    }
                    else
                    {
                        std::uint16_t local_port = static_cast<std::uint16_t>(server->acceptor_.local_endpoint().port());
                        BOOST_LOG_SEV(log, info) << "Listening for new connection on port " << local_port;
                        boost::system::error_code listen_ec;
                        server->acceptor_.listen(0, listen_ec);
                        if (listen_ec)
                        {
                            BOOST_LOG_SEV(log, error) << (boost::format("Could not listen on bind address: %1%:%2% -- %3%")
                                % results->endpoint().address().to_string() % local_port % listen_ec.message()).str();
                            basic_retry_execute(log, retry_config,
                                []() { throw proxy_exception(SOURCE_LOCAL_PROXY_PORT_BIND_EXCEPTION); });
                        }
                        else
                        {
                            if (port_to_connect == 0 && tac.adapter_config.on_listen_port_assigned)
                            {
                                tac.adapter_config.on_listen_port_assigned(local_port, service_id);
                            }
                            server->acceptor_.async_accept(
                                    [=, &tac](boost::system::error_code const &ec, boost::asio::ip::tcp::socket new_socket)
                            {

                                if (ec)
                                {
                                    BOOST_LOG_SEV(log, error) << (boost::format("Could not listen/accept incoming connection on %1%:%2% -- %3%")
                                        % tac.bind_address_actual % local_port % ec.message()).str();
                                    basic_retry_execute(log, retry_config,
                                        [=, &ec]() { throw std::runtime_error((boost::format("Failed to accept new connection on %1% -- %2%") % local_port % ec.message()).str()); });
                                }
                                else
                                {
                                    BOOST_LOG_SEV(log, debug) << "socket port " << new_socket.local_endpoint().port();
                                    string endpoint = boost::lexical_cast<std::string>(new_socket.local_endpoint().port());
                                    BOOST_LOG_SEV(log, debug) << "endpoint mapping:";
                                    for (auto m: tac.adapter_config.serviceId_to_endpoint_map)
                                    {
                                        BOOST_LOG_SEV(log, debug) << m.first << " = " << m.second;
                                    }
                                    tcp_server::pointer server = tac.serviceId_to_tcp_server_map[service_id];
                                    server->connection_->socket() = std::move(new_socket);
                                    BOOST_LOG_SEV(log, info) << "Accepted tcp connection on port " << server->connection_->socket().local_endpoint().port() << " from " << server->connection_->socket().remote_endpoint();
                                    invoke_and_clear_handler(server->after_setup_tcp_socket);
                                }
                            });
                        }
                    }
                }
            });
    }