pkg/rules/trpc/trpc_otel_instrumenter.go (124 lines of code) (raw):
// Copyright (c) 2024 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package trpc
import (
"fmt"
"os"
"github.com/alibaba/opentelemetry-go-auto-instrumentation/pkg/inst-api-semconv/instrumenter/rpc"
"github.com/alibaba/opentelemetry-go-auto-instrumentation/pkg/inst-api/instrumenter"
"github.com/alibaba/opentelemetry-go-auto-instrumentation/pkg/inst-api/utils"
"github.com/alibaba/opentelemetry-go-auto-instrumentation/pkg/inst-api/version"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/trace"
"trpc.group/trpc-go/trpc-go/codec"
)
type trpcInnerEnabler struct {
enabled bool
}
func (t trpcInnerEnabler) Enable() bool {
return t.enabled
}
var trpcEnabler = trpcInnerEnabler{os.Getenv("OTEL_INSTRUMENTATION_TRPC_ENABLED") != "false"}
type trpcClientAttrsGetter struct {
}
func (t trpcClientAttrsGetter) GetSystem(request trpcReq) string {
return "trpc"
}
func (t trpcClientAttrsGetter) GetService(request trpcReq) string {
return request.msg.CallerService()
}
func (t trpcClientAttrsGetter) GetMethod(request trpcReq) string {
return request.msg.CallerMethod()
}
func (t trpcClientAttrsGetter) GetServerAddress(request trpcReq) string {
return request.addr
}
type trpcServerAttrsGetter struct {
}
func (t trpcServerAttrsGetter) GetSystem(request trpcReq) string {
return "trpc"
}
func (t trpcServerAttrsGetter) GetService(request trpcReq) string {
return request.msg.CalleeService()
}
func (t trpcServerAttrsGetter) GetMethod(request trpcReq) string {
return request.msg.CalleeMethod()
}
func (t trpcServerAttrsGetter) GetServerAddress(request trpcReq) string {
if request.msg.LocalAddr() != nil {
return request.msg.LocalAddr().String()
}
return ""
}
type trpcStatusCodeExtractor[REQUEST trpcReq, RESPONSE trpcRes] struct {
}
func (t trpcStatusCodeExtractor[REQUEST, RESPONSE]) Extract(span trace.Span, request trpcReq, response trpcRes, err error) {
statusCode := response.stausCode
if statusCode != 0 {
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, fmt.Sprintf("trpc error status code %d", statusCode))
}
}
}
type trpcRequestCarrier struct {
reqHeader codec.Msg
}
func (t trpcRequestCarrier) Get(key string) string {
return string(t.reqHeader.ServerMetaData()[key])
}
func (t trpcRequestCarrier) Set(key string, value string) {
md := t.reqHeader.ClientMetaData()
if md == nil {
md = codec.MetaData{}
}
if _, ok := md[key]; ok {
return
}
md[key] = []byte(value)
t.reqHeader.WithClientMetaData(md)
}
func (t trpcRequestCarrier) Keys() []string {
vals := []string{}
for _, byteV := range t.reqHeader.ClientMetaData() {
vals = append(vals, string(byteV))
}
return vals
}
func BuildTrpcClientInstrumenter() instrumenter.Instrumenter[trpcReq, trpcRes] {
builder := instrumenter.Builder[trpcReq, trpcRes]{}
clientGetter := trpcClientAttrsGetter{}
return builder.Init().SetSpanStatusExtractor(&trpcStatusCodeExtractor[trpcReq, trpcRes]{}).SetSpanNameExtractor(&rpc.RpcSpanNameExtractor[trpcReq]{Getter: clientGetter}).
SetSpanKindExtractor(&instrumenter.AlwaysClientExtractor[trpcReq]{}).
AddAttributesExtractor(&rpc.ClientRpcAttrsExtractor[trpcReq, trpcRes, trpcClientAttrsGetter]{}).
SetInstrumentationScope(instrumentation.Scope{
Name: utils.TRPCGO_CLIENT_SCOPE_NAME,
Version: version.Tag,
}).
AddOperationListeners(rpc.RpcClientMetrics("trpc.client")).
BuildPropagatingToDownstreamInstrumenter(
func(n trpcReq) propagation.TextMapCarrier {
return trpcRequestCarrier{reqHeader: n.msg}
},
otel.GetTextMapPropagator(),
)
}
func BuildTrpcServerInstrumenter() instrumenter.Instrumenter[trpcReq, trpcRes] {
builder := instrumenter.Builder[trpcReq, trpcRes]{}
serverGetter := trpcServerAttrsGetter{}
return builder.Init().SetSpanStatusExtractor(&trpcStatusCodeExtractor[trpcReq, trpcRes]{}).SetSpanNameExtractor(&rpc.RpcSpanNameExtractor[trpcReq]{Getter: serverGetter}).
SetSpanKindExtractor(&instrumenter.AlwaysServerExtractor[trpcReq]{}).
AddAttributesExtractor(&rpc.ServerRpcAttrsExtractor[trpcReq, trpcRes, trpcServerAttrsGetter]{}).
SetInstrumentationScope(instrumentation.Scope{
Name: utils.TRPCGO_SERVER_SCOPE_NAME,
Version: version.Tag,
}).
AddOperationListeners(rpc.RpcServerMetrics("trpc.server")).
BuildPropagatingFromUpstreamInstrumenter(
func(n trpcReq) propagation.TextMapCarrier {
return trpcRequestCarrier{reqHeader: n.msg}
},
otel.GetTextMapPropagator(),
)
}