in router/core/engine_loader_hooks.go [113:272]
func (f *engineLoaderHooks) OnFinished(ctx context.Context, ds resolve.DataSourceInfo, responseInfo *resolve.ResponseInfo) {
if resolve.IsIntrospectionDataSource(ds.ID) {
return
}
reqContext := getRequestContext(ctx)
if reqContext == nil {
return
}
hookCtx, ok := ctx.Value(rcontext.EngineLoaderHooksContextKey).(*engineLoaderHooksRequestContext)
if !ok {
return
}
latency := time.Since(hookCtx.startTime)
span := trace.SpanFromContext(ctx)
defer span.End()
if responseInfo == nil {
responseInfo = &resolve.ResponseInfo{}
}
commonAttrs := []attribute.KeyValue{
semconv.HTTPStatusCode(responseInfo.StatusCode),
rotel.WgSubgraphID.String(ds.ID),
rotel.WgSubgraphName.String(ds.Name),
}
traceAttrs := *reqContext.telemetry.AcquireAttributes()
defer reqContext.telemetry.ReleaseAttributes(&traceAttrs)
traceAttrs = append(traceAttrs, reqContext.telemetry.traceAttrs...)
traceAttrs = append(traceAttrs, rotel.WgComponentName.String("engine-loader"))
traceAttrs = append(traceAttrs, commonAttrs...)
exprCtx := reqContext.expressionContext.Clone()
exprCtx.Subgraph.Id = ds.ID
exprCtx.Subgraph.Name = ds.Name
exprCtx.Subgraph.Request.Error = WrapExprError(responseInfo.Err)
if f.storeSubgraphResponseBody {
exprCtx.Subgraph.Response.Body.Raw = responseInfo.GetResponseBody()
}
metricAttrs := *reqContext.telemetry.AcquireAttributes()
defer reqContext.telemetry.ReleaseAttributes(&metricAttrs)
metricAttrs = append(metricAttrs, reqContext.telemetry.metricAttrs...)
metricAttrs = append(metricAttrs, commonAttrs...)
if f.telemetryAttributeExpressions != nil {
telemetryValues, err := f.telemetryAttributeExpressions.expressionsAttributesWithSubgraph(exprCtx)
if err != nil {
reqContext.Logger().Warn("failed to resolve expression for telemetry", zap.Error(err))
}
traceAttrs = append(traceAttrs, telemetryValues...)
metricAttrs = append(metricAttrs, telemetryValues...)
}
if f.tracingAttributeExpressions != nil {
tracingValues, err := f.tracingAttributeExpressions.expressionsAttributesWithSubgraph(exprCtx)
if err != nil {
reqContext.Logger().Warn("failed to resolve expression for tracing", zap.Error(err))
}
traceAttrs = append(traceAttrs, tracingValues...)
}
if f.metricAttributeExpressions != nil {
metricValues, err := f.metricAttributeExpressions.expressionsAttributesWithSubgraph(exprCtx)
if err != nil {
reqContext.Logger().Warn("failed to resolve expression for metrics", zap.Error(err))
}
metricAttrs = append(metricAttrs, metricValues...)
}
metricAddOpt := otelmetric.WithAttributeSet(attribute.NewSet(metricAttrs...))
if f.accessLogger != nil {
fields := []zap.Field{
zap.String("subgraph_name", ds.Name),
zap.String("subgraph_id", ds.ID),
zap.Int("status", responseInfo.StatusCode),
zap.Duration("latency", latency),
}
path := ds.Name
if responseInfo.Request != nil {
fields = append(fields, f.accessLogger.RequestFields(responseInfo, exprCtx)...)
if responseInfo.Request.URL != nil {
path = responseInfo.Request.URL.Path
}
}
f.accessLogger.Info(path, fields)
}
if responseInfo.Err != nil {
// Set error status. This is the fetch error from the engine
// Downstream errors are extracted from the subgraph response
span.SetStatus(codes.Error, responseInfo.Err.Error())
span.RecordError(responseInfo.Err)
var errorCodesAttr []string
if unwrapped, ok := responseInfo.Err.(multiError); ok {
errs := unwrapped.Unwrap()
for _, e := range errs {
var subgraphError *resolve.SubgraphError
if errors.As(e, &subgraphError) {
for i, downstreamError := range subgraphError.DownstreamErrors {
var errorCode string
if downstreamError.Extensions != nil {
if ok := downstreamError.Extensions["code"]; ok != nil {
if code, ok := downstreamError.Extensions["code"].(string); ok {
errorCode = code
}
}
}
if errorCode != "" {
errorCodesAttr = append(errorCodesAttr, errorCode)
span.AddEvent(fmt.Sprintf("Downstream error %d", i+1),
trace.WithAttributes(
rotel.WgSubgraphErrorExtendedCode.String(errorCode),
rotel.WgSubgraphErrorMessage.String(downstreamError.Message),
),
)
}
}
}
}
}
errorCodesAttr = unique.SliceElements(errorCodesAttr)
// Reduce cardinality of error codes
slices.Sort(errorCodesAttr)
metricSliceAttrs := *reqContext.telemetry.AcquireAttributes()
defer reqContext.telemetry.ReleaseAttributes(&metricSliceAttrs)
metricSliceAttrs = append(metricSliceAttrs, reqContext.telemetry.metricSliceAttrs...)
// We can't add this earlier because this is done per subgraph response
if v, ok := reqContext.telemetry.metricSetAttrs[ContextFieldGraphQLErrorCodes]; ok && len(errorCodesAttr) > 0 {
metricSliceAttrs = append(metricSliceAttrs, attribute.StringSlice(v, errorCodesAttr))
}
f.metricStore.MeasureRequestError(ctx, metricSliceAttrs, metricAddOpt)
metricAttrs = append(metricAttrs, rotel.WgRequestError.Bool(true))
attrOpt := otelmetric.WithAttributeSet(attribute.NewSet(metricAttrs...))
f.metricStore.MeasureRequestCount(ctx, metricSliceAttrs, attrOpt)
f.metricStore.MeasureLatency(ctx, latency, metricSliceAttrs, attrOpt)
} else {
f.metricStore.MeasureRequestCount(ctx, reqContext.telemetry.metricSliceAttrs, metricAddOpt)
f.metricStore.MeasureLatency(ctx, latency, reqContext.telemetry.metricSliceAttrs, metricAddOpt)
}
span.SetAttributes(traceAttrs...)
}