void ClientConnection::handleTcpConnected()

in lib/ClientConnection.cc [368:478]


/**
 * Completion handler for an asynchronous TCP connect attempt.
 *
 * On success it records the "[local -> remote] " prefix used in log messages, moves the connection
 * to the TcpConnected state (unless it was closed in the meantime), configures the socket options
 * and then either starts the TLS handshake or, for plain-text connections, calls handleHandshake()
 * directly with a success code.
 *
 * On failure it closes the socket and retries with the next resolved endpoint, if there is one;
 * otherwise the connection is closed.
 *
 * @param err result of the connect attempt
 * @param endpointIterator iterator at the endpoint that was just attempted; the retry path in this
 *        function binds it that way (NOTE(review): the initial caller is presumably doing the same,
 *        confirm against the resolve handler). An end iterator means there is nothing to retry.
 */
void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
                                          tcp::resolver::iterator endpointIterator) {
    if (!err) {
        std::stringstream cnxStringStream;
        try {
            // local_endpoint()/remote_endpoint() throw if the socket is no longer usable
            cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint()
                            << "] ";
            cnxString_ = cnxStringStream.str();
        } catch (const boost::system::system_error& e) {
            LOG_ERROR("Failed to get endpoint: " << e.what());
            close(ResultRetryable);
            return;
        }
        if (logicalAddress_ == physicalAddress_) {
            LOG_INFO(cnxString_ << "Connected to broker");
        } else {
            LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_);
        }

        // Only the state check and transition are done under the mutex
        Lock lock(mutex_);
        if (isClosed()) {
            LOG_INFO(cnxString_ << "Connection already closed");
            return;
        }
        state_ = TcpConnected;
        lock.unlock();

        // Socket option failures are logged but never abort the connection
        boost::system::error_code error;
        socket_->set_option(tcp::no_delay(true), error);
        if (error) {
            LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message());
        }

        socket_->set_option(tcp::socket::keep_alive(true), error);
        if (error) {
            LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message());
        }

        // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this
        // should never happen, given that we're sending our own keep-alive probes (within the TCP
        // connection) every 30 seconds
        socket_->set_option(tcp_keep_alive_idle(1 * 60), error);
        if (error) {
            LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message());
        }

        // Send up to 10 probes before declaring the connection broken
        socket_->set_option(tcp_keep_alive_count(10), error);
        if (error) {
            LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message());
        }

        // Interval between probes: 6 seconds
        socket_->set_option(tcp_keep_alive_interval(6), error);
        if (error) {
            LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message());
        }

        if (tlsSocket_) {
            if (!isTlsAllowInsecureConnection_) {
                Url service_url;
                if (!Url::parse(physicalAddress_, service_url)) {
                    // Report the address that could not be parsed; there is no error code to show
                    LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << physicalAddress_);
                    close();
                    return;
                }
            }
#if BOOST_VERSION >= 106600
            tlsSocket_->async_handshake(
                boost::asio::ssl::stream<tcp::socket>::client,
                boost::asio::bind_executor(strand_, std::bind(&ClientConnection::handleHandshake,
                                                              shared_from_this(), std::placeholders::_1)));
#else
            tlsSocket_->async_handshake(boost::asio::ssl::stream<tcp::socket>::client,
                                        strand_.wrap(std::bind(&ClientConnection::handleHandshake,
                                                               shared_from_this(), std::placeholders::_1)));
#endif
        } else {
            // No TLS: continue as if a handshake had completed successfully
            handleHandshake(boost::system::errc::make_error_code(boost::system::errc::success));
        }
    } else if (endpointIterator != tcp::resolver::iterator()) {
        LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message());
        // The connection failed. Try the next endpoint in the list.
        boost::system::error_code closeError;
        socket_->close(closeError);  // a failed close is only logged, it does not stop the retry
        if (closeError) {
            LOG_WARN(cnxString_ << "Failed to close socket: " << closeError.message());
        }
        connectTimeoutTask_->stop();
        ++endpointIterator;
        if (endpointIterator != tcp::resolver::iterator()) {
            LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
            connectTimeoutTask_->start();
            tcp::endpoint endpoint = *endpointIterator;
            // Bind the iterator of the endpoint being attempted (no extra increment): the handler
            // advances it itself on failure, so incrementing here as well would skip an endpoint.
            socket_->async_connect(endpoint,
                                   std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
                                             std::placeholders::_1, endpointIterator));
        } else {
            if (err == boost::asio::error::operation_aborted) {
                // TCP connect timeout, which is not retryable
                close();
            } else {
                close(ResultRetryable);
            }
        }
    } else {
        // Called with an end iterator: there is no endpoint left to fall back to
        LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
        close(ResultRetryable);
    }
}