pkg/rules/grpc/grpc_config.go (157 lines of code) (raw):

// Copyright (c) 2024 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package grpc import ( "context" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" semconv "go.opentelemetry.io/otel/semconv/v1.19.0" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc/stats" "google.golang.org/grpc/status" ) type Filter func(*InterceptorInfo) bool // grpcOtelConfig is a group of options for this instrumentation. type grpcOtelConfig struct { Filter Filter Propagators propagation.TextMapPropagator TracerProvider trace.TracerProvider SpanStartOptions []trace.SpanStartOption ReceivedEvent bool SentEvent bool tracer trace.Tracer DestId string } // Option applies an option value for a grpcOtelConfig. type Option interface { apply(*grpcOtelConfig) } func (c *grpcOtelConfig) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool) { // nolint: revive // isServer is not a control flag. span := trace.SpanFromContext(ctx) if span == nil { return } var ( messageId int64 ) gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext) switch rs := rs.(type) { case *stats.Begin: case *stats.InPayload: if c.ReceivedEvent { span.AddEvent("message", trace.WithAttributes( semconv.MessageTypeReceived, semconv.MessageIDKey.Int64(messageId), semconv.MessageUncompressedSizeKey.Int(rs.Length), ), ) } case *stats.OutPayload: if c.SentEvent { span.AddEvent("message", trace.WithAttributes( semconv.MessageTypeSent, semconv.MessageIDKey.Int64(messageId), //semconv.MessageCompressedSizeKey.Int(rs.CompressedLength), semconv.MessageUncompressedSizeKey.Int(rs.Length), ), ) } case *stats.OutTrailer: case *stats.End: if rs.Error != nil { s, _ := status.FromError(rs.Error) if isServer { grpcServerInstrument.End(ctx, grpcRequest{}, grpcResponse{ statusCode: int(s.Code()), }, rs.Error) } else { grpcClientInstrument.End(ctx, grpcRequest{}, grpcResponse{ statusCode: int(s.Code()), }, rs.Error) } } else { methodName := "" if gctx != nil { methodName = gctx.methodName } if isServer { grpcServerInstrument.End(ctx, grpcRequest{ methodName: methodName, }, grpcResponse{ statusCode: 200, }, nil) } else { grpcClientInstrument.End(ctx, grpcRequest{ methodName: methodName, }, grpcResponse{ statusCode: 200, }, nil) } } default: return } } // newConfig returns a grpcOtelConfig configured with all the passed Options. func newConfig(opts []Option, role string) *grpcOtelConfig { c := &grpcOtelConfig{ Propagators: otel.GetTextMapPropagator(), } for _, o := range opts { o.apply(c) } return c } type propagatorsOption struct{ p propagation.TextMapPropagator } func (o propagatorsOption) apply(c *grpcOtelConfig) { if o.p != nil { c.Propagators = o.p } } // WithPropagators returns an Option to use the Propagators when extracting // and injecting trace context from requests. func WithPropagators(p propagation.TextMapPropagator) Option { return propagatorsOption{p: p} } type tracerProviderOption struct{ tp trace.TracerProvider } func (o tracerProviderOption) apply(c *grpcOtelConfig) { if o.tp != nil { c.TracerProvider = o.tp } } // WithInterceptorFilter returns an Option to use the request filter. // // Deprecated: Use stats handlers instead. func WithInterceptorFilter(f Filter) Option { return interceptorFilterOption{f: f} } type interceptorFilterOption struct { f Filter } func (o interceptorFilterOption) apply(c *grpcOtelConfig) { if o.f != nil { c.Filter = o.f } } // WithTracerProvider returns an Option to use the TracerProvider when // creating a Tracer. func WithTracerProvider(tp trace.TracerProvider) Option { return tracerProviderOption{tp: tp} } //type meterProviderOption struct{ mp metric.MeterProvider } /*func (o meterProviderOption) apply(c *grpcOtelConfig) { if o.mp != nil { c.MeterProvider = o.mp } } // WithMeterProvider returns an Option to use the MeterProvider when // creating a Meter. If this option is not provide the global MeterProvider will be used. func WithMeterProvider(mp metric.MeterProvider) Option { return meterProviderOption{mp: mp} }*/ // Event type that can be recorded, see WithMessageEvents. type Event int // Different types of events that can be recorded, see WithMessageEvents. const ( ReceivedEvents Event = iota SentEvents ) type messageEventsProviderOption struct { events []Event } func (m messageEventsProviderOption) apply(c *grpcOtelConfig) { for _, e := range m.events { switch e { case ReceivedEvents: c.ReceivedEvent = true case SentEvents: c.SentEvent = true } } } // WithMessageEvents configures the Handler to record the specified events // (span.AddEvent) on spans. By default only summary attributes are added at the // end of the request. // // Valid events are: // - ReceivedEvents: Record the number of bytes read after every gRPC read operation. // - SentEvents: Record the number of bytes written after every gRPC write operation. func WithMessageEvents(events ...Event) Option { return messageEventsProviderOption{events: events} } type spanStartOption struct{ opts []trace.SpanStartOption } func (o spanStartOption) apply(c *grpcOtelConfig) { c.SpanStartOptions = append(c.SpanStartOptions, o.opts...) } // WithSpanOptions configures an additional set of // trace.SpanOptions, which are applied to each new span. func WithSpanOptions(opts ...trace.SpanStartOption) Option { return spanStartOption{opts} }