in lib/ClientConnection.cc [397:515]
// Completion handler for a TCP connect attempt on socket_.
//
// Parameters:
//   err              - outcome of the connect attempt that just finished.
//   endpointIterator - resolver iterator over candidate endpoints. On the retry path below it is
//                      advanced and passed back into this handler together with the next
//                      async_connect, so on re-entry it refers to the endpoint just attempted.
//
// Behaviour:
//   * Success: records the "[local -> remote] " prefix in cnxString_, moves state_ to TcpConnected
//     (unless the connection was already closed), applies socket options, then either starts the
//     TLS handshake or calls handleHandshake(ASIO_SUCCESS) directly.
//   * Failure with a non-end iterator: closes the socket, advances to the next endpoint and retries;
//     when no endpoint is left the connection is closed.
//   * Failure with an end iterator: the connection is closed with ResultRetryable.
void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) {
    if (!err) {
        std::stringstream cnxStringStream;
        try {
            // These endpoint accessors report failure by throwing, hence the try/catch.
            cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint()
                            << "] ";
            cnxString_ = cnxStringStream.str();
        } catch (const ASIO_SYSTEM_ERROR& e) {
            LOG_ERROR("Failed to get endpoint: " << e.what());
            close(ResultRetryable);
            return;
        }

        if (logicalAddress_ == physicalAddress_) {
            LOG_INFO(cnxString_ << "Connected to broker");
        } else {
            LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_
                                << ", proxy: " << proxyServiceUrl_
                                << ", physical address:" << physicalAddress_);
        }

        // The closed check and the state transition are done together under mutex_.
        Lock lock(mutex_);
        if (isClosed()) {
            LOG_INFO(cnxString_ << "Connection already closed");
            return;
        }
        state_ = TcpConnected;
        lock.unlock();

        // Socket options are best-effort: a failure is logged and the connection proceeds.
        ASIO_ERROR error;
        socket_->set_option(tcp::no_delay(true), error);
        if (error) {
            LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message());
        }

        socket_->set_option(tcp::socket::keep_alive(true), error);
        if (error) {
            LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message());
        }

        // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this
        // should never happen, given that we're sending our own keep-alive probes (within the TCP
        // connection) every 30 seconds
        socket_->set_option(tcp_keep_alive_idle(1 * 60), error);
        if (error) {
            LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message());
        }

        // Send up to 10 probes before declaring the connection broken
        socket_->set_option(tcp_keep_alive_count(10), error);
        if (error) {
            LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message());
        }

        // Interval between probes: 6 seconds
        socket_->set_option(tcp_keep_alive_interval(6), error);
        if (error) {
            LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message());
        }

        if (tlsSocket_) {
            if (!isTlsAllowInsecureConnection_) {
                // Only checks that physicalAddress_ parses; the parsed Url is not used further here.
                Url service_url;
                if (!Url::parse(physicalAddress_, service_url)) {
                    // Url::parse yields no error code, so report the offending address itself.
                    LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << physicalAddress_);
                    close();
                    return;
                }
            }
            auto weakSelf = weak_from_this();
            auto socket = socket_;
            auto tlsSocket = tlsSocket_;
            // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation
            // fault might happen
            auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& handshakeErr) {
                auto self = weakSelf.lock();
                if (self) {
                    self->handleHandshake(handshakeErr);
                }
            };
            tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client,
                                        ASIO::bind_executor(strand_, callback));
        } else {
            handleHandshake(ASIO_SUCCESS);
        }
    } else if (endpointIterator != tcp::resolver::iterator()) {
        LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message());
        // The connection failed. Try the next endpoint in the list.
        ASIO_ERROR closeError;
        // A failure to close is only logged; it does not stop us from trying the next endpoint.
        socket_->close(closeError);
        if (closeError) {
            // Report the close failure itself, not the connect error already logged above.
            LOG_WARN(cnxString_ << "Failed to close socket: " << closeError.message());
        }
        connectTimeoutTask_->stop();
        ++endpointIterator;
        if (endpointIterator != tcp::resolver::iterator()) {
            LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
            connectTimeoutTask_->start();
            tcp::endpoint endpoint = *endpointIterator;
            // Capture a weak reference so a pending connect does not keep this object alive.
            auto weakSelf = weak_from_this();
            socket_->async_connect(endpoint, [weakSelf, endpointIterator](const ASIO_ERROR& connectErr) {
                auto self = weakSelf.lock();
                if (self) {
                    self->handleTcpConnected(connectErr, endpointIterator);
                }
            });
        } else {
            if (err == ASIO::error::operation_aborted) {
                // TCP connect timeout, which is not retryable
                close();
            } else {
                close(ResultRetryable);
            }
        }
    } else {
        LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
        close(ResultRetryable);
    }
}