std::future<RpcError> ClientConnection::Connect()

in eventstream_rpc/source/EventStreamClient.cpp [316:422]


        /**
         * Starts an asynchronous connection attempt.
         *
         * The attempt is only accepted while the client state is DISCONNECTED. On acceptance the
         * per-attempt state (promises, close reason, stored configuration, lifecycle handler and
         * connect-request callback) is reset under m_stateMutex and the state moves to
         * CONNECTING_SOCKET. If the attempt is rejected because the state is not DISCONNECTED, no
         * member state is modified.
         *
         * @param connectionConfig configuration copied into m_connectionConfig; it must provide both a
         *        host name and a port, otherwise the attempt fails with EVENT_STREAM_RPC_NULL_PARAMETER.
         * @param connectionLifecycleHandler handler stored in m_lifecycleHandler for this attempt.
         * @param clientBootstrap bootstrap whose underlying handle is handed to the CRT connect call.
         * @return a future that is already fulfilled when the attempt fails synchronously
         *         (EVENT_STREAM_RPC_CONNECTION_ALREADY_ESTABLISHED, EVENT_STREAM_RPC_NULL_PARAMETER, or
         *         EVENT_STREAM_RPC_CRT_ERROR together with the CRT error code); otherwise the future
         *         obtained from m_connectAckedPromise.
         */
        std::future<RpcError> ClientConnection::Connect(
            const ConnectionConfig &connectionConfig,
            ConnectionLifecycleHandler *connectionLifecycleHandler,
            Crt::Io::ClientBootstrap &clientBootstrap) noexcept
        {
            EventStreamRpcStatusCode baseError = EVENT_STREAM_RPC_SUCCESS;

            // Zero the options up front so the struct is never left indeterminate on any path.
            struct aws_event_stream_rpc_client_connection_options connOptions;
            AWS_ZERO_STRUCT(connOptions);

            {
                const std::lock_guard<std::recursive_mutex> lock(m_stateMutex);
                if (m_clientState == DISCONNECTED)
                {
                    m_clientState = CONNECTING_SOCKET;
                    m_onConnectCalled = false;
                    m_connectionSetupPromise = {};
                    m_connectAckedPromise = {};
                    m_closedPromise = {};
                    m_closeReason = {EVENT_STREAM_RPC_UNINITIALIZED, 0};
                    m_connectionConfig = connectionConfig;
                    m_lifecycleHandler = connectionLifecycleHandler;
                    // Refreshed here, under the lock and only for an accepted attempt, so that a
                    // rejected call never writes to the state of a connection that is in use.
                    m_onConnectRequestCallback = m_connectionConfig.GetConnectRequestCallback();
                }
                else
                {
                    baseError = EVENT_STREAM_RPC_CONNECTION_ALREADY_ESTABLISHED;
                }
            }

            // Backing storage for connOptions.host_name; it must stay alive until the connect call below.
            Crt::String hostName;

            if (baseError == EVENT_STREAM_RPC_SUCCESS)
            {
                if (m_connectionConfig.GetHostName().has_value())
                {
                    hostName = m_connectionConfig.GetHostName().value();
                    connOptions.host_name = hostName.c_str();
                }
                else
                {
                    baseError = EVENT_STREAM_RPC_NULL_PARAMETER;
                }

                if (m_connectionConfig.GetPort().has_value())
                {
                    connOptions.port = m_connectionConfig.GetPort().value();
                }
                else
                {
                    baseError = EVENT_STREAM_RPC_NULL_PARAMETER;
                }

                connOptions.bootstrap = clientBootstrap.GetUnderlyingHandle();
            }

            if (baseError != EVENT_STREAM_RPC_SUCCESS)
            {
                if (baseError == EVENT_STREAM_RPC_NULL_PARAMETER)
                {
                    // This call moved the state to CONNECTING_SOCKET above, so it has to undo that.
                    // For an already-established connection the state belongs to the live
                    // connection and is left untouched.
                    const std::lock_guard<std::recursive_mutex> lock(m_stateMutex);
                    m_clientState = DISCONNECTED;
                }

                std::promise<RpcError> errorPromise;
                errorPromise.set_value({baseError, 0});
                return errorPromise.get_future();
            }

            if (m_connectionConfig.GetSocketOptions().has_value())
            {
                m_socketOptions = m_connectionConfig.GetSocketOptions().value();
            }
            connOptions.socket_options = &m_socketOptions.GetImpl();

            connOptions.on_connection_setup = ClientConnection::s_onConnectionSetup;
            connOptions.on_connection_protocol_message = ClientConnection::s_onProtocolMessage;
            connOptions.on_connection_shutdown = ClientConnection::s_onConnectionShutdown;
            // The static callbacks receive this object back through user_data.
            connOptions.user_data = static_cast<void *>(this);

            m_connectMessageAmender = m_connectionConfig.GetConnectMessageAmender();

            if (m_connectionConfig.GetTlsConnectionOptions().has_value())
            {
                connOptions.tls_options = m_connectionConfig.GetTlsConnectionOptions()->GetUnderlyingHandle();
            }

            const int crtError = aws_event_stream_rpc_client_connection_connect(m_allocator, &connOptions);

            if (crtError)
            {
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_RPC_CLIENT,
                    "A CRT error occurred while attempting to establish the connection: %s",
                    Crt::ErrorDebugString(crtError));

                std::promise<RpcError> errorPromise;
                errorPromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, crtError});

                // The attempt never started, so return to DISCONNECTED to allow another Connect call.
                const std::lock_guard<std::recursive_mutex> lock(m_stateMutex);
                m_clientState = DISCONNECTED;
                return errorPromise.get_future();
            }

            {
                // The connect call was accepted; record that under the state lock.
                const std::lock_guard<std::recursive_mutex> lock(m_stateMutex);
                m_connectionWillSetup = true;
            }

            return m_connectAckedPromise.get_future();
        }