in graphql-dgs-client/src/main/kotlin/com/netflix/graphql/dgs/client/GraphqlSSESubscriptionGraphQLClient.kt [41:79]
override fun reactiveExecuteQuery(
@Language("graphql") query: String,
variables: Map<String, Any>,
): Flux<GraphQLResponse> = reactiveExecuteQuery(query, variables, null)
override fun reactiveExecuteQuery(
@Language("graphql") query: String,
variables: Map<String, Any>,
operationName: String?,
): Flux<GraphQLResponse> {
val queryPayload = QueryPayload(variables, emptyMap(), operationName, query)
val jsonPayload = mapper.writeValueAsString(queryPayload)
val sink = Sinks.many().unicast().onBackpressureBuffer<GraphQLResponse>()
val dis =
webClient
.post()
.uri(url)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(jsonPayload)
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.toEntityFlux<String>()
.flatMapMany {
val headers = it.headers
it.body?.map { serverSentEvent ->
sink.tryEmitNext(GraphQLResponse(json = serverSentEvent, headers = headers, options))
} ?: Flux.empty()
}.onErrorResume {
Flux.just(sink.tryEmitError(it))
}.doFinally {
sink.tryEmitComplete()
}.subscribeOn(Schedulers.boundedElastic())
.subscribe()
return sink.asFlux().publishOn(Schedulers.single()).doFinally {
dis.dispose()
}
}