func()

in router/core/graphql_prehandler.go [456:960]


func (h *PreHandler) handleOperation(req *http.Request, variablesParser *astjson.Parser, httpOperation *httpOperation) error {
	operationKit, err := h.operationProcessor.NewKit()
	if err != nil {
		return err
	}

	defer func() {
		// the kit must be freed before we're doing io operations
		// the kit is bound to the number of CPUs, and we must not hold onto it while doing IO operations
		// it needs to be called inside a defer to ensure it is called in panic situations as well

		if operationKit != nil {
			operationKit.Free()
		}

	}()

	requestContext := httpOperation.requestContext

	// Handle the case when operation information are provided as GET parameters
	if req.Method == http.MethodGet {
		if err := operationKit.UnmarshalOperationFromURL(req.URL); err != nil {
			return &httpGraphqlError{
				message:    fmt.Sprintf("error parsing request query params: %s", err),
				statusCode: http.StatusBadRequest,
			}
		}
	} else if req.Method == http.MethodPost {
		if err := operationKit.UnmarshalOperationFromBody(httpOperation.body); err != nil {
			return &httpGraphqlError{
				message:    "error parsing request body",
				statusCode: http.StatusBadRequest,
			}
		}
		// If we have files, we need to set them on the parsed operation
		if len(httpOperation.files) > 0 {
			requestContext.operation.files = httpOperation.files
		}
	}

	// Compute the operation sha256 hash as soon as possible for observability reasons
	if h.shouldComputeOperationSha256(operationKit) {
		if err := operationKit.ComputeOperationSha256(); err != nil {
			return &httpGraphqlError{
				message:    fmt.Sprintf("error hashing operation: %s", err),
				statusCode: http.StatusInternalServerError,
			}
		}
		requestContext.operation.sha256Hash = operationKit.parsedOperation.Sha256Hash
		requestContext.telemetry.addCustomMetricStringAttr(ContextFieldOperationSha256, requestContext.operation.sha256Hash)
		if h.operationBlocker.SafelistEnabled || h.operationBlocker.LogUnknownOperationsEnabled {
			// Set the request hash to the parsed hash, to see if it matches a persisted operation
			operationKit.parsedOperation.GraphQLRequestExtensions.PersistedQuery = &GraphQLRequestExtensionsPersistedQuery{
				Sha256Hash: operationKit.parsedOperation.Sha256Hash,
			}
		}
	}

	requestContext.operation.extensions = operationKit.parsedOperation.Request.Extensions
	requestContext.operation.variables, err = variablesParser.ParseBytes(operationKit.parsedOperation.Request.Variables)
	if err != nil {
		return &httpGraphqlError{
			message:    fmt.Sprintf("error parsing variables: %s", err),
			statusCode: http.StatusBadRequest,
		}
	}

	var (
		skipParse bool
		isApq     bool
	)

	if h.shouldFetchPersistedOperation(operationKit) {
		ctx, span := h.tracer.Start(req.Context(), "Load Persisted Operation",
			trace.WithSpanKind(trace.SpanKindClient),
			trace.WithAttributes(requestContext.telemetry.traceAttrs...),
		)

		skipParse, isApq, err = operationKit.FetchPersistedOperation(ctx, requestContext.operation.clientInfo)
		span.SetAttributes(otel.WgEnginePersistedOperationCacheHit.Bool(operationKit.parsedOperation.PersistedOperationCacheHit))
		if err != nil {
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())

			var poNotFoundErr *persistedoperation.PersistentOperationNotFoundError
			if h.operationBlocker.LogUnknownOperationsEnabled && errors.As(err, &poNotFoundErr) {
				requestContext.logger.Warn("Unknown persisted operation found", zap.String("query", operationKit.parsedOperation.Request.Query), zap.String("sha256Hash", poNotFoundErr.Sha256Hash))
				if h.operationBlocker.SafelistEnabled {
					span.End()
					return err
				}
			} else {
				span.End()
				return err
			}
		}

		span.End()

		requestContext.operation.persistedOperationCacheHit = operationKit.parsedOperation.PersistedOperationCacheHit
	}

	// If the persistent operation is already in the cache, we skip the parse step
	// because the operation was already parsed. This is a performance optimization, and we
	// can do it because we know that the persisted operation is immutable (identified by the hash)
	if !skipParse {
		_, engineParseSpan := h.tracer.Start(req.Context(), "Operation - Parse",
			trace.WithSpanKind(trace.SpanKindInternal),
			trace.WithAttributes(requestContext.telemetry.traceAttrs...),
		)

		httpOperation.traceTimings.StartParse()
		startParsing := time.Now()

		err = operationKit.Parse()
		if err != nil {
			rtrace.AttachErrToSpan(engineParseSpan, err)

			requestContext.operation.parsingTime = time.Since(startParsing)
			if !requestContext.operation.traceOptions.ExcludeParseStats {
				httpOperation.traceTimings.EndParse()
			}

			engineParseSpan.End()

			return err
		}

		requestContext.operation.parsingTime = time.Since(startParsing)
		if !requestContext.operation.traceOptions.ExcludeParseStats {
			httpOperation.traceTimings.EndParse()
		}

		engineParseSpan.End()
	}

	requestContext.operation.name = operationKit.parsedOperation.Request.OperationName
	requestContext.operation.opType = operationKit.parsedOperation.Type

	attributesAfterParse := []attribute.KeyValue{
		otel.WgOperationName.String(operationKit.parsedOperation.Request.OperationName),
		otel.WgOperationType.String(operationKit.parsedOperation.Type),
	}
	requestContext.telemetry.addCommonAttribute(attributesAfterParse...)

	// Set the router span name after we have the operation name
	httpOperation.routerSpan.SetName(GetSpanName(operationKit.parsedOperation.Request.OperationName, operationKit.parsedOperation.Type))

	if req.Method == http.MethodGet && operationKit.parsedOperation.Type == "mutation" {
		return &httpGraphqlError{
			message:    "Mutations can only be sent over HTTP POST",
			statusCode: http.StatusMethodNotAllowed,
		}
	}

	// Set the operation name and type to the operation metrics and the router span as early as possible
	httpOperation.routerSpan.SetAttributes(attributesAfterParse...)

	if err := h.operationBlocker.OperationIsBlocked(requestContext.logger, requestContext.expressionContext, operationKit.parsedOperation); err != nil {
		return &httpGraphqlError{
			message:    err.Error(),
			statusCode: http.StatusOK,
		}
	}

	if operationKit.parsedOperation.GraphQLRequestExtensions.PersistedQuery != nil &&
		operationKit.parsedOperation.GraphQLRequestExtensions.PersistedQuery.Sha256Hash != "" {

		requestContext.operation.persistedID = operationKit.parsedOperation.GraphQLRequestExtensions.PersistedQuery.Sha256Hash
		persistedIDAttribute := otel.WgOperationPersistedID.String(operationKit.parsedOperation.GraphQLRequestExtensions.PersistedQuery.Sha256Hash)

		requestContext.telemetry.addCommonAttribute(persistedIDAttribute)

		httpOperation.routerSpan.SetAttributes(persistedIDAttribute)
	}

	/**
	* Normalize the operation
	 */

	if !requestContext.operation.traceOptions.ExcludeNormalizeStats {
		httpOperation.traceTimings.StartNormalize()
	}

	startNormalization := time.Now()

	_, engineNormalizeSpan := h.tracer.Start(req.Context(), "Operation - Normalize",
		trace.WithSpanKind(trace.SpanKindInternal),
		trace.WithAttributes(requestContext.telemetry.traceAttrs...),
	)

	cached, err := operationKit.NormalizeOperation(requestContext.operation.clientInfo.Name, isApq)
	if err != nil {
		rtrace.AttachErrToSpan(engineNormalizeSpan, err)

		requestContext.operation.normalizationTime = time.Since(startNormalization)
		if !requestContext.operation.traceOptions.ExcludeNormalizeStats {
			httpOperation.traceTimings.EndNormalize()
		}

		engineNormalizeSpan.End()

		return err
	}

	// Set the cache hit attribute on the span
	engineNormalizeSpan.SetAttributes(otel.WgNormalizationCacheHit.Bool(cached))

	requestContext.operation.normalizationCacheHit = operationKit.parsedOperation.NormalizationCacheHit

	/**
	* Normalize the variables
	 */

	// Normalize the variables returns list of uploads mapping if there are any of them present in a query
	// type UploadPathMapping struct {
	// 	VariableName       string - is a variable name holding the direct or nested value of type Upload, example "f"
	// 	OriginalUploadPath string - is a path relative to variables which have an Upload type, example "variables.f"
	// 	NewUploadPath      string - if variable was used in the inline object like this `arg: {f: $f}` this field will hold the new extracted path, example "variables.a.f", if it is an empty, there was no change in the path
	// }
	uploadsMapping, err := operationKit.NormalizeVariables()
	if err != nil {
		rtrace.AttachErrToSpan(engineNormalizeSpan, err)

		requestContext.operation.normalizationTime = time.Since(startNormalization)

		if !requestContext.operation.traceOptions.ExcludeNormalizeStats {
			httpOperation.traceTimings.EndNormalize()
		}

		engineNormalizeSpan.End()

		return err
	}

	// update file uploads path if they were used in nested field in the extracted variables
	for mapping := range slices.Values(uploadsMapping) {
		// if the NewUploadPath is empty it means that there was no change in the path - e.g. upload was directly passed to the argument
		// e.g. field(fileArgument: $file) will result in []UploadPathMapping{ {VariableName: "file", OriginalUploadPath: "variables.file", NewUploadPath: ""} }
		if mapping.NewUploadPath == "" {
			continue
		}

		// look for the corresponding file which was used in the nested argument
		// we are matching original upload path passed via uploads map with the mapping items
		idx := slices.IndexFunc(requestContext.operation.files, func(file *httpclient.FileUpload) bool {
			return file.VariablePath() == mapping.OriginalUploadPath
		})

		if idx == -1 {
			continue
		}

		// if NewUploadPath is not empty the file argument was used in the nested object, and we need to update the path
		// e.g. field(arg: {file: $file}) normalized to field(arg: $a) will result in []UploadPathMapping{ {VariableName: "file", OriginalUploadPath: "variables.file", NewUploadPath: "variables.a.file"} }
		// so "variables.file" should be updated to "variables.a.file"
		requestContext.operation.files[idx].SetVariablePath(uploadsMapping[idx].NewUploadPath)
	}

	// RemapVariables is updating and sort variables name to be able to have them in a predictable order
	// after remapping requestContext.operation.remapVariables map will contain new names as a keys and old names as a values - to be able to extract the old values
	// because it does not rename variables in a variables json
	err = operationKit.RemapVariables(h.disableVariablesRemapping)
	if err != nil {
		rtrace.AttachErrToSpan(engineNormalizeSpan, err)

		requestContext.operation.normalizationTime = time.Since(startNormalization)

		if !requestContext.operation.traceOptions.ExcludeNormalizeStats {
			httpOperation.traceTimings.EndNormalize()
		}

		engineNormalizeSpan.End()

		return err
	}

	requestContext.operation.hash = operationKit.parsedOperation.ID
	requestContext.operation.internalHash = operationKit.parsedOperation.InternalID
	requestContext.operation.remapVariables = operationKit.parsedOperation.RemapVariables

	if !h.disableVariablesRemapping && len(uploadsMapping) > 0 {
		// after variables remapping we need to update the file uploads path because variables relative path has changed
		// but files still references the old uploads locations
		// key `to` is a new variable name
		// value `from` is an old variable name
		// we are looping through remapped variables to find a match between old variable name and variable which was holding an upload
		for to, from := range maps.All(requestContext.operation.remapVariables) {

			// loop over upload mappings to find a match between variable name and upload variable name
			for uploadMapping := range slices.Values(uploadsMapping) {
				if uploadMapping.VariableName != from {
					continue
				}

				uploadPath := uploadMapping.NewUploadPath
				// if NewUploadPath is empty it means that there was no change in the path - e.g. upload was directly passed to the argument
				if uploadPath == "" {
					uploadPath = uploadMapping.OriginalUploadPath
				}

				// next step is to compare file upload path with the original upload path from the upload mappings
				for file := range slices.Values(requestContext.operation.files) {
					if file.VariablePath() != uploadPath {
						continue
					}

					// trim old variable name prefix
					oldUploadPathPrefix := fmt.Sprintf("variables.%s.", from)
					relativeUploadPath := strings.TrimPrefix(uploadPath, oldUploadPathPrefix)

					// set new variable name prefix
					updatedPath := fmt.Sprintf("variables.%s.%s", to, relativeUploadPath)
					file.SetVariablePath(updatedPath)
				}
			}
		}
	}

	operationHashString := strconv.FormatUint(operationKit.parsedOperation.ID, 10)

	operationHashAttribute := otel.WgOperationHash.String(operationHashString)
	requestContext.telemetry.addCommonAttribute(operationHashAttribute)
	httpOperation.routerSpan.SetAttributes(operationHashAttribute)

	requestContext.operation.rawContent = operationKit.parsedOperation.Request.Query
	requestContext.operation.content = operationKit.parsedOperation.NormalizedRepresentation
	requestContext.operation.variables, err = variablesParser.ParseBytes(operationKit.parsedOperation.Request.Variables)
	if err != nil {
		rtrace.AttachErrToSpan(engineNormalizeSpan, err)
		if !requestContext.operation.traceOptions.ExcludeNormalizeStats {
			httpOperation.traceTimings.EndNormalize()
		}
		engineNormalizeSpan.End()
		return err
	}
	requestContext.operation.normalizationTime = time.Since(startNormalization)

	if !requestContext.operation.traceOptions.ExcludeNormalizeStats {
		httpOperation.traceTimings.EndNormalize()
	}

	engineNormalizeSpan.End()

	if operationKit.parsedOperation.IsPersistedOperation {
		engineNormalizeSpan.SetAttributes(otel.WgEnginePersistedOperationCacheHit.Bool(operationKit.parsedOperation.PersistedOperationCacheHit))
	}

	if h.traceExportVariables {
		// At this stage the variables are normalized
		httpOperation.routerSpan.SetAttributes(otel.WgOperationVariables.String(string(operationKit.parsedOperation.Request.Variables)))
	}

	// Set the normalized operation only on the root span
	operationContentAttribute := otel.WgOperationContent.String(operationKit.parsedOperation.NormalizedRepresentation)
	httpOperation.routerSpan.SetAttributes(operationContentAttribute)

	/**
	* Validate the operation
	 */

	if !requestContext.operation.traceOptions.ExcludeValidateStats {
		httpOperation.traceTimings.StartValidate()
	}

	startValidation := time.Now()

	_, engineValidateSpan := h.tracer.Start(req.Context(), "Operation - Validate",
		trace.WithSpanKind(trace.SpanKindInternal),
		trace.WithAttributes(requestContext.telemetry.traceAttrs...),
	)
	validationCached, err := operationKit.Validate(requestContext.operation.executionOptions.SkipLoader, requestContext.operation.remapVariables, h.apolloCompatibilityFlags)
	if err != nil {
		rtrace.AttachErrToSpan(engineValidateSpan, err)

		requestContext.operation.validationTime = time.Since(startValidation)

		if !requestContext.operation.traceOptions.ExcludeValidateStats {
			httpOperation.traceTimings.EndValidate()
		}

		engineValidateSpan.End()

		return err
	}

	engineValidateSpan.SetAttributes(otel.WgValidationCacheHit.Bool(validationCached))
	if requestContext.operation.executionOptions.SkipLoader {
		// In case we're skipping the loader, which means that we won't execute the operation
		// we skip the validation of variables as we're not using them
		// this allows us to generate query plans without having to provide variables
		engineValidateSpan.SetAttributes(otel.WgVariablesValidationSkipped.Bool(true))
	}

	// Validate that the planned query doesn't exceed the maximum query depth configured
	// This check runs if they've configured a max query depth, and it can optionally be turned off for persisted operations
	if h.complexityLimits != nil {
		cacheHit, complexityCalcs, queryDepthErr := operationKit.ValidateQueryComplexity(h.complexityLimits, operationKit.kit.doc, h.executor.RouterSchema, operationKit.parsedOperation.IsPersistedOperation)
		engineValidateSpan.SetAttributes(otel.WgQueryDepth.Int(complexityCalcs.Depth))
		engineValidateSpan.SetAttributes(otel.WgQueryTotalFields.Int(complexityCalcs.TotalFields))
		engineValidateSpan.SetAttributes(otel.WgQueryRootFields.Int(complexityCalcs.RootFields))
		engineValidateSpan.SetAttributes(otel.WgQueryRootFieldAliases.Int(complexityCalcs.RootFieldAliases))
		engineValidateSpan.SetAttributes(otel.WgQueryDepthCacheHit.Bool(cacheHit))
		if queryDepthErr != nil {
			rtrace.AttachErrToSpan(engineValidateSpan, err)

			requestContext.operation.validationTime = time.Since(startValidation)
			httpOperation.traceTimings.EndValidate()

			engineValidateSpan.End()

			return queryDepthErr
		}
	}

	requestContext.operation.validationTime = time.Since(startValidation)
	httpOperation.traceTimings.EndValidate()

	engineValidateSpan.End()

	/**
	* Plan the operation
	 */

	// If the request has a query parameter wg_trace=true we skip the cache
	// and always plan the operation
	// this allows us to "write" to the plan
	if !requestContext.operation.traceOptions.ExcludePlannerStats {
		httpOperation.traceTimings.StartPlanning()
	}
	startPlanning := time.Now()

	_, enginePlanSpan := h.tracer.Start(req.Context(), "Operation - Plan",
		trace.WithSpanKind(trace.SpanKindInternal),
		trace.WithAttributes(otel.WgEngineRequestTracingEnabled.Bool(requestContext.operation.traceOptions.Enable)),
		trace.WithAttributes(requestContext.telemetry.traceAttrs...),
	)

	planOptions := PlanOptions{
		ClientInfo:           requestContext.operation.clientInfo,
		TraceOptions:         requestContext.operation.traceOptions,
		ExecutionOptions:     requestContext.operation.executionOptions,
		TrackSchemaUsageInfo: h.trackSchemaUsageInfo,
	}

	err = h.planner.plan(requestContext.operation, planOptions)
	if err != nil {

		httpOperation.requestLogger.Error("failed to plan operation", zap.Error(err))
		rtrace.AttachErrToSpan(enginePlanSpan, err)

		if !requestContext.operation.traceOptions.ExcludePlannerStats {
			httpOperation.traceTimings.EndPlanning()
		}

		enginePlanSpan.End()

		return err
	}

	enginePlanSpan.SetAttributes(otel.WgEnginePlanCacheHit.Bool(requestContext.operation.planCacheHit))

	requestContext.operation.planningTime = time.Since(startPlanning)
	httpOperation.traceTimings.EndPlanning()

	enginePlanSpan.End()

	planningAttrs := *requestContext.telemetry.AcquireAttributes()
	planningAttrs = append(planningAttrs, otel.WgEnginePlanCacheHit.Bool(requestContext.operation.planCacheHit))
	planningAttrs = append(planningAttrs, requestContext.telemetry.metricAttrs...)

	httpOperation.operationMetrics.routerMetrics.MetricStore().MeasureOperationPlanningTime(
		req.Context(),
		requestContext.operation.planningTime,
		requestContext.telemetry.metricSliceAttrs,
		otelmetric.WithAttributeSet(attribute.NewSet(planningAttrs...)),
	)

	requestContext.telemetry.ReleaseAttributes(&planningAttrs)

	// we could log the query plan only if query plans are calculated
	if (h.queryPlansEnabled && requestContext.operation.executionOptions.IncludeQueryPlanInResponse) ||
		h.alwaysIncludeQueryPlan {

		switch p := requestContext.operation.preparedPlan.preparedPlan.(type) {
		case *plan.SynchronousResponsePlan:
			p.Response.Fetches.NormalizedQuery = operationKit.parsedOperation.NormalizedRepresentation
		}

		if h.queryPlansLoggingEnabled {
			switch p := requestContext.operation.preparedPlan.preparedPlan.(type) {
			case *plan.SynchronousResponsePlan:
				printedPlan := p.Response.Fetches.QueryPlan().PrettyPrint()

				if h.developmentMode {
					h.log.Sugar().Debugf("Query Plan:\n%s", printedPlan)
				} else {
					h.log.Debug("Query Plan", zap.String("query_plan", printedPlan))
				}
			}
		}
	}

	return nil
}