override fun reactiveExecuteQuery()

in graphql-dgs-client/src/main/kotlin/com/netflix/graphql/dgs/client/GraphqlSSESubscriptionGraphQLClient.kt [46:79]


    override fun reactiveExecuteQuery(
        @Language("graphql") query: String,
        variables: Map<String, Any>,
        operationName: String?,
    ): Flux<GraphQLResponse> {
        val queryPayload = QueryPayload(variables, emptyMap(), operationName, query)

        val jsonPayload = mapper.writeValueAsString(queryPayload)
        val sink = Sinks.many().unicast().onBackpressureBuffer<GraphQLResponse>()

        val dis =
            webClient
                .post()
                .uri(url)
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(jsonPayload)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .toEntityFlux<String>()
                .flatMapMany {
                    val headers = it.headers
                    it.body?.map { serverSentEvent ->
                        sink.tryEmitNext(GraphQLResponse(json = serverSentEvent, headers = headers, options))
                    } ?: Flux.empty()
                }.onErrorResume {
                    Flux.just(sink.tryEmitError(it))
                }.doFinally {
                    sink.tryEmitComplete()
                }.subscribeOn(Schedulers.boundedElastic())
                .subscribe()
        return sink.asFlux().publishOn(Schedulers.single()).doFinally {
            dis.dispose()
        }
    }