void ClientConnection::handleTcpConnected()

in lib/ClientConnection.cc [397:515]


// Completion handler for an asynchronous TCP connect attempt.
//
// @param err              result of the connect attempt that just finished
// @param endpointIterator resolver iterator pointing at the endpoint that was attempted; on
//                         failure it is advanced so the next resolved endpoint can be tried
//
// On success: records the connection string used as log prefix, moves the connection to the
// TcpConnected state (unless it was closed in the meantime), applies the socket options and then
// either starts the TLS handshake or calls handleHandshake() directly for plain-text connections.
//
// On failure: closes the socket and retries with the next resolved endpoint if there is one;
// otherwise the connection is closed.
void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) {
    if (!err) {
        std::stringstream cnxStringStream;
        try {
            // local_endpoint()/remote_endpoint() throw if the socket is no longer connected
            cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint()
                            << "] ";
            cnxString_ = cnxStringStream.str();
        } catch (const ASIO_SYSTEM_ERROR& e) {
            LOG_ERROR("Failed to get endpoint: " << e.what());
            close(ResultRetryable);
            return;
        }
        if (logicalAddress_ == physicalAddress_) {
            LOG_INFO(cnxString_ << "Connected to broker");
        } else {
            LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_
                                << ", proxy: " << proxyServiceUrl_
                                << ", physical address:" << physicalAddress_);
        }

        // Only the state transition is guarded by the mutex; the socket setup below runs unlocked.
        Lock lock(mutex_);
        if (isClosed()) {
            LOG_INFO(cnxString_ << "Connection already closed");
            return;
        }
        state_ = TcpConnected;
        lock.unlock();

        // Failing to set any of the socket options below is not fatal: it is logged and the
        // connection setup continues.
        ASIO_ERROR error;
        socket_->set_option(tcp::no_delay(true), error);
        if (error) {
            LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message());
        }

        socket_->set_option(tcp::socket::keep_alive(true), error);
        if (error) {
            LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message());
        }

        // Start TCP keep-alive probes once the connection has been idle for 1 minute. Ideally this
        // should never happen, given that we're sending our own keep-alive probes (within the TCP
        // connection) every 30 seconds
        socket_->set_option(tcp_keep_alive_idle(1 * 60), error);
        if (error) {
            LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message());
        }

        // Send up to 10 probes before declaring the connection broken
        socket_->set_option(tcp_keep_alive_count(10), error);
        if (error) {
            LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message());
        }

        // Interval between probes: 6 seconds
        socket_->set_option(tcp_keep_alive_interval(6), error);
        if (error) {
            LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message());
        }

        if (tlsSocket_) {
            if (!isTlsAllowInsecureConnection_) {
                // Only the validity of the address is checked here; the parsed URL itself is not
                // used any further in this function.
                Url service_url;
                if (!Url::parse(physicalAddress_, service_url)) {
                    // Report the address that failed to parse. There is no error code associated
                    // with a parse failure, so none is logged.
                    LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << physicalAddress_);
                    close();
                    return;
                }
            }
            auto weakSelf = weak_from_this();
            auto socket = socket_;
            auto tlsSocket = tlsSocket_;
            // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation
            // fault might happen. They are captured (unused) purely to extend their lifetime.
            auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& handshakeError) {
                auto self = weakSelf.lock();
                if (self) {
                    self->handleHandshake(handshakeError);
                }
            };
            tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client,
                                        ASIO::bind_executor(strand_, callback));
        } else {
            // No TLS: there is nothing to negotiate, continue as if the handshake succeeded
            handleHandshake(ASIO_SUCCESS);
        }
    } else if (endpointIterator != tcp::resolver::iterator()) {
        LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message());
        // The connection failed. Try the next endpoint in the list.
        ASIO_ERROR closeError;
        socket_->close(closeError);  // best effort: a failure to close is only logged
        if (closeError) {
            LOG_WARN(cnxString_ << "Failed to close socket: " << closeError.message());
        }
        connectTimeoutTask_->stop();
        ++endpointIterator;
        if (endpointIterator != tcp::resolver::iterator()) {
            LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
            connectTimeoutTask_->start();
            tcp::endpoint endpoint = *endpointIterator;
            auto weakSelf = weak_from_this();
            socket_->async_connect(endpoint, [weakSelf, endpointIterator](const ASIO_ERROR& connectError) {
                auto self = weakSelf.lock();
                if (self) {
                    self->handleTcpConnected(connectError, endpointIterator);
                }
            });
        } else {
            // Every resolved endpoint has been tried
            if (err == ASIO::error::operation_aborted) {
                // TCP connect timeout, which is not retryable
                close();
            } else {
                close(ResultRetryable);
            }
        }
    } else {
        LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
        close(ResultRetryable);
    }
}