in eventstream_rpc/source/EventStreamClient.cpp [501:564]
/**
 * Queues a protocol-level message on the connection's underlying native event-stream RPC connection.
 *
 * @param connection             Connection to send on; must not be null (enforced with AWS_PRECONDITION).
 * @param headers                Headers converted into a native header array for the duration of the call.
 * @param payload                Optional payload; a null payload pointer is passed natively when it is empty.
 * @param messageType            Copied into the native message arguments.
 * @param messageFlags           Copied into the native message arguments.
 * @param onMessageFlushCallback Stored in the heap-allocated callback container that is handed to the native
 *                               send call as user data.
 * @return A future for the flush result. If the headers cannot be converted or the message cannot be queued,
 *         the future is already fulfilled with {EVENT_STREAM_RPC_CRT_ERROR, errorCode}. Otherwise the promise
 *         lives in the callback container and is expected to be fulfilled by
 *         ClientConnection::s_protocolMessageCallback (not visible here -- confirm against its definition).
 */
std::future<RpcError> ClientConnection::s_sendProtocolMessage(
    ClientConnection *connection,
    const Crt::List<EventStreamHeader> &headers,
    const Crt::Optional<Crt::ByteBuf> &payload,
    MessageType messageType,
    uint32_t messageFlags,
    OnMessageFlushCallback onMessageFlushCallback) noexcept
{
    /* The caller should never pass a NULL connection. */
    AWS_PRECONDITION(connection != nullptr);

    std::promise<RpcError> onFlushPromise;
    /* Take the future before the promise is handed to the callback container: once the message is queued the
     * flush callback may run (and presumably release the container) before this function returns, so the
     * container must not be touched after a successful send. */
    std::future<RpcError> flushFuture = onFlushPromise.get_future();

    OnMessageFlushCallbackContainer *callbackContainer = nullptr;

    /* Value-initialized so the validity check in the cleanup step never reads indeterminate memory, whatever
     * the header-fill helper does on failure. */
    struct aws_array_list headersArray = {};

    int errorCode = EventStreamCppToNativeCrtBuilder::s_fillNativeHeadersArray(
        headers, &headersArray, connection->m_allocator);

    if (!errorCode)
    {
        struct aws_event_stream_rpc_message_args msg_args;
        msg_args.headers = static_cast<struct aws_event_stream_header_value_pair *>(headersArray.data);
        msg_args.headers_count = headers.size();
        /* The native API takes a non-const pointer, hence the const_cast. */
        msg_args.payload = payload.has_value() ? const_cast<aws_byte_buf *>(&payload.value()) : nullptr;
        msg_args.message_type = messageType;
        msg_args.message_flags = messageFlags;

        /* This heap allocation is necessary so that the flush callback can still be invoked when this function
         * returns. */
        callbackContainer =
            Crt::New<OnMessageFlushCallbackContainer>(connection->m_allocator, connection->m_allocator);
        callbackContainer->onMessageFlushCallback = onMessageFlushCallback;
        callbackContainer->onFlushPromise = std::move(onFlushPromise);

        errorCode = aws_event_stream_rpc_client_connection_send_protocol_message(
            connection->m_underlyingConnection,
            &msg_args,
            ClientConnection::s_protocolMessageCallback,
            static_cast<void *>(callbackContainer));
    }

    /* Cleanup. */
    if (aws_array_list_is_valid(&headersArray))
    {
        aws_array_list_clean_up(&headersArray);
    }

    if (errorCode)
    {
        /* The container only exists when the header conversion succeeded and the send itself failed. In that
         * case reclaim the promise and release the container; otherwise the promise never left this frame. */
        if (callbackContainer != nullptr)
        {
            onFlushPromise = std::move(callbackContainer->onFlushPromise);
            Crt::Delete(callbackContainer, connection->m_allocator);
        }

        AWS_LOGF_ERROR(
            AWS_LS_EVENT_STREAM_RPC_CLIENT,
            "A CRT error occurred while queueing a message to be sent on the connection: %s",
            Crt::ErrorDebugString(errorCode));
        onFlushPromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
    }

    return flushFuture;
}