in src/TcpAdapterProxy.cpp [1570:1661]
void tcp_adapter_proxy::async_setup_source_tcp_socket_retry(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, string service_id)
{
tcp_server::pointer server = tac.serviceId_to_tcp_server_map[service_id];
tcp_socket_ensure_closed(server->connection_->socket());
server->acceptor_.close();
static boost::asio::socket_base::reuse_address reuse_addr_option(true);
tac.bind_address_actual = tac.adapter_config.bind_address.get_value_or(GET_SETTING(settings, DEFAULT_BIND_ADDRESS));
BOOST_LOG_SEV(log, debug) << "Resolving bind address host: " << tac.bind_address_actual;
std::string endpoint = tac.adapter_config.serviceId_to_endpoint_map[service_id];
tuple<string, string> endpoint_to_connect = get_host_and_port(endpoint, tac.bind_address_actual);
std::string src_port = std::get<1>(endpoint_to_connect);
std::uint16_t port_to_connect = boost::lexical_cast<std::uint16_t>(src_port);
BOOST_LOG_SEV(log, debug) << "Port to connect " << port_to_connect;
server->resolver_.async_resolve(tac.bind_address_actual, src_port,
boost::asio::ip::resolver_base::passive,
[=, &tac](boost::system::error_code const &ec, tcp::resolver::results_type results)
{
if (ec)
{
BOOST_LOG_SEV(log, error) << (boost::format("Could not resolve bind address: %1% -- %2%") % tac.bind_address_actual % ec.message()).str();
basic_retry_execute(log, retry_config,
[&tac, &ec]() { throw proxy_exception((boost::format("Failed to resolve bind address for: %1%") % tac.bind_address_actual).str(), ec); });
}
else
{
BOOST_LOG_SEV(log, debug) << "Resolved bind IP: " << results->endpoint().address().to_string();
boost::system::error_code bind_ec;
server->acceptor_.open(results->endpoint().protocol());
if (port_to_connect)
{ //if data port is 0 (means pick an empheral port), then don't set this option
server->acceptor_.set_option(reuse_addr_option);
}
server->acceptor_.bind(results->endpoint(), bind_ec);
if (bind_ec)
{
BOOST_LOG_SEV(log, error) << (boost::format("Could not bind to address: %1%:%2% -- %3%") % results->endpoint().address().to_string() % results->endpoint().port() % bind_ec.message()).str();
basic_retry_execute(log, retry_config,
[]() { throw proxy_exception(SOURCE_LOCAL_PROXY_PORT_BIND_EXCEPTION); });
}
else
{
std::uint16_t local_port = static_cast<std::uint16_t>(server->acceptor_.local_endpoint().port());
BOOST_LOG_SEV(log, info) << "Listening for new connection on port " << local_port;
boost::system::error_code listen_ec;
server->acceptor_.listen(0, listen_ec);
if (listen_ec)
{
BOOST_LOG_SEV(log, error) << (boost::format("Could not listen on bind address: %1%:%2% -- %3%")
% results->endpoint().address().to_string() % local_port % listen_ec.message()).str();
basic_retry_execute(log, retry_config,
[]() { throw proxy_exception(SOURCE_LOCAL_PROXY_PORT_BIND_EXCEPTION); });
}
else
{
if (port_to_connect == 0 && tac.adapter_config.on_listen_port_assigned)
{
tac.adapter_config.on_listen_port_assigned(local_port, service_id);
}
server->acceptor_.async_accept(
[=, &tac](boost::system::error_code const &ec, boost::asio::ip::tcp::socket new_socket)
{
if (ec)
{
BOOST_LOG_SEV(log, error) << (boost::format("Could not listen/accept incoming connection on %1%:%2% -- %3%")
% tac.bind_address_actual % local_port % ec.message()).str();
basic_retry_execute(log, retry_config,
[=, &ec]() { throw std::runtime_error((boost::format("Failed to accept new connection on %1% -- %2%") % local_port % ec.message()).str()); });
}
else
{
BOOST_LOG_SEV(log, debug) << "socket port " << new_socket.local_endpoint().port();
string endpoint = boost::lexical_cast<std::string>(new_socket.local_endpoint().port());
BOOST_LOG_SEV(log, debug) << "endpoint mapping:";
for (auto m: tac.adapter_config.serviceId_to_endpoint_map)
{
BOOST_LOG_SEV(log, debug) << m.first << " = " << m.second;
}
tcp_server::pointer server = tac.serviceId_to_tcp_server_map[service_id];
server->connection_->socket() = std::move(new_socket);
BOOST_LOG_SEV(log, info) << "Accepted tcp connection on port " << server->connection_->socket().local_endpoint().port() << " from " << server->connection_->socket().remote_endpoint();
invoke_and_clear_handler(server->after_setup_tcp_socket);
}
});
}
}
}
});
}