in eventstream_rpc/source/EventStreamClient.cpp [1403:1515]
void ClientOperation::OnContinuationMessage(
const Crt::List<EventStreamHeader> &headers,
const Crt::Optional<Crt::ByteBuf> &payload,
MessageType messageType,
uint32_t messageFlags)
{
EventStreamRpcStatusCode errorCode = EVENT_STREAM_RPC_SUCCESS;
const EventStreamHeader *modelHeader = nullptr;
const EventStreamHeader *contentHeader = nullptr;
Crt::String modelName;
if (messageFlags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM)
{
const std::lock_guard<std::mutex> lock(m_continuationMutex);
m_expectedCloses.fetch_add(1);
}
m_messageCount += 1;
modelHeader = GetHeaderByName(headers, Crt::String(SERVICE_MODEL_TYPE_HEADER));
if (modelHeader == nullptr)
{
/* Missing required service model type header. */
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"A required header (%s) could not be found in the message.",
SERVICE_MODEL_TYPE_HEADER);
errorCode = EVENT_STREAM_RPC_UNMAPPED_DATA;
}
/* Verify that the model name matches. */
if (!errorCode)
{
modelHeader->GetValueAsString(modelName);
if (messageType == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_MESSAGE)
{
if (m_messageCount == 1 && m_operationModelContext.GetInitialResponseModelName() != modelName)
{
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"The model name of the initial response did not match its expected model name.");
errorCode = EVENT_STREAM_RPC_UNMAPPED_DATA;
}
else if (
m_messageCount > 1 && m_operationModelContext.GetStreamingResponseModelName().has_value() &&
m_operationModelContext.GetStreamingResponseModelName().value() != modelName)
{
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"The model name of a subsequent response did not match its expected model name.");
errorCode = EVENT_STREAM_RPC_UNMAPPED_DATA;
}
}
}
if (!errorCode)
{
Crt::String contentType;
contentHeader = GetHeaderByName(headers, Crt::String(CONTENT_TYPE_HEADER));
if (contentHeader == nullptr)
{
/* Missing required content type header. */
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"A required header (%s) could not be found in the message.",
CONTENT_TYPE_HEADER);
errorCode = EVENT_STREAM_RPC_UNSUPPORTED_CONTENT_TYPE;
}
else if (contentHeader->GetValueAsString(contentType) && contentType != CONTENT_TYPE_APPLICATION_JSON)
{
/* Missing required content type header. */
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"The content type (%s) header was specified with an unsupported value (%s).",
CONTENT_TYPE_HEADER,
contentType.c_str());
errorCode = EVENT_STREAM_RPC_UNSUPPORTED_CONTENT_TYPE;
}
}
if (!errorCode)
{
if (messageType == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_MESSAGE)
{
errorCode = HandleData(payload);
}
else
{
errorCode = HandleError(modelName, payload, messageFlags);
}
}
if (errorCode)
{
if (m_messageCount == 1)
{
const std::lock_guard<std::mutex> lock(m_continuationMutex);
m_resultReceived = true;
RpcError promiseValue = {(EventStreamRpcStatusCode)errorCode, 0};
m_initialResponsePromise.set_value(TaggedResult(promiseValue));
}
else
{
bool shouldClose = true;
if (m_streamHandler)
shouldClose = m_streamHandler->OnStreamError(nullptr, {errorCode, 0});
if (!m_clientContinuation.IsClosed() && shouldClose)
{
Close().wait();
}
}
}
}