in eventstream_rpc/source/EventStreamClient.cpp [795:878]
void ClientConnection::s_onProtocolMessage(
struct aws_event_stream_rpc_client_connection *connection,
const struct aws_event_stream_rpc_message_args *messageArgs,
void *userData) noexcept
{
AWS_PRECONDITION(messageArgs != nullptr);
(void)connection;
/* The `userData` pointer is used to pass `this` of a `ClientConnection` object. */
auto *thisConnection = static_cast<ClientConnection *>(userData);
Crt::List<EventStreamHeader> pingHeaders;
switch (messageArgs->message_type)
{
case AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK:
thisConnection->m_stateMutex.lock();
if (thisConnection->m_clientState == WAITING_FOR_CONNECT_ACK)
{
if (messageArgs->message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_CONNECTION_ACCEPTED)
{
thisConnection->m_clientState = CONNECTED;
thisConnection->m_onConnectCalled = true;
thisConnection->m_connectAckedPromise.set_value({EVENT_STREAM_RPC_SUCCESS, 0});
thisConnection->m_lifecycleHandler->OnConnectCallback();
}
else
{
thisConnection->m_closeReason = {EVENT_STREAM_RPC_CONNECTION_ACCESS_DENIED, 0};
thisConnection->Close();
}
}
else
{
/* Unexpected CONNECT_ACK received. */
}
thisConnection->m_stateMutex.unlock();
break;
case AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PING:
for (size_t i = 0; i < messageArgs->headers_count; ++i)
{
pingHeaders.emplace_back(
EventStreamHeader(messageArgs->headers[i], thisConnection->m_allocator));
}
if (messageArgs->payload)
{
thisConnection->m_lifecycleHandler->OnPingCallback(pingHeaders, *messageArgs->payload);
}
else
{
thisConnection->m_lifecycleHandler->OnPingCallback(pingHeaders, Crt::Optional<Crt::ByteBuf>());
}
break;
case AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PING_RESPONSE:
return;
break;
case AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR:
case AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR:
if (thisConnection->m_lifecycleHandler->OnErrorCallback(
{EVENT_STREAM_RPC_CRT_ERROR, AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR}))
{
thisConnection->Close();
}
break;
default:
if (thisConnection->m_lifecycleHandler->OnErrorCallback(
{EVENT_STREAM_RPC_UNKNOWN_PROTOCOL_MESSAGE, 0}))
{
thisConnection->Close();
}
break;
}
}