void ClientConnection::s_onConnectionSetup()

in eventstream_rpc/source/EventStreamClient.cpp [630:694]


        void ClientConnection::s_onConnectionSetup(
            struct aws_event_stream_rpc_client_connection *connection,
            int errorCode,
            void *userData) noexcept
        {
            /* The `userData` pointer is used to pass `this` of a `ClientConnection` object. */
            auto *thisConnection = static_cast<ClientConnection *>(userData);

            const std::lock_guard<std::recursive_mutex> lock(thisConnection->m_stateMutex);

            if (errorCode)
            {
                thisConnection->m_clientState = DISCONNECTED;
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_RPC_CLIENT,
                    "A CRT error occurred while setting up the connection: %s",
                    Crt::ErrorDebugString(errorCode));
                thisConnection->m_connectAckedPromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
                aws_event_stream_rpc_client_connection_release(connection);
                thisConnection->m_underlyingConnection = nullptr;
                /* No connection to close on error, so no need to check return value of the callback. */
                (void)thisConnection->m_lifecycleHandler->OnErrorCallback({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
            }
            else if (thisConnection->m_clientState == DISCONNECTING || thisConnection->m_clientState == DISCONNECTED)
            {
                thisConnection->m_underlyingConnection = connection;
                thisConnection->m_closeReason = {EVENT_STREAM_RPC_CONNECTION_CLOSED, 0};
                thisConnection->Close();
            }
            else
            {
                thisConnection->m_clientState = WAITING_FOR_CONNECT_ACK;
                thisConnection->m_underlyingConnection = connection;
                MessageAmendment messageAmendment;
                Crt::List<EventStreamHeader> messageAmendmentHeaders = messageAmendment.GetHeaders();

                if (thisConnection->m_connectMessageAmender)
                {
                    MessageAmendment connectAmendment(thisConnection->m_connectMessageAmender());
                    Crt::List<EventStreamHeader> amenderHeaderList = connectAmendment.GetHeaders();
                    /* The version header is necessary for establishing the connection. */
                    messageAmendment.AddHeader(EventStreamHeader(
                        Crt::String(EVENTSTREAM_VERSION_HEADER),
                        Crt::String(EVENTSTREAM_VERSION_STRING),
                        thisConnection->m_allocator));
                    /* Note that we are prepending headers from the user-provided amender. */
                    if (amenderHeaderList.size() > 0)
                    {
                        messageAmendmentHeaders.splice(messageAmendmentHeaders.end(), amenderHeaderList);
                    }
                    messageAmendment.SetPayload(connectAmendment.GetPayload());
                }

                /* Send a CONNECT packet to the server. */
                s_sendProtocolMessage(
                    thisConnection,
                    messageAmendment.GetHeaders(),
                    messageAmendment.GetPayload(),
                    AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT,
                    0U,
                    thisConnection->m_onConnectRequestCallback);
            }

            thisConnection->m_connectionSetupPromise.set_value();
        }