in eventstream_rpc/source/EventStreamClient.cpp [682:740]
/**
 * Callback that handles the result of a connection setup attempt.
 *
 * All work is done while holding the connection's `m_stateMutex`. One of three paths is taken:
 *  - `errorCode` is non-zero: the client is marked DISCONNECTED, the error is logged,
 *    `m_connectAckedPromise` is fulfilled with EVENT_STREAM_RPC_CRT_ERROR, the native
 *    connection is released, and the lifecycle handler's `OnErrorCallback` is invoked.
 *  - Setup succeeded but the client is already DISCONNECTING/DISCONNECTED: the native
 *    connection is stored and immediately closed via `Close()`.
 *  - Otherwise: the client moves to WAITING_FOR_CONNECT_ACK and a CONNECT message carrying
 *    the protocol version header (plus any amender-supplied headers/payload) is sent.
 *
 * In every case `m_connectionSetupPromise` is fulfilled as the last step.
 *
 * @param connection the native connection handle passed in by the caller of this callback.
 * @param errorCode  zero on success; any non-zero value is treated as a CRT error code.
 * @param userData   pointer to the owning `ClientConnection` instance.
 */
void ClientConnection::s_onConnectionSetup(
    struct aws_event_stream_rpc_client_connection *connection,
    int errorCode,
    void *userData) noexcept
{
    /* The `userData` pointer is used to pass `this` of a `ClientConnection` object. */
    auto *thisConnection = static_cast<ClientConnection *>(userData);
    const std::lock_guard<std::recursive_mutex> lock(thisConnection->m_stateMutex);

    if (errorCode != 0)
    {
        thisConnection->m_clientState = DISCONNECTED;
        AWS_LOGF_ERROR(
            AWS_LS_EVENT_STREAM_RPC_CLIENT,
            "A CRT error occurred while setting up the connection: %s",
            Crt::ErrorDebugString(errorCode));
        thisConnection->m_connectAckedPromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
        aws_event_stream_rpc_client_connection_release(connection);
        thisConnection->m_underlyingConnection = nullptr;
        /* No connection to close on error, so no need to check return value of the callback. */
        (void)thisConnection->m_lifecycleHandler->OnErrorCallback({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
    }
    else if (thisConnection->m_clientState == DISCONNECTING || thisConnection->m_clientState == DISCONNECTED)
    {
        /* The client state is already DISCONNECTING/DISCONNECTED even though setup succeeded:
         * keep the handle just long enough to close it. */
        thisConnection->m_underlyingConnection = connection;
        thisConnection->m_closeReason = {EVENT_STREAM_RPC_CONNECTION_CLOSED, 0};
        thisConnection->Close();
    }
    else
    {
        thisConnection->m_clientState = WAITING_FOR_CONNECT_ACK;
        thisConnection->m_underlyingConnection = connection;

        MessageAmendment messageAmendment;
        /* The version header is necessary for establishing the connection, so it is added
         * unconditionally rather than only when a connect message amender is configured. */
        messageAmendment.AddHeader(EventStreamHeader(
            Crt::String(EVENTSTREAM_VERSION_HEADER),
            Crt::String(EVENTSTREAM_VERSION_STRING),
            thisConnection->m_allocator));

        if (thisConnection->m_connectMessageAmender)
        {
            /* Merge the user-supplied amendment. The call order (version header first, then
             * PrependHeaders/SetPayload) is kept from the original so the resulting message
             * is unchanged whenever an amender is present. */
            MessageAmendment connectAmendment(thisConnection->m_connectMessageAmender());
            messageAmendment.PrependHeaders(std::move(connectAmendment).GetHeaders());
            messageAmendment.SetPayload(std::move(connectAmendment).GetPayload());
        }

        /* Send a CONNECT packet to the server. */
        s_sendProtocolMessage(
            thisConnection,
            messageAmendment.GetHeaders(),
            messageAmendment.GetPayload(),
            AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT,
            0U,
            thisConnection->m_onConnectRequestCallback);
    }

    /* Signal that setup handling has finished, whichever path was taken. */
    thisConnection->m_connectionSetupPromise.set_value();
}