in eventstream_rpc/source/EventStreamClient.cpp [983:1056]
/**
 * Sends the activation message for this continuation (stream).
 *
 * @param operationName name of the operation, passed to the native activate call as a byte cursor.
 * @param headers event-stream headers, converted to a native header array for the duration of this call.
 * @param payload optional message payload; a null payload pointer is passed when no value is present.
 * @param messageType message type copied into the native message args.
 * @param messageFlags message flags copied into the native message args.
 * @param onMessageFlushCallback user callback stored in the heap-allocated flush container handed to the
 *        native layer.
 *
 * @return a future that resolves to:
 *         - {EVENT_STREAM_RPC_CONNECTION_CLOSED, 0} if there is no continuation token,
 *         - {EVENT_STREAM_RPC_CONTINUATION_CLOSED, 0} if IsClosed() reports true,
 *         - {EVENT_STREAM_RPC_CRT_ERROR, <crt error code>} if building the headers or the native activate call
 *           fails,
 *         - otherwise whatever value is set on the promise owned by the flush container (presumably set by
 *           ClientConnection::s_protocolMessageCallback once the message is flushed -- confirm there).
 */
std::future<RpcError> ClientContinuation::Activate(
    const Crt::String &operationName,
    const Crt::List<EventStreamHeader> &headers,
    const Crt::Optional<Crt::ByteBuf> &payload,
    MessageType messageType,
    uint32_t messageFlags,
    OnMessageFlushCallback onMessageFlushCallback) noexcept
{
    /*
     * Zero-initialized so that the validity check in the cleanup section never inspects indeterminate memory,
     * even if the header-fill helper fails before it has initialized the list.
     */
    struct aws_array_list headersArray{};
    OnMessageFlushCallbackContainer *callbackContainer = nullptr;
    std::promise<RpcError> onFlushPromise;

    if (m_continuationToken == nullptr)
    {
        onFlushPromise.set_value({EVENT_STREAM_RPC_CONNECTION_CLOSED, 0});
        return onFlushPromise.get_future();
    }

    if (IsClosed())
    {
        onFlushPromise.set_value({EVENT_STREAM_RPC_CONTINUATION_CLOSED, 0});
        return onFlushPromise.get_future();
    }

    int errorCode =
        EventStreamCppToNativeCrtBuilder::s_fillNativeHeadersArray(headers, &headersArray, m_allocator);

    /*
     * Regardless of how the promise gets moved around (or not), this future should stay valid as a return
     * value.
     *
     * We pull it out early because the call to aws_event_stream_rpc_client_continuation_activate() may complete
     * and delete the promise before we pull out the future afterwards.
     */
    std::future<RpcError> retValue = onFlushPromise.get_future();

    if (!errorCode)
    {
        struct aws_event_stream_rpc_message_args msg_args;
        msg_args.headers = (struct aws_event_stream_header_value_pair *)headersArray.data;
        msg_args.headers_count = headers.size();
        msg_args.payload = payload.has_value() ? (aws_byte_buf *)(&(payload.value())) : nullptr;
        msg_args.message_type = messageType;
        msg_args.message_flags = messageFlags;

        /* This heap allocation is necessary so that the flush callback can still be invoked when this function
         * returns. */
        callbackContainer = Crt::New<OnMessageFlushCallbackContainer>(m_allocator, m_allocator);
        callbackContainer->onMessageFlushCallback = onMessageFlushCallback;
        callbackContainer->onFlushPromise = std::move(onFlushPromise);

        errorCode = aws_event_stream_rpc_client_continuation_activate(
            m_continuationToken,
            Crt::ByteCursorFromCString(operationName.c_str()),
            &msg_args,
            ClientConnection::s_protocolMessageCallback,
            reinterpret_cast<void *>(callbackContainer));
    }

    /* Cleanup. */
    if (aws_array_list_is_valid(&headersArray))
    {
        aws_array_list_clean_up(&headersArray);
    }

    if (errorCode)
    {
        /*
         * Two distinct failure points lead here:
         *  - header conversion failed: no container was ever allocated and the local promise is still intact;
         *  - the native activate call failed: the promise lives in the container and must be recovered before
         *    the container is released.
         * Dereferencing the container unconditionally would be a null dereference in the first case.
         */
        if (callbackContainer != nullptr)
        {
            onFlushPromise = std::move(callbackContainer->onFlushPromise);
            Crt::Delete(callbackContainer, m_allocator);
            callbackContainer = nullptr;
        }
        onFlushPromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
    }

    return retValue;
}