in eventstream_rpc/source/EventStreamClient.cpp [1335:1388]
EventStreamRpcStatusCode ClientOperation::HandleError(
const Crt::String &modelName,
const Crt::Optional<Crt::ByteBuf> &payload,
uint32_t messageFlags)
{
bool streamAlreadyTerminated = (messageFlags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) != 0;
Crt::StringView payloadStringView;
if (payload.has_value())
{
payloadStringView = Crt::ByteCursorToStringView(Crt::ByteCursorFromByteBuf(payload.value()));
}
/* The value of this hashmap contains the function that allocates the error from the
* payload. */
Crt::ScopedResource<OperationError> error =
m_operationModelContext.AllocateOperationErrorFromPayload(modelName, payloadStringView, m_allocator);
if (error.get() == nullptr)
return EVENT_STREAM_RPC_UNMAPPED_DATA;
if (error->GetMessage().has_value())
{
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"An error was received from the server: %s",
error->GetMessage().value().c_str());
}
TaggedResult taggedResult(std::move(error));
if (m_messageCount == 1)
{
{
const std::lock_guard<std::mutex> lock(m_continuationMutex);
m_resultReceived = true;
m_initialResponsePromise.set_value(std::move(taggedResult));
}
/* Close the stream unless the server already closed it for us. This condition is checked
* so that TERMINATE_STREAM messages aren't resent by the client. */
if (!streamAlreadyTerminated && !m_clientContinuation.IsClosed())
{
Close().wait();
}
}
else
{
bool shouldCloseNow = true;
if (m_streamHandler)
shouldCloseNow = m_streamHandler->OnStreamError(std::move(error), {EVENT_STREAM_RPC_SUCCESS, 0});
if (!streamAlreadyTerminated && shouldCloseNow && !m_clientContinuation.IsClosed())
{
Close().wait();
}
}
return EVENT_STREAM_RPC_SUCCESS;
}