std::future ClientContinuation::Activate()

in eventstream_rpc/source/EventStreamClient.cpp [983:1056]


        std::future<RpcError> ClientContinuation::Activate(
            const Crt::String &operationName,
            const Crt::List<EventStreamHeader> &headers,
            const Crt::Optional<Crt::ByteBuf> &payload,
            MessageType messageType,
            uint32_t messageFlags,
            OnMessageFlushCallback onMessageFlushCallback) noexcept
        {
            struct aws_array_list headersArray;
            OnMessageFlushCallbackContainer *callbackContainer = nullptr;
            std::promise<RpcError> onFlushPromise;

            if (m_continuationToken == nullptr)
            {
                onFlushPromise.set_value({EVENT_STREAM_RPC_CONNECTION_CLOSED, 0});
                return onFlushPromise.get_future();
            }

            if (IsClosed())
            {
                onFlushPromise.set_value({EVENT_STREAM_RPC_CONTINUATION_CLOSED, 0});
                return onFlushPromise.get_future();
            }

            int errorCode =
                EventStreamCppToNativeCrtBuilder::s_fillNativeHeadersArray(headers, &headersArray, m_allocator);

            /*
             * Regardless of how the promise gets moved around (or not), this future should stay valid as a return
             * value.
             *
             * We pull it out early because the call to aws_event_stream_rpc_client_continuation_activate() may complete
             * and delete the promise before we pull out the future afterwords.
             */
            std::future<RpcError> retValue = onFlushPromise.get_future();

            if (!errorCode)
            {
                struct aws_event_stream_rpc_message_args msg_args;
                msg_args.headers = (struct aws_event_stream_header_value_pair *)headersArray.data;
                msg_args.headers_count = headers.size();
                msg_args.payload = payload.has_value() ? (aws_byte_buf *)(&(payload.value())) : nullptr;
                msg_args.message_type = messageType;
                msg_args.message_flags = messageFlags;

                /* This heap allocation is necessary so that the flush callback can still be invoked when this function
                 * returns. */
                callbackContainer = Crt::New<OnMessageFlushCallbackContainer>(m_allocator, m_allocator);
                callbackContainer->onMessageFlushCallback = onMessageFlushCallback;
                callbackContainer->onFlushPromise = std::move(onFlushPromise);

                errorCode = aws_event_stream_rpc_client_continuation_activate(
                    m_continuationToken,
                    Crt::ByteCursorFromCString(operationName.c_str()),
                    &msg_args,
                    ClientConnection::s_protocolMessageCallback,
                    reinterpret_cast<void *>(callbackContainer));
            }

            /* Cleanup. */
            if (aws_array_list_is_valid(&headersArray))
            {
                aws_array_list_clean_up(&headersArray);
            }

            if (errorCode)
            {
                onFlushPromise = std::move(callbackContainer->onFlushPromise);
                onFlushPromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
                Crt::Delete(callbackContainer, m_allocator);
            }

            return retValue;
        }