std::future ClientOperation::Close()

in eventstream_rpc/source/EventStreamClient.cpp [1572:1628]


        std::future<RpcError> ClientOperation::Close(OnMessageFlushCallback onMessageFlushCallback) noexcept
        {
            const std::lock_guard<std::mutex> lock(m_continuationMutex);
            if (m_expectedCloses.load() > 0 || m_clientContinuation.IsClosed())
            {
                std::promise<RpcError> errorPromise;
                errorPromise.set_value({EVENT_STREAM_RPC_CONTINUATION_CLOSED, 0});
                return errorPromise.get_future();
            }
            else
            {
                std::promise<RpcError> onTerminatePromise;

                int errorCode = AWS_OP_ERR;
                struct aws_event_stream_rpc_message_args msg_args;
                msg_args.headers = nullptr;
                msg_args.headers_count = 0;
                msg_args.payload = nullptr;
                msg_args.message_type = AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_MESSAGE;
                msg_args.message_flags = AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM;

                /* This heap allocation is necessary so that the flush callback can still be invoked when this function
                 * returns. */
                OnMessageFlushCallbackContainer *callbackContainer =
                    Crt::New<OnMessageFlushCallbackContainer>(m_allocator, m_allocator);
                callbackContainer->onMessageFlushCallback = onMessageFlushCallback;
                callbackContainer->onFlushPromise = std::move(onTerminatePromise);

                if (m_clientContinuation.m_continuationToken)
                {
                    errorCode = aws_event_stream_rpc_client_continuation_send_message(
                        m_clientContinuation.m_continuationToken,
                        &msg_args,
                        ClientConnection::s_protocolMessageCallback,
                        reinterpret_cast<void *>(callbackContainer));
                }

                if (errorCode)
                {
                    onTerminatePromise = std::move(callbackContainer->onFlushPromise);
                    std::promise<RpcError> errorPromise;
                    AWS_LOGF_ERROR(
                        AWS_LS_EVENT_STREAM_RPC_CLIENT,
                        "A CRT error occurred while closing the stream: %s",
                        Crt::ErrorDebugString(errorCode));
                    onTerminatePromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
                    Crt::Delete(callbackContainer, m_allocator);
                }
                else
                {
                    m_expectedCloses.fetch_add(1);
                    return callbackContainer->onFlushPromise.get_future();
                }

                return onTerminatePromise.get_future();
            }
        }