void ClientConnection::s_onConnectionSetup()

in eventstream_rpc/source/EventStreamClient.cpp [682:740]


        /**
         * Callback run once setup of the underlying event-stream RPC connection has finished.
         *
         * Three outcomes are handled:
         *  - `errorCode` is non-zero: the state becomes DISCONNECTED, the error is logged, the
         *    connect-acked promise is fulfilled with the error, the connection handle is released
         *    and the lifecycle handler's error callback is invoked.
         *  - the client is already DISCONNECTING/DISCONNECTED: the handle is stored and the
         *    connection is closed right away.
         *  - otherwise: the state becomes WAITING_FOR_CONNECT_ACK and a CONNECT message carrying
         *    the protocol version header (plus any user-supplied amendment) is sent.
         *
         * In every case `m_connectionSetupPromise` is fulfilled before returning.
         *
         * @param connection Handle of the underlying connection supplied by the C layer.
         * @param errorCode  Zero on success; otherwise the CRT error describing the failure.
         * @param userData   Pointer to the `ClientConnection` this callback belongs to.
         */
        void ClientConnection::s_onConnectionSetup(
            struct aws_event_stream_rpc_client_connection *connection,
            int errorCode,
            void *userData) noexcept
        {
            /* The `userData` pointer is used to pass `this` of a `ClientConnection` object. */
            auto *thisConnection = static_cast<ClientConnection *>(userData);

            /* The state mutex is held for the whole callback, including the calls made below. */
            const std::lock_guard<std::recursive_mutex> lock(thisConnection->m_stateMutex);

            if (errorCode)
            {
                thisConnection->m_clientState = DISCONNECTED;
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_RPC_CLIENT,
                    "A CRT error occurred while setting up the connection: %s",
                    Crt::ErrorDebugString(errorCode));
                thisConnection->m_connectAckedPromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
                aws_event_stream_rpc_client_connection_release(connection);
                thisConnection->m_underlyingConnection = nullptr;
                /* No connection to close on error, so no need to check return value of the callback. */
                (void)thisConnection->m_lifecycleHandler->OnErrorCallback({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
            }
            else if (thisConnection->m_clientState == DISCONNECTING || thisConnection->m_clientState == DISCONNECTED)
            {
                /* The state already reads as disconnecting/disconnected: keep the handle only so that it can be
                 * closed. */
                thisConnection->m_underlyingConnection = connection;
                thisConnection->m_closeReason = {EVENT_STREAM_RPC_CONNECTION_CLOSED, 0};
                thisConnection->Close();
            }
            else
            {
                thisConnection->m_clientState = WAITING_FOR_CONNECT_ACK;
                thisConnection->m_underlyingConnection = connection;
                MessageAmendment messageAmendment;

                /*
                 * The version header is necessary for establishing the connection, so it is added
                 * unconditionally rather than only when a connect-message amender is configured.
                 */
                messageAmendment.AddHeader(EventStreamHeader(
                    Crt::String(EVENTSTREAM_VERSION_HEADER),
                    Crt::String(EVENTSTREAM_VERSION_STRING),
                    thisConnection->m_allocator));

                if (thisConnection->m_connectMessageAmender)
                {
                    MessageAmendment connectAmendment(thisConnection->m_connectMessageAmender());
                    /* User-supplied headers are prepended to the version header; the payload is taken as-is. */
                    messageAmendment.PrependHeaders(std::move(connectAmendment).GetHeaders());
                    messageAmendment.SetPayload(std::move(connectAmendment).GetPayload());
                }

                /* Send a CONNECT packet to the server. */
                s_sendProtocolMessage(
                    thisConnection,
                    messageAmendment.GetHeaders(),
                    messageAmendment.GetPayload(),
                    AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT,
                    0U,
                    thisConnection->m_onConnectRequestCallback);
            }

            /* Fulfil the setup promise on every path, success or failure. */
            thisConnection->m_connectionSetupPromise.set_value();
        }