in lib/ClientConnection.cc [368:478]
void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
tcp::resolver::iterator endpointIterator) {
if (!err) {
std::stringstream cnxStringStream;
try {
cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint()
<< "] ";
cnxString_ = cnxStringStream.str();
} catch (const boost::system::system_error& e) {
LOG_ERROR("Failed to get endpoint: " << e.what());
close(ResultRetryable);
return;
}
if (logicalAddress_ == physicalAddress_) {
LOG_INFO(cnxString_ << "Connected to broker");
} else {
LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_);
}
Lock lock(mutex_);
if (isClosed()) {
LOG_INFO(cnxString_ << "Connection already closed");
return;
}
state_ = TcpConnected;
lock.unlock();
boost::system::error_code error;
socket_->set_option(tcp::no_delay(true), error);
if (error) {
LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message());
}
socket_->set_option(tcp::socket::keep_alive(true), error);
if (error) {
LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message());
}
// Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this
// should never happen, given that we're sending our own keep-alive probes (within the TCP
// connection) every 30 seconds
socket_->set_option(tcp_keep_alive_idle(1 * 60), error);
if (error) {
LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message());
}
// Send up to 10 probes before declaring the connection broken
socket_->set_option(tcp_keep_alive_count(10), error);
if (error) {
LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message());
}
// Interval between probes: 6 seconds
socket_->set_option(tcp_keep_alive_interval(6), error);
if (error) {
LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message());
}
if (tlsSocket_) {
if (!isTlsAllowInsecureConnection_) {
boost::system::error_code err;
Url service_url;
if (!Url::parse(physicalAddress_, service_url)) {
LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
close();
return;
}
}
#if BOOST_VERSION >= 106600
tlsSocket_->async_handshake(
boost::asio::ssl::stream<tcp::socket>::client,
boost::asio::bind_executor(strand_, std::bind(&ClientConnection::handleHandshake,
shared_from_this(), std::placeholders::_1)));
#else
tlsSocket_->async_handshake(boost::asio::ssl::stream<tcp::socket>::client,
strand_.wrap(std::bind(&ClientConnection::handleHandshake,
shared_from_this(), std::placeholders::_1)));
#endif
} else {
handleHandshake(boost::system::errc::make_error_code(boost::system::errc::success));
}
} else if (endpointIterator != tcp::resolver::iterator()) {
LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message());
// The connection failed. Try the next endpoint in the list.
boost::system::error_code closeError;
socket_->close(closeError); // ignore the error of close
if (closeError) {
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
}
connectTimeoutTask_->stop();
++endpointIterator;
if (endpointIterator != tcp::resolver::iterator()) {
LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
connectTimeoutTask_->start();
tcp::endpoint endpoint = *endpointIterator;
socket_->async_connect(endpoint,
std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
std::placeholders::_1, ++endpointIterator));
} else {
if (err == boost::asio::error::operation_aborted) {
// TCP connect timeout, which is not retryable
close();
} else {
close(ResultRetryable);
}
}
} else {
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
close(ResultRetryable);
}
}