in eventstream_rpc/source/EventStreamClient.cpp [1572:1628]
/**
 * Requests closure of this operation's stream by sending an empty application message flagged with
 * AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM on the client continuation.
 *
 * The whole call runs while holding m_continuationMutex.
 *
 * @param onMessageFlushCallback callback stored alongside the promise in the heap-allocated container that is
 *        passed as user data to the send call. It is not invoked by this function itself.
 * @return a future that is:
 *         - already resolved with EVENT_STREAM_RPC_CONTINUATION_CLOSED if a close is already expected or the
 *           continuation is closed;
 *         - already resolved with EVENT_STREAM_RPC_CRT_ERROR (carrying the error code) if the message could not
 *           be sent, including the case where there is no continuation token;
 *         - otherwise tied to the promise owned by the callback container, which is expected to be fulfilled by
 *           ClientConnection::s_protocolMessageCallback (defined elsewhere -- confirm there).
 */
std::future<RpcError> ClientOperation::Close(OnMessageFlushCallback onMessageFlushCallback) noexcept
{
    const std::lock_guard<std::mutex> lock(m_continuationMutex);

    /* Nothing to send if a close is already in flight or the continuation is already closed. */
    if (m_expectedCloses.load() > 0 || m_clientContinuation.IsClosed())
    {
        std::promise<RpcError> errorPromise;
        errorPromise.set_value({EVENT_STREAM_RPC_CONTINUATION_CLOSED, 0});
        return errorPromise.get_future();
    }

    struct aws_event_stream_rpc_message_args msg_args;
    msg_args.headers = nullptr;
    msg_args.headers_count = 0;
    msg_args.payload = nullptr;
    msg_args.message_type = AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_MESSAGE;
    msg_args.message_flags = AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM;

    /* Obtain the future BEFORE the promise is handed to the callback container. Once the send call below
     * succeeds, the container belongs to the flush callback, which may run (and presumably release the
     * container) on another thread before this function gets to touch it again. */
    std::promise<RpcError> onTerminatePromise;
    std::future<RpcError> onTerminateFuture = onTerminatePromise.get_future();

    /* This heap allocation is necessary so that the flush callback can still be invoked when this function
     * returns. */
    OnMessageFlushCallbackContainer *callbackContainer =
        Crt::New<OnMessageFlushCallbackContainer>(m_allocator, m_allocator);
    callbackContainer->onMessageFlushCallback = onMessageFlushCallback;
    callbackContainer->onFlushPromise = std::move(onTerminatePromise);

    /* Stays AWS_OP_ERR when there is no continuation token, so that case is reported as a CRT error too. */
    int errorCode = AWS_OP_ERR;
    if (m_clientContinuation.m_continuationToken)
    {
        errorCode = aws_event_stream_rpc_client_continuation_send_message(
            m_clientContinuation.m_continuationToken,
            &msg_args,
            ClientConnection::s_protocolMessageCallback,
            static_cast<void *>(callbackContainer));
    }

    if (errorCode)
    {
        /* The message was never queued, so the flush callback will not run: resolve the promise here and
         * release the container ourselves. */
        AWS_LOGF_ERROR(
            AWS_LS_EVENT_STREAM_RPC_CLIENT,
            "A CRT error occurred while closing the stream: %s",
            Crt::ErrorDebugString(errorCode));
        callbackContainer->onFlushPromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
        Crt::Delete(callbackContainer, m_allocator);
        return onTerminateFuture;
    }

    /* Message queued: record the pending close. callbackContainer must not be dereferenced from here on. */
    m_expectedCloses.fetch_add(1);
    return onTerminateFuture;
}