EventStreamRpcStatusCode ClientOperation::HandleError()

in eventstream_rpc/source/EventStreamClient.cpp [1335:1388]


        /**
         * Handles an error message received for this operation.
         *
         * The payload is turned into a modeled OperationError via the operation model context. The
         * error is then delivered either through the initial-response promise (when
         * m_messageCount == 1) or to the stream handler's OnStreamError callback (otherwise).
         * Afterwards the continuation is closed unless the incoming message already carried the
         * TERMINATE_STREAM flag, the stream handler asked to keep the stream open, or the
         * continuation is already closed.
         *
         * @param modelName name passed to AllocateOperationErrorFromPayload to select the error to
         *                  allocate.
         * @param payload optional raw payload of the message; an empty string view is used when it
         *                holds no value.
         * @param messageFlags flags of the received message; only
         *                     AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM is inspected here.
         * @return EVENT_STREAM_RPC_UNMAPPED_DATA if no error object could be allocated from the
         *         payload, EVENT_STREAM_RPC_SUCCESS otherwise.
         */
        EventStreamRpcStatusCode ClientOperation::HandleError(
            const Crt::String &modelName,
            const Crt::Optional<Crt::ByteBuf> &payload,
            uint32_t messageFlags)
        {
            const bool streamAlreadyTerminated =
                (messageFlags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) != 0;

            Crt::StringView payloadStringView;
            if (payload.has_value())
            {
                payloadStringView = Crt::ByteCursorToStringView(Crt::ByteCursorFromByteBuf(payload.value()));
            }

            /* Ask the operation model context to allocate the error described by the payload. */
            Crt::ScopedResource<OperationError> error =
                m_operationModelContext.AllocateOperationErrorFromPayload(modelName, payloadStringView, m_allocator);
            if (error.get() == nullptr)
            {
                return EVENT_STREAM_RPC_UNMAPPED_DATA;
            }

            {
                /* Bind once so the message is fetched a single time; a const reference works whether
                 * GetMessage() returns by value or by reference. */
                const auto &errorMessage = error->GetMessage();
                if (errorMessage.has_value())
                {
                    AWS_LOGF_ERROR(
                        AWS_LS_EVENT_STREAM_RPC_CLIENT,
                        "An error was received from the server: %s",
                        errorMessage.value().c_str());
                }
            }

            bool shouldCloseNow = true;
            if (m_messageCount == 1)
            {
                /* Ownership of the error is transferred to the tagged result only on this path.
                 * Moving it unconditionally would leave the streaming path below with a moved-from
                 * (empty) error to hand to the stream handler. */
                TaggedResult taggedResult(std::move(error));
                const std::lock_guard<std::mutex> lock(m_continuationMutex);
                m_resultReceived = true;
                m_initialResponsePromise.set_value(std::move(taggedResult));
            }
            else if (m_streamHandler)
            {
                /* The handler takes ownership of the error and decides whether the stream should be
                 * closed in response to it. */
                shouldCloseNow = m_streamHandler->OnStreamError(std::move(error), {EVENT_STREAM_RPC_SUCCESS, 0});
            }

            /* Close the stream unless the server already closed it for us. This condition is checked
             * so that TERMINATE_STREAM messages aren't resent by the client. The continuation mutex
             * taken above has been released by the time we get here. */
            if (!streamAlreadyTerminated && shouldCloseNow && !m_clientContinuation.IsClosed())
            {
                Close().wait();
            }

            return EVENT_STREAM_RPC_SUCCESS;
        }