in router/core/graphql_prehandler.go [456:960]
// handleOperation runs the pre-execution pipeline for a single GraphQL request.
//
// In order, it:
//  1. unmarshals the operation from the URL (GET) or from httpOperation.body (POST),
//  2. optionally computes the operation sha256 hash and loads a persisted operation,
//  3. parses the operation (skipped when the persisted operation lookup says so),
//  4. checks the operation against the operation blocker,
//  5. normalizes the operation and its variables, remaps variable names and
//     rewrites the file upload paths accordingly,
//  6. validates the operation and, when configured, its complexity,
//  7. plans the operation and records the planning time metric.
//
// Results of each phase (name, type, hashes, timings, cache hit flags, parsed
// variables, ...) are written to httpOperation.requestContext, and one tracing
// span is emitted per phase.
//
// It returns nil on success. Malformed requests, hashing failures, mutations
// sent over GET and blocked operations are reported as *httpGraphqlError; errors
// from the persisted operation lookup, parser, normalizer, validator and planner
// are returned as they were received.
func (h *PreHandler) handleOperation(req *http.Request, variablesParser *astjson.Parser, httpOperation *httpOperation) error {
	operationKit, err := h.operationProcessor.NewKit()
	if err != nil {
		return err
	}
	defer func() {
		// the kit must be freed before we're doing io operations
		// the kit is bound to the number of CPUs, and we must not hold onto it while doing IO operations
		// it needs to be called inside a defer to ensure it is called in panic situations as well
		if operationKit != nil {
			operationKit.Free()
		}
	}()

	requestContext := httpOperation.requestContext

	// Handle the case when operation information are provided as GET parameters
	if req.Method == http.MethodGet {
		if err := operationKit.UnmarshalOperationFromURL(req.URL); err != nil {
			return &httpGraphqlError{
				message:    fmt.Sprintf("error parsing request query params: %s", err),
				statusCode: http.StatusBadRequest,
			}
		}
	} else if req.Method == http.MethodPost {
		if err := operationKit.UnmarshalOperationFromBody(httpOperation.body); err != nil {
			return &httpGraphqlError{
				message:    "error parsing request body",
				statusCode: http.StatusBadRequest,
			}
		}
		// If we have files, we need to set them on the parsed operation
		if len(httpOperation.files) > 0 {
			requestContext.operation.files = httpOperation.files
		}
	}

	// Compute the operation sha256 hash as soon as possible for observability reasons
	if h.shouldComputeOperationSha256(operationKit) {
		if err := operationKit.ComputeOperationSha256(); err != nil {
			return &httpGraphqlError{
				message:    fmt.Sprintf("error hashing operation: %s", err),
				statusCode: http.StatusInternalServerError,
			}
		}
		requestContext.operation.sha256Hash = operationKit.parsedOperation.Sha256Hash
		requestContext.telemetry.addCustomMetricStringAttr(ContextFieldOperationSha256, requestContext.operation.sha256Hash)
		if h.operationBlocker.SafelistEnabled || h.operationBlocker.LogUnknownOperationsEnabled {
			// Set the request hash to the parsed hash, to see if it matches a persisted operation
			operationKit.parsedOperation.GraphQLRequestExtensions.PersistedQuery = &GraphQLRequestExtensionsPersistedQuery{
				Sha256Hash: operationKit.parsedOperation.Sha256Hash,
			}
		}
	}

	requestContext.operation.extensions = operationKit.parsedOperation.Request.Extensions
	requestContext.operation.variables, err = variablesParser.ParseBytes(operationKit.parsedOperation.Request.Variables)
	if err != nil {
		return &httpGraphqlError{
			message:    fmt.Sprintf("error parsing variables: %s", err),
			statusCode: http.StatusBadRequest,
		}
	}

	var (
		skipParse bool
		isApq     bool
	)

	if h.shouldFetchPersistedOperation(operationKit) {
		ctx, span := h.tracer.Start(req.Context(), "Load Persisted Operation",
			trace.WithSpanKind(trace.SpanKindClient),
			trace.WithAttributes(requestContext.telemetry.traceAttrs...),
		)

		skipParse, isApq, err = operationKit.FetchPersistedOperation(ctx, requestContext.operation.clientInfo)
		span.SetAttributes(otel.WgEnginePersistedOperationCacheHit.Bool(operationKit.parsedOperation.PersistedOperationCacheHit))
		if err != nil {
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())

			var poNotFoundErr *persistedoperation.PersistentOperationNotFoundError
			if h.operationBlocker.LogUnknownOperationsEnabled && errors.As(err, &poNotFoundErr) {
				requestContext.logger.Warn("Unknown persisted operation found", zap.String("query", operationKit.parsedOperation.Request.Query), zap.String("sha256Hash", poNotFoundErr.Sha256Hash))
				// An unknown operation is only logged, unless the safelist is
				// enabled as well, in which case the request is rejected.
				if h.operationBlocker.SafelistEnabled {
					span.End()
					return err
				}
			} else {
				span.End()
				return err
			}
		}

		span.End()

		requestContext.operation.persistedOperationCacheHit = operationKit.parsedOperation.PersistedOperationCacheHit
	}

	// If the persistent operation is already in the cache, we skip the parse step
	// because the operation was already parsed. This is a performance optimization, and we
	// can do it because we know that the persisted operation is immutable (identified by the hash)
	if !skipParse {
		_, engineParseSpan := h.tracer.Start(req.Context(), "Operation - Parse",
			trace.WithSpanKind(trace.SpanKindInternal),
			trace.WithAttributes(requestContext.telemetry.traceAttrs...),
		)

		// NOTE(review): StartParse is not guarded by ExcludeParseStats while
		// EndParse is - confirm this asymmetry is intended.
		httpOperation.traceTimings.StartParse()
		startParsing := time.Now()

		err = operationKit.Parse()
		if err != nil {
			rtrace.AttachErrToSpan(engineParseSpan, err)

			requestContext.operation.parsingTime = time.Since(startParsing)
			if !requestContext.operation.traceOptions.ExcludeParseStats {
				httpOperation.traceTimings.EndParse()
			}

			engineParseSpan.End()

			return err
		}

		requestContext.operation.parsingTime = time.Since(startParsing)
		if !requestContext.operation.traceOptions.ExcludeParseStats {
			httpOperation.traceTimings.EndParse()
		}

		engineParseSpan.End()
	}

	requestContext.operation.name = operationKit.parsedOperation.Request.OperationName
	requestContext.operation.opType = operationKit.parsedOperation.Type

	attributesAfterParse := []attribute.KeyValue{
		otel.WgOperationName.String(operationKit.parsedOperation.Request.OperationName),
		otel.WgOperationType.String(operationKit.parsedOperation.Type),
	}
	requestContext.telemetry.addCommonAttribute(attributesAfterParse...)

	// Set the router span name after we have the operation name
	httpOperation.routerSpan.SetName(GetSpanName(operationKit.parsedOperation.Request.OperationName, operationKit.parsedOperation.Type))

	if req.Method == http.MethodGet && operationKit.parsedOperation.Type == "mutation" {
		return &httpGraphqlError{
			message:    "Mutations can only be sent over HTTP POST",
			statusCode: http.StatusMethodNotAllowed,
		}
	}

	// Set the operation name and type to the operation metrics and the router span as early as possible
	httpOperation.routerSpan.SetAttributes(attributesAfterParse...)

	if err := h.operationBlocker.OperationIsBlocked(requestContext.logger, requestContext.expressionContext, operationKit.parsedOperation); err != nil {
		return &httpGraphqlError{
			message:    err.Error(),
			statusCode: http.StatusOK,
		}
	}

	if operationKit.parsedOperation.GraphQLRequestExtensions.PersistedQuery != nil &&
		operationKit.parsedOperation.GraphQLRequestExtensions.PersistedQuery.Sha256Hash != "" {
		requestContext.operation.persistedID = operationKit.parsedOperation.GraphQLRequestExtensions.PersistedQuery.Sha256Hash
		persistedIDAttribute := otel.WgOperationPersistedID.String(operationKit.parsedOperation.GraphQLRequestExtensions.PersistedQuery.Sha256Hash)

		requestContext.telemetry.addCommonAttribute(persistedIDAttribute)

		httpOperation.routerSpan.SetAttributes(persistedIDAttribute)
	}

	/**
	 * Normalize the operation
	 */

	if !requestContext.operation.traceOptions.ExcludeNormalizeStats {
		httpOperation.traceTimings.StartNormalize()
	}

	startNormalization := time.Now()

	_, engineNormalizeSpan := h.tracer.Start(req.Context(), "Operation - Normalize",
		trace.WithSpanKind(trace.SpanKindInternal),
		trace.WithAttributes(requestContext.telemetry.traceAttrs...),
	)

	cached, err := operationKit.NormalizeOperation(requestContext.operation.clientInfo.Name, isApq)
	if err != nil {
		rtrace.AttachErrToSpan(engineNormalizeSpan, err)

		requestContext.operation.normalizationTime = time.Since(startNormalization)

		if !requestContext.operation.traceOptions.ExcludeNormalizeStats {
			httpOperation.traceTimings.EndNormalize()
		}

		engineNormalizeSpan.End()

		return err
	}

	// Set the cache hit attribute on the span
	engineNormalizeSpan.SetAttributes(otel.WgNormalizationCacheHit.Bool(cached))

	requestContext.operation.normalizationCacheHit = operationKit.parsedOperation.NormalizationCacheHit

	/**
	 * Normalize the variables
	 */

	// Normalize the variables returns list of uploads mapping if there are any of them present in a query
	// type UploadPathMapping struct {
	// 	VariableName       string - is a variable name holding the direct or nested value of type Upload, example "f"
	// 	OriginalUploadPath string - is a path relative to variables which have an Upload type, example "variables.f"
	// 	NewUploadPath      string - if variable was used in the inline object like this `arg: {f: $f}` this field will hold the new extracted path, example "variables.a.f", if it is an empty, there was no change in the path
	// }
	uploadsMapping, err := operationKit.NormalizeVariables()
	if err != nil {
		rtrace.AttachErrToSpan(engineNormalizeSpan, err)

		requestContext.operation.normalizationTime = time.Since(startNormalization)

		if !requestContext.operation.traceOptions.ExcludeNormalizeStats {
			httpOperation.traceTimings.EndNormalize()
		}

		engineNormalizeSpan.End()

		return err
	}

	// update file uploads path if they were used in nested field in the extracted variables
	for mapping := range slices.Values(uploadsMapping) {
		// if the NewUploadPath is empty it means that there was no change in the path - e.g. upload was directly passed to the argument
		// e.g. field(fileArgument: $file) will result in []UploadPathMapping{ {VariableName: "file", OriginalUploadPath: "variables.file", NewUploadPath: ""} }
		if mapping.NewUploadPath == "" {
			continue
		}

		// look for the corresponding file which was used in the nested argument
		// we are matching original upload path passed via uploads map with the mapping items
		idx := slices.IndexFunc(requestContext.operation.files, func(file *httpclient.FileUpload) bool {
			return file.VariablePath() == mapping.OriginalUploadPath
		})

		if idx == -1 {
			continue
		}

		// if NewUploadPath is not empty the file argument was used in the nested object, and we need to update the path
		// e.g. field(arg: {file: $file}) normalized to field(arg: $a) will result in []UploadPathMapping{ {VariableName: "file", OriginalUploadPath: "variables.file", NewUploadPath: "variables.a.file"} }
		// so "variables.file" should be updated to "variables.a.file"
		//
		// idx is an index into the files slice, not into uploadsMapping, so the
		// new path must be taken from the mapping that produced the match.
		requestContext.operation.files[idx].SetVariablePath(mapping.NewUploadPath)
	}

	// RemapVariables is updating and sort variables name to be able to have them in a predictable order
	// after remapping requestContext.operation.remapVariables map will contain new names as a keys and old names as a values - to be able to extract the old values
	// because it does not rename variables in a variables json
	err = operationKit.RemapVariables(h.disableVariablesRemapping)
	if err != nil {
		rtrace.AttachErrToSpan(engineNormalizeSpan, err)

		requestContext.operation.normalizationTime = time.Since(startNormalization)

		if !requestContext.operation.traceOptions.ExcludeNormalizeStats {
			httpOperation.traceTimings.EndNormalize()
		}

		engineNormalizeSpan.End()

		return err
	}

	requestContext.operation.hash = operationKit.parsedOperation.ID
	requestContext.operation.internalHash = operationKit.parsedOperation.InternalID
	requestContext.operation.remapVariables = operationKit.parsedOperation.RemapVariables

	if !h.disableVariablesRemapping && len(uploadsMapping) > 0 {
		// after variables remapping we need to update the file uploads path because variables relative path has changed
		// but files still references the old uploads locations
		// key `to` is a new variable name
		// value `from` is an old variable name
		// we are looping through remapped variables to find a match between old variable name and variable which was holding an upload

		files := requestContext.operation.files

		// Files are matched against the paths they had before any renaming.
		// Map iteration order is random and the old and new variable names can
		// overlap (e.g. `a` -> `b` and `b` -> `a`); comparing against the live
		// path would allow a file that was just renamed to be renamed again.
		pathsBeforeRemap := make([]string, len(files))
		for i, file := range files {
			pathsBeforeRemap[i] = file.VariablePath()
		}

		for to, from := range maps.All(requestContext.operation.remapVariables) {
			oldVariableRoot := fmt.Sprintf("variables.%s", from)

			// loop over upload mappings to find a match between variable name and upload variable name
			for uploadMapping := range slices.Values(uploadsMapping) {
				if uploadMapping.VariableName != from {
					continue
				}

				uploadPath := uploadMapping.NewUploadPath
				// if NewUploadPath is empty it means that there was no change in the path - e.g. upload was directly passed to the argument
				if uploadPath == "" {
					uploadPath = uploadMapping.OriginalUploadPath
				}

				var updatedPath string
				if uploadPath == oldVariableRoot {
					// the variable itself is the upload: there is no relative
					// part to keep, only the variable name changes
					updatedPath = fmt.Sprintf("variables.%s", to)
				} else {
					// trim old variable name prefix
					relativeUploadPath := strings.TrimPrefix(uploadPath, oldVariableRoot+".")
					// set new variable name prefix
					updatedPath = fmt.Sprintf("variables.%s.%s", to, relativeUploadPath)
				}

				// next step is to compare file upload path with the original upload path from the upload mappings
				for i, file := range files {
					if pathsBeforeRemap[i] != uploadPath {
						continue
					}

					file.SetVariablePath(updatedPath)
				}
			}
		}
	}

	operationHashString := strconv.FormatUint(operationKit.parsedOperation.ID, 10)

	operationHashAttribute := otel.WgOperationHash.String(operationHashString)
	requestContext.telemetry.addCommonAttribute(operationHashAttribute)
	httpOperation.routerSpan.SetAttributes(operationHashAttribute)

	requestContext.operation.rawContent = operationKit.parsedOperation.Request.Query
	requestContext.operation.content = operationKit.parsedOperation.NormalizedRepresentation

	// The variables are parsed a second time because normalization may have rewritten them.
	requestContext.operation.variables, err = variablesParser.ParseBytes(operationKit.parsedOperation.Request.Variables)
	if err != nil {
		rtrace.AttachErrToSpan(engineNormalizeSpan, err)

		// Record the normalization time like every other failure of this phase does.
		requestContext.operation.normalizationTime = time.Since(startNormalization)

		if !requestContext.operation.traceOptions.ExcludeNormalizeStats {
			httpOperation.traceTimings.EndNormalize()
		}

		engineNormalizeSpan.End()

		return err
	}

	requestContext.operation.normalizationTime = time.Since(startNormalization)

	if !requestContext.operation.traceOptions.ExcludeNormalizeStats {
		httpOperation.traceTimings.EndNormalize()
	}

	// The attribute has to be set before the span is ended: updates made to an
	// OpenTelemetry span after End are dropped.
	if operationKit.parsedOperation.IsPersistedOperation {
		engineNormalizeSpan.SetAttributes(otel.WgEnginePersistedOperationCacheHit.Bool(operationKit.parsedOperation.PersistedOperationCacheHit))
	}

	engineNormalizeSpan.End()

	if h.traceExportVariables {
		// At this stage the variables are normalized
		httpOperation.routerSpan.SetAttributes(otel.WgOperationVariables.String(string(operationKit.parsedOperation.Request.Variables)))
	}

	// Set the normalized operation only on the root span
	operationContentAttribute := otel.WgOperationContent.String(operationKit.parsedOperation.NormalizedRepresentation)
	httpOperation.routerSpan.SetAttributes(operationContentAttribute)

	/**
	 * Validate the operation
	 */

	if !requestContext.operation.traceOptions.ExcludeValidateStats {
		httpOperation.traceTimings.StartValidate()
	}

	startValidation := time.Now()

	_, engineValidateSpan := h.tracer.Start(req.Context(), "Operation - Validate",
		trace.WithSpanKind(trace.SpanKindInternal),
		trace.WithAttributes(requestContext.telemetry.traceAttrs...),
	)
	validationCached, err := operationKit.Validate(requestContext.operation.executionOptions.SkipLoader, requestContext.operation.remapVariables, h.apolloCompatibilityFlags)
	if err != nil {
		rtrace.AttachErrToSpan(engineValidateSpan, err)

		requestContext.operation.validationTime = time.Since(startValidation)

		if !requestContext.operation.traceOptions.ExcludeValidateStats {
			httpOperation.traceTimings.EndValidate()
		}

		engineValidateSpan.End()

		return err
	}

	engineValidateSpan.SetAttributes(otel.WgValidationCacheHit.Bool(validationCached))
	if requestContext.operation.executionOptions.SkipLoader {
		// In case we're skipping the loader, which means that we won't execute the operation
		// we skip the validation of variables as we're not using them
		// this allows us to generate query plans without having to provide variables
		engineValidateSpan.SetAttributes(otel.WgVariablesValidationSkipped.Bool(true))
	}

	// Validate that the planned query doesn't exceed the maximum query depth configured
	// This check runs if they've configured a max query depth, and it can optionally be turned off for persisted operations
	if h.complexityLimits != nil {
		cacheHit, complexityCalcs, queryDepthErr := operationKit.ValidateQueryComplexity(h.complexityLimits, operationKit.kit.doc, h.executor.RouterSchema, operationKit.parsedOperation.IsPersistedOperation)
		engineValidateSpan.SetAttributes(otel.WgQueryDepth.Int(complexityCalcs.Depth))
		engineValidateSpan.SetAttributes(otel.WgQueryTotalFields.Int(complexityCalcs.TotalFields))
		engineValidateSpan.SetAttributes(otel.WgQueryRootFields.Int(complexityCalcs.RootFields))
		engineValidateSpan.SetAttributes(otel.WgQueryRootFieldAliases.Int(complexityCalcs.RootFieldAliases))
		engineValidateSpan.SetAttributes(otel.WgQueryDepthCacheHit.Bool(cacheHit))
		if queryDepthErr != nil {
			// Attach the complexity error itself: the outer err is nil at this
			// point because Validate succeeded.
			rtrace.AttachErrToSpan(engineValidateSpan, queryDepthErr)

			requestContext.operation.validationTime = time.Since(startValidation)
			// NOTE(review): EndValidate is not guarded by ExcludeValidateStats here,
			// unlike in the Validate error branch - confirm this is intended.
			httpOperation.traceTimings.EndValidate()

			engineValidateSpan.End()

			return queryDepthErr
		}
	}

	requestContext.operation.validationTime = time.Since(startValidation)
	// NOTE(review): EndValidate is not guarded by ExcludeValidateStats here - confirm this is intended.
	httpOperation.traceTimings.EndValidate()

	engineValidateSpan.End()

	/**
	 * Plan the operation
	 */

	// If the request has a query parameter wg_trace=true we skip the cache
	// and always plan the operation
	// this allows us to "write" to the plan
	if !requestContext.operation.traceOptions.ExcludePlannerStats {
		httpOperation.traceTimings.StartPlanning()
	}

	startPlanning := time.Now()

	_, enginePlanSpan := h.tracer.Start(req.Context(), "Operation - Plan",
		trace.WithSpanKind(trace.SpanKindInternal),
		trace.WithAttributes(otel.WgEngineRequestTracingEnabled.Bool(requestContext.operation.traceOptions.Enable)),
		trace.WithAttributes(requestContext.telemetry.traceAttrs...),
	)

	planOptions := PlanOptions{
		ClientInfo:           requestContext.operation.clientInfo,
		TraceOptions:         requestContext.operation.traceOptions,
		ExecutionOptions:     requestContext.operation.executionOptions,
		TrackSchemaUsageInfo: h.trackSchemaUsageInfo,
	}

	err = h.planner.plan(requestContext.operation, planOptions)
	if err != nil {
		httpOperation.requestLogger.Error("failed to plan operation", zap.Error(err))
		rtrace.AttachErrToSpan(enginePlanSpan, err)

		if !requestContext.operation.traceOptions.ExcludePlannerStats {
			httpOperation.traceTimings.EndPlanning()
		}

		enginePlanSpan.End()

		return err
	}

	enginePlanSpan.SetAttributes(otel.WgEnginePlanCacheHit.Bool(requestContext.operation.planCacheHit))

	requestContext.operation.planningTime = time.Since(startPlanning)
	// NOTE(review): EndPlanning is not guarded by ExcludePlannerStats here,
	// unlike in the error branch above - confirm this is intended.
	httpOperation.traceTimings.EndPlanning()

	enginePlanSpan.End()

	// The attribute slice is borrowed from the telemetry helper and handed back
	// once the planning time has been recorded.
	planningAttrs := *requestContext.telemetry.AcquireAttributes()
	planningAttrs = append(planningAttrs, otel.WgEnginePlanCacheHit.Bool(requestContext.operation.planCacheHit))
	planningAttrs = append(planningAttrs, requestContext.telemetry.metricAttrs...)

	httpOperation.operationMetrics.routerMetrics.MetricStore().MeasureOperationPlanningTime(
		req.Context(),
		requestContext.operation.planningTime,
		requestContext.telemetry.metricSliceAttrs,
		otelmetric.WithAttributeSet(attribute.NewSet(planningAttrs...)),
	)

	requestContext.telemetry.ReleaseAttributes(&planningAttrs)

	// we could log the query plan only if query plans are calculated
	if (h.queryPlansEnabled && requestContext.operation.executionOptions.IncludeQueryPlanInResponse) ||
		h.alwaysIncludeQueryPlan {

		switch p := requestContext.operation.preparedPlan.preparedPlan.(type) {
		case *plan.SynchronousResponsePlan:
			p.Response.Fetches.NormalizedQuery = operationKit.parsedOperation.NormalizedRepresentation
		}

		if h.queryPlansLoggingEnabled {
			switch p := requestContext.operation.preparedPlan.preparedPlan.(type) {
			case *plan.SynchronousResponsePlan:
				printedPlan := p.Response.Fetches.QueryPlan().PrettyPrint()

				if h.developmentMode {
					h.log.Sugar().Debugf("Query Plan:\n%s", printedPlan)
				} else {
					h.log.Debug("Query Plan", zap.String("query_plan", printedPlan))
				}
			}
		}
	}

	return nil
}