in router/core/websocket.go [886:985]
// executeSubscription parses and plans the operation carried by registration
// and then executes it over the websocket connection.
//
// Depending on the handler configuration, the upgrade request headers, the
// upgrade request query params and the initial payload are copied into the
// operation extensions (under "upgradeHeaders", "upgradeQueryParams" and
// "initialPayload") before resolving. A *plan.SynchronousResponsePlan is
// resolved, flushed and completed in place; a *plan.SubscriptionResponsePlan
// is handed to AsyncResolveGraphQLSubscription.
//
// Failures are reported to the client as an error message for the
// registration's message ID; the function itself returns nothing.
func (h *WebSocketConnectionHandler) executeSubscription(registration *SubscriptionRegistration) {
	rw := newWebsocketResponseWriter(registration.msg.ID, h.protocol, h.graphqlHandler.subgraphErrorPropagation.Enabled, h.logger, h.stats)

	// notifyClient sends cause to the client and logs when that write fails,
	// so that no failure path drops the write error silently.
	notifyClient := func(cause error) {
		if wErr := h.writeErrorMessage(registration.msg.ID, cause); wErr != nil {
			h.logger.Warn("writing error message", zap.Error(wErr))
		}
	}

	_, operationCtx, err := h.parseAndPlan(registration)
	if err != nil {
		notifyClient(err)
		return
	}

	// setExtension stores value under key in the operation extensions,
	// initialising them to an empty JSON object when unset. On failure it logs
	// logMsg, notifies the client and reports false so the caller can bail out.
	setExtension := func(value []byte, key, logMsg string) bool {
		if operationCtx.extensions == nil {
			operationCtx.extensions = json.RawMessage("{}")
		}
		operationCtx.extensions, err = jsonparser.Set(operationCtx.extensions, value, key)
		if err != nil {
			h.logger.Warn(logMsg, zap.Error(err))
			notifyClient(err)
			return false
		}
		return true
	}

	if h.forwardUpgradeHeaders.enabled && h.upgradeRequestHeaders != nil {
		if !setExtension(h.upgradeRequestHeaders, "upgradeHeaders", "Setting upgrade request data") {
			return
		}
	}
	if h.forwardQueryParams.enabled && h.upgradeRequestQueryParams != nil {
		if !setExtension(h.upgradeRequestQueryParams, "upgradeQueryParams", "Setting upgrade request data") {
			return
		}
	}

	// The initial payload is forwarded both as an extension and on the resolve
	// context, so the condition is evaluated once and reused.
	forwardPayload := h.forwardInitialPayload && operationCtx.initialPayload != nil
	if forwardPayload {
		if !setExtension(operationCtx.initialPayload, "initialPayload", "Setting initial payload") {
			return
		}
	}

	resolveCtx := &resolve.Context{
		Variables: operationCtx.Variables(),
		Request: resolve.Request{
			Header: registration.clientRequest.Header,
			ID:     h.initRequestID,
		},
		RenameTypeNames: h.graphqlHandler.executor.RenameTypeNames,
		RemapVariables:  operationCtx.remapVariables,
		TracingOptions:  operationCtx.traceOptions,
		Extensions:      operationCtx.extensions,
	}
	if forwardPayload {
		resolveCtx.InitialPayload = operationCtx.initialPayload
	}

	reqContext := buildRequestContext(requestContextOptions{
		operationContext:    operationCtx,
		requestLogger:       h.logger,
		metricSetAttributes: nil,
		w:                   nil,
		r:                   registration.clientRequest,
	})
	resolveCtx = resolveCtx.WithContext(withRequestContext(h.ctx, reqContext))

	if h.graphqlHandler.authorizer != nil {
		resolveCtx = WithAuthorizationExtension(resolveCtx)
		resolveCtx.SetAuthorizer(h.graphqlHandler.authorizer)
	}
	resolveCtx = h.graphqlHandler.configureRateLimiting(resolveCtx)

	// Registered only here, so the early-return paths above do not export
	// usage info. Wrapped in a closure so err is read when the function
	// returns rather than when the defer statement is evaluated.
	defer func() {
		// StatusCode has no meaning here. We set it to 0 but set the error.
		h.metrics.ExportSchemaUsageInfo(operationCtx, 0, err != nil, false)
	}()

	switch p := operationCtx.preparedPlan.preparedPlan.(type) {
	case *plan.SynchronousResponsePlan:
		_, err = h.graphqlHandler.executor.Resolver.ResolveGraphQLResponse(resolveCtx, p.Response, nil, rw)
		if err != nil {
			h.logger.Warn("Resolving GraphQL response", zap.Error(err))
			h.graphqlHandler.WriteError(resolveCtx, err, p.Response, rw)
		}
		// The Flush error is ignored; rw.Complete() is called regardless.
		_ = rw.Flush()
		rw.Complete()
	case *plan.SubscriptionResponsePlan:
		err = h.graphqlHandler.executor.Resolver.AsyncResolveGraphQLSubscription(resolveCtx, p.Response, rw.SubscriptionResponseWriter(), registration.id)
		if err != nil {
			h.logger.Warn("Resolving GraphQL subscription", zap.Error(err))
			h.graphqlHandler.WriteError(resolveCtx, err, p.Response.Response, rw)
			return
		}
	}
}