pkg/inst-api/instrumenter/instrumenter.go (208 lines of code) (raw):

// Copyright (c) 2024 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package instrumenter import ( "context" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "sync" "time" ) type Instrumenter[REQUEST any, RESPONSE any] interface { ShouldStart(parentContext context.Context, request REQUEST) bool StartAndEnd(parentContext context.Context, request REQUEST, response RESPONSE, err error, startTime, endTime time.Time) StartAndEndWithOptions(parentContext context.Context, request REQUEST, response RESPONSE, err error, startTime, endTime time.Time, startOptions []trace.SpanStartOption, endOptions []trace.SpanEndOption) Start(parentContext context.Context, request REQUEST, options ...trace.SpanStartOption) context.Context End(ctx context.Context, request REQUEST, response RESPONSE, err error, options ...trace.SpanEndOption) } type InternalInstrumenter[REQUEST any, RESPONSE any] struct { enabler InstrumentEnabler spanNameExtractor SpanNameExtractor[REQUEST] spanKindExtractor SpanKindExtractor[REQUEST] spanStatusExtractor SpanStatusExtractor[REQUEST, RESPONSE] attributesExtractors []AttributesExtractor[REQUEST, RESPONSE] operationListeners []OperationListener contextCustomizers []ContextCustomizer[REQUEST] spanSuppressor SpanSuppressor tracer trace.Tracer instVersion string } type PropagatingToDownstreamInstrumenter[REQUEST any, RESPONSE any] struct { carrierGetter func(REQUEST) propagation.TextMapCarrier prop propagation.TextMapPropagator base InternalInstrumenter[REQUEST, RESPONSE] } type PropagatingFromUpstreamInstrumenter[REQUEST any, RESPONSE any] struct { carrierGetter func(REQUEST) propagation.TextMapCarrier prop propagation.TextMapPropagator base InternalInstrumenter[REQUEST, RESPONSE] } func (i *InternalInstrumenter[REQUEST, RESPONSE]) ShouldStart(parentContext context.Context, request REQUEST) bool { spanKind := i.spanKindExtractor.Extract(request) suppressed := i.spanSuppressor.ShouldSuppress(parentContext, spanKind) // TODO: record suppressed span return !suppressed } var cachePool = &sync.Pool{ New: func() interface{} { return make([]attribute.KeyValue, 0, 25) }, } func GetCachedAttrs() []attribute.KeyValue { return cachePool.Get().([]attribute.KeyValue) } func PutCachedAttrs(attrs []attribute.KeyValue) { attrs = attrs[:0] cachePool.Put(attrs) } func (i *InternalInstrumenter[REQUEST, RESPONSE]) StartAndEnd(parentContext context.Context, request REQUEST, response RESPONSE, err error, startTime, endTime time.Time) { ctx := i.doStart(parentContext, request, startTime) i.doEnd(ctx, request, response, err, endTime) } func (i *InternalInstrumenter[REQUEST, RESPONSE]) StartAndEndWithOptions(parentContext context.Context, request REQUEST, response RESPONSE, err error, startTime, endTime time.Time, startOptions []trace.SpanStartOption, endOptions []trace.SpanEndOption) { ctx := i.doStart(parentContext, request, startTime, startOptions...) i.doEnd(ctx, request, response, err, endTime, endOptions...) } func (i *InternalInstrumenter[REQUEST, RESPONSE]) Start(parentContext context.Context, request REQUEST, options ...trace.SpanStartOption) context.Context { return i.doStart(parentContext, request, time.Now(), options...) } func (i *InternalInstrumenter[REQUEST, RESPONSE]) doStart(parentContext context.Context, request REQUEST, timestamp time.Time, options ...trace.SpanStartOption) context.Context { if i.enabler != nil && !i.enabler.Enable() { return parentContext } for _, listener := range i.operationListeners { parentContext = listener.OnBeforeStart(parentContext, timestamp) } // extract span name spanName := i.spanNameExtractor.Extract(request) spanKind := i.spanKindExtractor.Extract(request) options = append(options, trace.WithSpanKind(spanKind), trace.WithTimestamp(timestamp)) newCtx, span := i.tracer.Start(parentContext, spanName, options...) attrs := make([]attribute.KeyValue, 0, 20) // extract span attrs for _, extractor := range i.attributesExtractors { attrs, newCtx = extractor.OnStart(attrs, newCtx, request) } // execute context customizer hook for _, customizer := range i.contextCustomizers { newCtx = customizer.OnStart(newCtx, request, attrs) } for _, listener := range i.operationListeners { newCtx = listener.OnBeforeEnd(newCtx, attrs, timestamp) } span.SetAttributes(attrs...) return i.spanSuppressor.StoreInContext(newCtx, spanKind, span) } func (i *InternalInstrumenter[REQUEST, RESPONSE]) End(ctx context.Context, request REQUEST, response RESPONSE, err error, options ...trace.SpanEndOption) { i.doEnd(ctx, request, response, err, time.Now(), options...) } func (i *InternalInstrumenter[REQUEST, RESPONSE]) doEnd(ctx context.Context, request REQUEST, response RESPONSE, err error, timestamp time.Time, options ...trace.SpanEndOption) { if i.enabler != nil && !i.enabler.Enable() { return } for _, listener := range i.operationListeners { listener.OnAfterStart(ctx, timestamp) } span := trace.SpanFromContext(ctx) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) } attrs := GetCachedAttrs() defer PutCachedAttrs(attrs) // extract span attributes for _, extractor := range i.attributesExtractors { attrs, ctx = extractor.OnEnd(attrs, ctx, request, response, err) } i.spanStatusExtractor.Extract(span, request, response, err) span.SetAttributes(attrs...) options = append(options, trace.WithTimestamp(timestamp)) span.End(options...) for _, listener := range i.operationListeners { listener.OnAfterEnd(ctx, attrs, timestamp) } } func (p *PropagatingToDownstreamInstrumenter[REQUEST, RESPONSE]) ShouldStart(parentContext context.Context, request REQUEST) bool { return p.base.ShouldStart(parentContext, request) } func (p *PropagatingToDownstreamInstrumenter[REQUEST, RESPONSE]) StartAndEnd(parentContext context.Context, request REQUEST, response RESPONSE, err error, startTime, endTime time.Time) { newCtx := p.base.doStart(parentContext, request, startTime) if p.carrierGetter != nil { if p.prop != nil { p.prop.Inject(newCtx, p.carrierGetter(request)) } else { otel.GetTextMapPropagator().Inject(newCtx, p.carrierGetter(request)) } } p.base.doEnd(newCtx, request, response, err, endTime) } func (p *PropagatingToDownstreamInstrumenter[REQUEST, RESPONSE]) StartAndEndWithOptions(parentContext context.Context, request REQUEST, response RESPONSE, err error, startTime, endTime time.Time, startOptions []trace.SpanStartOption, endOptions []trace.SpanEndOption) { newCtx := p.base.doStart(parentContext, request, startTime, startOptions...) if p.carrierGetter != nil { if p.prop != nil { p.prop.Inject(newCtx, p.carrierGetter(request)) } else { otel.GetTextMapPropagator().Inject(newCtx, p.carrierGetter(request)) } } p.base.doEnd(newCtx, request, response, err, endTime, endOptions...) } func (p *PropagatingToDownstreamInstrumenter[REQUEST, RESPONSE]) Start(parentContext context.Context, request REQUEST, options ...trace.SpanStartOption) context.Context { newCtx := p.base.Start(parentContext, request, options...) if p.carrierGetter != nil { if p.prop != nil { p.prop.Inject(newCtx, p.carrierGetter(request)) } else { otel.GetTextMapPropagator().Inject(newCtx, p.carrierGetter(request)) } } return newCtx } func (p *PropagatingToDownstreamInstrumenter[REQUEST, RESPONSE]) End(ctx context.Context, request REQUEST, response RESPONSE, err error, options ...trace.SpanEndOption) { p.base.End(ctx, request, response, err, options...) } func (p *PropagatingFromUpstreamInstrumenter[REQUEST, RESPONSE]) ShouldStart(parentContext context.Context, request REQUEST) bool { return p.base.ShouldStart(parentContext, request) } func (p *PropagatingFromUpstreamInstrumenter[REQUEST, RESPONSE]) StartAndEnd(parentContext context.Context, request REQUEST, response RESPONSE, err error, startTime, endTime time.Time) { var ctx context.Context if p.carrierGetter != nil { var extracted context.Context if p.prop != nil { extracted = p.prop.Extract(parentContext, p.carrierGetter(request)) } else { extracted = otel.GetTextMapPropagator().Extract(parentContext, p.carrierGetter(request)) } ctx = p.base.doStart(extracted, request, startTime) } else { ctx = parentContext } p.base.doEnd(ctx, request, response, err, endTime) } func (p *PropagatingFromUpstreamInstrumenter[REQUEST, RESPONSE]) StartAndEndWithOptions(parentContext context.Context, request REQUEST, response RESPONSE, err error, startTime, endTime time.Time, startOptions []trace.SpanStartOption, endOptions []trace.SpanEndOption) { var ctx context.Context if p.carrierGetter != nil { var extracted context.Context if p.prop != nil { extracted = p.prop.Extract(parentContext, p.carrierGetter(request)) } else { extracted = otel.GetTextMapPropagator().Extract(parentContext, p.carrierGetter(request)) } ctx = p.base.doStart(extracted, request, startTime, startOptions...) } else { ctx = parentContext } p.base.doEnd(ctx, request, response, err, endTime, endOptions...) } func (p *PropagatingFromUpstreamInstrumenter[REQUEST, RESPONSE]) Start(parentContext context.Context, request REQUEST, options ...trace.SpanStartOption) context.Context { if p.carrierGetter != nil { var extracted context.Context if p.prop != nil { extracted = p.prop.Extract(parentContext, p.carrierGetter(request)) } else { extracted = otel.GetTextMapPropagator().Extract(parentContext, p.carrierGetter(request)) } return p.base.Start(extracted, request, options...) } else { return parentContext } } func (p *PropagatingFromUpstreamInstrumenter[REQUEST, RESPONSE]) End(ctx context.Context, request REQUEST, response RESPONSE, err error, options ...trace.SpanEndOption) { p.base.End(ctx, request, response, err, options...) }