in eventstream_rpc/source/EventStreamClient.cpp [316:422]
std::future<RpcError> ClientConnection::Connect(
const ConnectionConfig &connectionConfig,
ConnectionLifecycleHandler *connectionLifecycleHandler,
Crt::Io::ClientBootstrap &clientBootstrap) noexcept
{
EventStreamRpcStatusCode baseError = EVENT_STREAM_RPC_SUCCESS;
struct aws_event_stream_rpc_client_connection_options connOptions;
{
const std::lock_guard<std::recursive_mutex> lock(m_stateMutex);
if (m_clientState == DISCONNECTED)
{
m_clientState = CONNECTING_SOCKET;
m_onConnectCalled = false;
m_connectionSetupPromise = {};
m_connectAckedPromise = {};
m_closedPromise = {};
m_closeReason = {EVENT_STREAM_RPC_UNINITIALIZED, 0};
m_connectionConfig = connectionConfig;
m_lifecycleHandler = connectionLifecycleHandler;
}
else
{
baseError = EVENT_STREAM_RPC_CONNECTION_ALREADY_ESTABLISHED;
}
}
m_onConnectRequestCallback = m_connectionConfig.GetConnectRequestCallback();
Crt::String hostName;
if (baseError == EVENT_STREAM_RPC_SUCCESS)
{
AWS_ZERO_STRUCT(connOptions);
if (m_connectionConfig.GetHostName().has_value())
{
hostName = m_connectionConfig.GetHostName().value();
connOptions.host_name = hostName.c_str();
}
else
{
baseError = EVENT_STREAM_RPC_NULL_PARAMETER;
}
if (m_connectionConfig.GetPort().has_value())
{
connOptions.port = m_connectionConfig.GetPort().value();
}
else
{
baseError = EVENT_STREAM_RPC_NULL_PARAMETER;
}
connOptions.bootstrap = clientBootstrap.GetUnderlyingHandle();
}
if (baseError)
{
std::promise<RpcError> errorPromise;
errorPromise.set_value({baseError, 0});
if (baseError == EVENT_STREAM_RPC_NULL_PARAMETER)
{
const std::lock_guard<std::recursive_mutex> lock(m_stateMutex);
m_clientState = DISCONNECTED;
}
return errorPromise.get_future();
}
if (m_connectionConfig.GetSocketOptions().has_value())
{
m_socketOptions = m_connectionConfig.GetSocketOptions().value();
}
connOptions.socket_options = &m_socketOptions.GetImpl();
connOptions.on_connection_setup = ClientConnection::s_onConnectionSetup;
connOptions.on_connection_protocol_message = ClientConnection::s_onProtocolMessage;
connOptions.on_connection_shutdown = ClientConnection::s_onConnectionShutdown;
connOptions.user_data = reinterpret_cast<void *>(this);
m_lifecycleHandler = connectionLifecycleHandler;
m_connectMessageAmender = m_connectionConfig.GetConnectMessageAmender();
if (m_connectionConfig.GetTlsConnectionOptions().has_value())
{
connOptions.tls_options = m_connectionConfig.GetTlsConnectionOptions()->GetUnderlyingHandle();
}
int crtError = aws_event_stream_rpc_client_connection_connect(m_allocator, &connOptions);
if (crtError)
{
std::promise<RpcError> errorPromise;
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"A CRT error occurred while attempting to establish the connection: %s",
Crt::ErrorDebugString(crtError));
errorPromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, crtError});
const std::lock_guard<std::recursive_mutex> lock(m_stateMutex);
m_clientState = DISCONNECTED;
return errorPromise.get_future();
}
else
{
const std::lock_guard<std::recursive_mutex> lock(m_stateMutex);
m_connectionWillSetup = true;
}
return m_connectAckedPromise.get_future();
}