in graphql-dgs-client/src/main/kotlin/com/netflix/graphql/dgs/client/WebSocketGraphQLClient.kt [106:141]
override fun reactiveExecuteQuery(
@Language("graphql") query: String,
variables: Map<String, Any>,
): Flux<GraphQLResponse> = reactiveExecuteQuery(query, variables, null)
override fun reactiveExecuteQuery(
@Language("graphql") query: String,
variables: Map<String, Any>,
operationName: String?,
): Flux<GraphQLResponse> {
// Generate a unique number for each subscription in the same session.
val subscriptionId =
subscriptionCount
.incrementAndGet()
.toString()
val queryMessage =
OperationMessage(
GQL_START,
QueryPayload(variables, emptyMap(), operationName, query),
subscriptionId,
)
val stopMessage = OperationMessage(GQL_STOP, null, subscriptionId)
// Because handshake is cached it should have only been done once, all subsequent calls to
// reactiveExecuteQuery() will proceed straight to client.receive()
return handshake
.doOnSuccess { client.send(queryMessage) }
.thenMany(
client
.receive()
.filter { it.id == subscriptionId }
.takeUntil { it.type == GQL_COMPLETE }
.doOnCancel { client.send(stopMessage) }
.flatMap(this::handleMessage),
)
}