void ClientOperation::OnContinuationMessage()

in eventstream_rpc/source/EventStreamClient.cpp [1403:1515]


        void ClientOperation::OnContinuationMessage(
            const Crt::List<EventStreamHeader> &headers,
            const Crt::Optional<Crt::ByteBuf> &payload,
            MessageType messageType,
            uint32_t messageFlags)
        {
            EventStreamRpcStatusCode errorCode = EVENT_STREAM_RPC_SUCCESS;
            const EventStreamHeader *modelHeader = nullptr;
            const EventStreamHeader *contentHeader = nullptr;
            Crt::String modelName;

            if (messageFlags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM)
            {
                const std::lock_guard<std::mutex> lock(m_continuationMutex);
                m_expectedCloses.fetch_add(1);
            }

            m_messageCount += 1;

            modelHeader = GetHeaderByName(headers, Crt::String(SERVICE_MODEL_TYPE_HEADER));
            if (modelHeader == nullptr)
            {
                /* Missing required service model type header. */
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_RPC_CLIENT,
                    "A required header (%s) could not be found in the message.",
                    SERVICE_MODEL_TYPE_HEADER);
                errorCode = EVENT_STREAM_RPC_UNMAPPED_DATA;
            }

            /* Verify that the model name matches. */
            if (!errorCode)
            {
                modelHeader->GetValueAsString(modelName);
                if (messageType == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_MESSAGE)
                {
                    if (m_messageCount == 1 && m_operationModelContext.GetInitialResponseModelName() != modelName)
                    {
                        AWS_LOGF_ERROR(
                            AWS_LS_EVENT_STREAM_RPC_CLIENT,
                            "The model name of the initial response did not match its expected model name.");
                        errorCode = EVENT_STREAM_RPC_UNMAPPED_DATA;
                    }
                    else if (
                        m_messageCount > 1 && m_operationModelContext.GetStreamingResponseModelName().has_value() &&
                        m_operationModelContext.GetStreamingResponseModelName().value() != modelName)
                    {
                        AWS_LOGF_ERROR(
                            AWS_LS_EVENT_STREAM_RPC_CLIENT,
                            "The model name of a subsequent response did not match its expected model name.");
                        errorCode = EVENT_STREAM_RPC_UNMAPPED_DATA;
                    }
                }
            }

            if (!errorCode)
            {
                Crt::String contentType;
                contentHeader = GetHeaderByName(headers, Crt::String(CONTENT_TYPE_HEADER));
                if (contentHeader == nullptr)
                {
                    /* Missing required content type header. */
                    AWS_LOGF_ERROR(
                        AWS_LS_EVENT_STREAM_RPC_CLIENT,
                        "A required header (%s) could not be found in the message.",
                        CONTENT_TYPE_HEADER);
                    errorCode = EVENT_STREAM_RPC_UNSUPPORTED_CONTENT_TYPE;
                }
                else if (contentHeader->GetValueAsString(contentType) && contentType != CONTENT_TYPE_APPLICATION_JSON)
                {
                    /* Missing required content type header. */
                    AWS_LOGF_ERROR(
                        AWS_LS_EVENT_STREAM_RPC_CLIENT,
                        "The content type (%s) header was specified with an unsupported value (%s).",
                        CONTENT_TYPE_HEADER,
                        contentType.c_str());
                    errorCode = EVENT_STREAM_RPC_UNSUPPORTED_CONTENT_TYPE;
                }
            }

            if (!errorCode)
            {
                if (messageType == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_MESSAGE)
                {
                    errorCode = HandleData(payload);
                }
                else
                {
                    errorCode = HandleError(modelName, payload, messageFlags);
                }
            }

            if (errorCode)
            {
                if (m_messageCount == 1)
                {
                    const std::lock_guard<std::mutex> lock(m_continuationMutex);
                    m_resultReceived = true;
                    RpcError promiseValue = {(EventStreamRpcStatusCode)errorCode, 0};
                    m_initialResponsePromise.set_value(TaggedResult(promiseValue));
                }
                else
                {
                    bool shouldClose = true;
                    if (m_streamHandler)
                        shouldClose = m_streamHandler->OnStreamError(nullptr, {errorCode, 0});
                    if (!m_clientContinuation.IsClosed() && shouldClose)
                    {
                        Close().wait();
                    }
                }
            }
        }