processor/spanprocessor/span.go (166 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package spanprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor"
import (
"context"
"fmt"
"regexp"
"strconv"
"strings"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/expr"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterspan"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
)
type spanProcessor struct {
config Config
toAttributeRules []toAttributeRule
skipExpr expr.BoolExpr[ottlspan.TransformContext]
}
// toAttributeRule is the compiled equivalent of config.ToAttributes field.
type toAttributeRule struct {
// Compiled regexp.
re *regexp.Regexp
// Attribute names extracted from the regexp's subexpressions.
attrNames []string
}
// newSpanProcessor returns the span processor.
func newSpanProcessor(config Config) (*spanProcessor, error) {
skipExpr, err := filterspan.NewSkipExpr(&config.MatchConfig)
if err != nil {
return nil, err
}
sp := &spanProcessor{
config: config,
skipExpr: skipExpr,
}
// Compile ToAttributes regexp and extract attributes names.
if config.Rename.ToAttributes != nil {
for _, pattern := range config.Rename.ToAttributes.Rules {
re, err := regexp.Compile(pattern)
if err != nil {
return nil, fmt.Errorf("invalid regexp pattern %s", pattern)
}
rule := toAttributeRule{
re: re,
// Subexpression names will become attribute names during extraction.
attrNames: re.SubexpNames(),
}
sp.toAttributeRules = append(sp.toAttributeRules, rule)
}
}
return sp, nil
}
func (sp *spanProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
rss := td.ResourceSpans()
for i := 0; i < rss.Len(); i++ {
rs := rss.At(i)
ilss := rs.ScopeSpans()
resource := rs.Resource()
for j := 0; j < ilss.Len(); j++ {
ils := ilss.At(j)
spans := ils.Spans()
scope := ils.Scope()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
if sp.skipExpr != nil {
skip, err := sp.skipExpr.Eval(ctx, ottlspan.NewTransformContext(span, scope, resource, ils, rs))
if err != nil {
return td, err
}
if skip {
continue
}
}
sp.processFromAttributes(span)
sp.processToAttributes(span)
sp.processUpdateStatus(span)
}
}
}
return td, nil
}
func (sp *spanProcessor) processFromAttributes(span ptrace.Span) {
if len(sp.config.Rename.FromAttributes) == 0 {
// There is FromAttributes rule.
return
}
attrs := span.Attributes()
if attrs.Len() == 0 {
// There are no attributes to create span name from.
return
}
// Note: There was a separate proposal for creating the string.
// With benchmarking, strings.Builder is faster than the proposal.
// For full context, refer to this PR comment:
// https://go.opentelemetry.io/collector/pull/301#discussion_r318357678
var sb strings.Builder
for i, key := range sp.config.Rename.FromAttributes {
attr, found := attrs.Get(key)
// If one of the keys isn't found, the span name is not updated.
if !found {
return
}
// Note: WriteString() always return a nil error so there is no error checking
// for this method call.
// https://golang.org/src/strings/builder.go?s=3425:3477#L110
// Include the separator before appending an attribute value if:
// this isn't the first value(ie i == 0) loop through the FromAttributes
// and
// the separator isn't an empty string.
if i > 0 && sp.config.Rename.Separator != "" {
sb.WriteString(sp.config.Rename.Separator)
}
switch attr.Type() {
case pcommon.ValueTypeStr:
sb.WriteString(attr.Str())
case pcommon.ValueTypeBool:
sb.WriteString(strconv.FormatBool(attr.Bool()))
case pcommon.ValueTypeDouble:
sb.WriteString(strconv.FormatFloat(attr.Double(), 'f', -1, 64))
case pcommon.ValueTypeInt:
sb.WriteString(strconv.FormatInt(attr.Int(), 10))
case pcommon.ValueTypeEmpty:
fallthrough
case pcommon.ValueTypeSlice:
fallthrough
case pcommon.ValueTypeBytes:
fallthrough
case pcommon.ValueTypeMap:
fallthrough
default:
sb.WriteString("<unknown-attribute-type>")
}
}
span.SetName(sb.String())
}
func (sp *spanProcessor) processToAttributes(span ptrace.Span) {
if span.Name() == "" {
// There is no span name to work on.
return
}
if sp.config.Rename.ToAttributes == nil {
// No rules to apply.
return
}
// Process rules one by one. Store results of processing in the span
// so that each subsequent rule works on the span name that is the output
// after processing the previous rule.
for _, rule := range sp.toAttributeRules {
re := rule.re
oldName := span.Name()
// Match the regular expression and extract matched subexpressions.
submatches := re.FindStringSubmatch(oldName)
if submatches == nil {
continue
}
// There is a match. We will also need positions of subexpression matches.
submatchIdxPairs := re.FindStringSubmatchIndex(oldName)
// A place to accumulate new span name.
var sb strings.Builder
// Index in the oldName until which we traversed.
oldNameIndex := 0
attrs := span.Attributes()
// TODO: Pre-allocate len(submatches) space in the attributes.
// Start from index 1, which is the first submatch (index 0 is the entire match).
// We will go over submatches and will simultaneously build a new span name,
// replacing matched subexpressions by attribute names.
for i := 1; i < len(submatches); i++ {
attrs.PutStr(rule.attrNames[i], submatches[i])
// Add part of span name from end of previous match to start of this match
// and then add attribute name wrapped in curly brackets.
matchStartIndex := submatchIdxPairs[i*2] // start of i'th submatch.
sb.WriteString(oldName[oldNameIndex:matchStartIndex] + "{" + rule.attrNames[i] + "}")
// Advance the index to the end of current match.
oldNameIndex = submatchIdxPairs[i*2+1] // end of i'th submatch.
}
if oldNameIndex < len(oldName) {
// Append the remainder, from the end of last match until end of span name.
sb.WriteString(oldName[oldNameIndex:])
}
// Set new span name.
if !sp.config.Rename.ToAttributes.KeepOriginalName {
span.SetName(sb.String())
}
if sp.config.Rename.ToAttributes.BreakAfterMatch {
// Stop processing, break after first match is requested.
break
}
}
}
func (sp *spanProcessor) processUpdateStatus(span ptrace.Span) {
cfg := sp.config.SetStatus
if cfg != nil {
switch cfg.Code {
case statusCodeOk:
span.Status().SetCode(ptrace.StatusCodeOk)
span.Status().SetMessage("")
case statusCodeError:
span.Status().SetCode(ptrace.StatusCodeError)
span.Status().SetMessage(cfg.Description)
case statusCodeUnset:
span.Status().SetCode(ptrace.StatusCodeUnset)
span.Status().SetMessage("")
}
}
}