override fun reactiveExecuteQuery()

in graphql-dgs-client/src/main/kotlin/com/netflix/graphql/dgs/client/WebSocketGraphQLClient.kt [106:141]


    override fun reactiveExecuteQuery(
        @Language("graphql") query: String,
        variables: Map<String, Any>,
    ): Flux<GraphQLResponse> = reactiveExecuteQuery(query, variables, null)

    override fun reactiveExecuteQuery(
        @Language("graphql") query: String,
        variables: Map<String, Any>,
        operationName: String?,
    ): Flux<GraphQLResponse> {
        // Generate a unique number for each subscription in the same session.
        val subscriptionId =
            subscriptionCount
                .incrementAndGet()
                .toString()
        val queryMessage =
            OperationMessage(
                GQL_START,
                QueryPayload(variables, emptyMap(), operationName, query),
                subscriptionId,
            )
        val stopMessage = OperationMessage(GQL_STOP, null, subscriptionId)

        // Because handshake is cached it should have only been done once, all subsequent calls to
        // reactiveExecuteQuery() will proceed straight to client.receive()
        return handshake
            .doOnSuccess { client.send(queryMessage) }
            .thenMany(
                client
                    .receive()
                    .filter { it.id == subscriptionId }
                    .takeUntil { it.type == GQL_COMPLETE }
                    .doOnCancel { client.send(stopMessage) }
                    .flatMap(this::handleMessage),
            )
    }