span_compressed.go (199 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package apm // import "go.elastic.co/apm/v2"
import (
"sync/atomic"
"time"
"go.elastic.co/apm/v2/model"
)
// Compression strategies stored in compositeSpan.compressionStrategy.
// Numbering starts at 1 so that the zero value never names a strategy; the
// canCompress* helpers return 0 to signal that no strategy applies.
const (
	compressedStrategyExactMatch int = iota + 1
	compressedStrategySameKind
)
// compressedSpanSameKindName is the prefix used when building the name of a
// composite span; the service target description is appended to it.
const compressedSpanSameKindName = "Calls to "
// compositeSpan accumulates the statistics of the spans that have been
// compressed into a single span. It is considered empty while count < 1.
type compositeSpan struct {
// lastSiblingEndTime is the latest end time (timestamp + Duration) seen
// among the siblings compressed into the composite.
lastSiblingEndTime time.Time
// this internal representation should be set in Nanoseconds, although
// the model unit is set in Milliseconds.
sum time.Duration
// count is the number of spans represented by the composite. It starts at 1
// (the span holding the composite) and grows by one per compressed sibling.
count int
// compressionStrategy is one of the compressedStrategy* constants.
compressionStrategy int
}
// build converts the internal composite representation into its model form.
//
// The accumulated duration is kept internally as a time.Duration and is
// reported as fractional milliseconds. When the compression strategy is not
// one of the known constants, CompressionStrategy keeps its zero value.
func (cs compositeSpan) build() *model.CompositeSpan {
	composite := &model.CompositeSpan{
		Count: cs.count,
		Sum:   float64(cs.sum) / float64(time.Millisecond),
	}
	switch cs.compressionStrategy {
	case compressedStrategyExactMatch:
		composite.CompressionStrategy = "exact_match"
	case compressedStrategySameKind:
		composite.CompressionStrategy = "same_kind"
	}
	return composite
}
// empty reports whether no span has been compressed into the composite yet,
// i.e. the count is zero or negative.
func (cs compositeSpan) empty() bool {
	return cs.count <= 0
}
// compress tries to fold sibling into s, creating or extending the composite
// held by s. It reports whether sibling was absorbed.
//
// Compression is refused when the spans do not share a parent, or when neither
// canCompressComposite nor canCompressStandard yields a strategy. On the first
// successful compression the composite is seeded with s itself (count 1, sum
// s.Duration) before sibling is added to it.
//
// compress does not check eligibility; compressOrEvictCache does that through
// isCompressionEligible before calling it. A span is eligible for compression
// if all the following conditions are met:
//  1. It's an exit span.
//  2. The trace context has not been propagated to a downstream service.
//  3. If the span has an outcome (i.e., outcome is present and it's not null)
//     then it is success. Spans whose outcome indicates an issue of potential
//     interest should not be compressed.
//
// The second condition is important so that we don't remove (compress) a span
// that may be the parent of a downstream service. This would orphan the sub-
// graph started by the downstream service and cause it to not appear in the
// waterfall view.
func (s *Span) compress(sibling *Span) bool {
	// Only siblings (same parent) may be merged.
	if s.parentID != sibling.parentID {
		return false
	}

	// Prefer continuing with the strategy of an existing composite, and fall
	// back to picking a fresh one. Zero means no strategy applies.
	strategy := s.canCompressComposite(sibling)
	if strategy == 0 {
		if strategy = s.canCompressStandard(sibling); strategy == 0 {
			return false
		}
	}

	// First compression: the composite starts out representing s alone.
	if s.composite.empty() {
		s.composite = compositeSpan{
			count:               1,
			sum:                 s.Duration,
			compressionStrategy: strategy,
		}
	}

	s.composite.count++
	s.composite.sum += sibling.Duration

	// Remember the latest end time among the compressed siblings.
	if end := sibling.timestamp.Add(sibling.Duration); end.After(s.composite.lastSiblingEndTime) {
		s.composite.lastSiblingEndTime = end
	}
	return true
}
//
// Span //
//
// attemptCompress tries to compress a span into a "composite span" when:
//  1. Compression is enabled on the agent.
//  2. The cached span and the incoming span share the same parent (are
//     siblings).
//  3. The cached span and the incoming span are consecutive spans.
//  4. The cached span and the incoming span are both exit spans, have
//     outcome == success and are short enough (see
//     `ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION` and
//     `ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION` for more info).
//  5. The cached span and the incoming span represent the same exact
//     operation or the same kind of operation:
//     - Exact match: same name, kind and destination service.
//     - Same kind: same kind and destination service.
//
// Once a span has been compressed using a particular strategy, it CANNOT
// continue to compress spans using a different strategy.
//
// The algorithm only compresses consecutive spans that all meet the
// conditions above. As soon as a span fails to meet them, the cache is
// evicted and the previously cached span is returned:
//   - When the incoming span is compressible, it replaces the cached span.
//   - When the incoming span is not compressible, it is to be enqueued as
//     well.
//
// The returned *Span is the span evicted from a cache, if any. The returned
// bool is `true` when s has been cached, in which case the caller should not
// enqueue it. When `false` is returned, the cache has been evicted and the
// caller should enqueue s.
//
// It needs to be called with s.mu, s.parent.mu, s.tx.TransactionData.mu and
// s.tx.mu.RLock held.
func (s *Span) attemptCompress() (*Span, bool) {
	// Nothing to do when compression is disabled for this span.
	if !s.compressedSpan.options.enabled {
		return nil, false
	}

	// s is ending: flush whatever its own cache is holding.
	if evicted := s.compressedSpan.evict(); evicted != nil {
		return evicted, false
	}

	// A span is cached either in its parent span or, when it has no parent
	// span, in its transaction. An ended owner no longer accepts spans.
	switch {
	case s.parent != nil:
		if s.parent.ended() {
			return nil, false
		}
		return s.parent.compressedSpan.compressOrEvictCache(s)
	case s.tx != nil && !s.tx.ended():
		return s.tx.compressedSpan.compressOrEvictCache(s)
	}
	return nil, false
}
// isCompressionEligible reports whether s may take part in compression: it
// must be a non-nil exit span whose trace context has not been propagated,
// and its outcome must be either unset or "success".
func (s *Span) isCompressionEligible() bool {
	if s == nil {
		return false
	}
	propagated := atomic.LoadUint32(&s.ctxPropagated) == 1
	if !s.exit || propagated {
		return false
	}
	switch s.Outcome {
	case "", "success":
		return true
	}
	return false
}
// canCompressStandard picks the compression strategy that applies to s and
// sibling, or returns 0 when they cannot be compressed together.
//
// Spans of the same kind use the "same kind" strategy and duration limit; if
// they are also an exact match, the "exact match" strategy and limit are used
// instead. Both spans must be within the chosen limit, and a composite that
// already exists on s must have been built with the same strategy.
func (s *Span) canCompressStandard(sibling *Span) int {
	if !s.isSameKind(sibling) {
		return 0
	}

	// Start from the looser strategy and tighten it on an exact match.
	strategy, limit := compressedStrategySameKind, s.compressedSpan.options.sameKindMaxDuration
	if s.isExactMatch(sibling) {
		strategy, limit = compressedStrategyExactMatch, s.compressedSpan.options.exactMatchMaxDuration
	}

	switch {
	case !s.durationLowerOrEq(sibling, limit):
		// One of the spans exceeds the maximum duration.
		return 0
	case !s.composite.empty() && s.composite.compressionStrategy != strategy:
		// An existing composite may not switch strategies.
		return 0
	}
	return strategy
}
// canCompressComposite checks whether sibling can join the composite already
// held by s. It returns the composite's strategy when sibling satisfies that
// strategy's matching rule and duration limit, and 0 otherwise (including
// when s holds no composite yet).
func (s *Span) canCompressComposite(sibling *Span) int {
	if s.composite.empty() {
		return 0
	}

	strategy := s.composite.compressionStrategy
	var matches bool
	switch strategy {
	case compressedStrategyExactMatch:
		matches = s.isExactMatch(sibling) &&
			s.durationLowerOrEq(sibling, s.compressedSpan.options.exactMatchMaxDuration)
	case compressedStrategySameKind:
		matches = s.isSameKind(sibling) &&
			s.durationLowerOrEq(sibling, s.compressedSpan.options.sameKindMaxDuration)
	}
	if !matches {
		return 0
	}
	return strategy
}
// durationLowerOrEq reports whether the durations of both s and sibling are
// less than or equal to limit.
func (s *Span) durationLowerOrEq(sibling *Span, limit time.Duration) bool {
	if s.Duration > limit {
		return false
	}
	return sibling.Duration <= limit
}
//
// SpanData //
//
// isExactMatch is used for compression purposes. Two spans are considered an
// exact match if they have the same name and are of the same kind (see
// isSameKind for more details).
func (s *SpanData) isExactMatch(span *Span) bool {
	if s.Name != span.Name {
		return false
	}
	return s.isSameKind(span)
}
// isSameKind is used for compression purposes. Two spans are considered to be
// of the same kind if they have the same type and subtype, and both carry a
// service target whose type and name are equal. If either span has no service
// target, the spans are never of the same kind.
func (s *SpanData) isSameKind(span *Span) bool {
	if s.Type != span.Type || s.Subtype != span.Subtype {
		return false
	}
	mine, theirs := s.Context.service.Target, span.Context.service.Target
	if mine == nil || theirs == nil {
		return false
	}
	return mine.Type == theirs.Type && mine.Name == theirs.Name
}
// setCompressedSpanName renames the span to "Calls to <service target>" when
// its composite was compressed with the `"same_kind"` strategy. Spans using
// any other strategy keep their name.
func (s *SpanData) setCompressedSpanName() {
	if s.composite.compressionStrategy == compressedStrategySameKind {
		s.Name = s.getCompositeSpanName()
	}
}
// getCompositeSpanName builds the composite span name from the service
// target: the "Calls to " prefix followed by "<type>/<name>", by whichever of
// the two is set when only one is, or by "unknown" when both are empty.
func (s *SpanData) getCompositeSpanName() string {
	kind, name := s.Context.serviceTarget.Type, s.Context.serviceTarget.Name
	switch {
	case kind == "" && name == "":
		return compressedSpanSameKindName + "unknown"
	case kind == "":
		return compressedSpanSameKindName + name
	case name == "":
		return compressedSpanSameKindName + kind
	default:
		return compressedSpanSameKindName + kind + "/" + name
	}
}
// compressedSpan holds the compression state kept by a span or transaction:
// the span currently cached as a compression candidate, and the compression
// options.
type compressedSpan struct {
// cache is the span waiting to be compressed with the next eligible
// sibling; nil when nothing is cached.
cache *Span
// options carries the compression settings read by this file: enabled,
// exactMatchMaxDuration and sameKindMaxDuration.
options compressionOptions
}
// evict empties the cache and returns the span it held, or nil when the cache
// was already empty. If the span carries a composite, its Duration and Name
// are adjusted before it is returned.
//
// Should only be called from the Transaction.End() and Span.End() code paths.
func (cs *compressedSpan) evict() *Span {
	cached := cs.cache
	if cached == nil {
		return nil
	}
	cs.cache = nil

	if cached.composite.empty() {
		return cached
	}

	// The duration of a composite is adjusted only now, just before it is
	// reported and once no more spans can be compressed into it. Doing this
	// earlier could grow the duration past the compression threshold, causing
	// later compressible spans to be reported separately.
	cached.Duration = cached.composite.lastSiblingEndTime.Sub(cached.timestamp)
	cached.setCompressedSpanName()
	return cached
}
// compressOrEvictCache decides what happens to s with respect to the cache.
//
//   - s is not eligible for compression: the cache is evicted and returned,
//     and false tells the caller that s was not cached.
//   - The cache is empty: s becomes the cached span.
//   - s compresses into the cached span: nothing is evicted, and the
//     transaction's spansCreated counter is decremented (unless the
//     transaction is nil or has ended).
//   - s cannot be compressed into the cached span: the cached span is evicted
//     and returned, and s takes its place in the cache.
//
// In the last three cases the returned bool is true.
func (cs *compressedSpan) compressOrEvictCache(s *Span) (*Span, bool) {
	if !s.isCompressionEligible() {
		return cs.evict(), false
	}
	if cs.cache == nil {
		cs.cache = s
		return nil, true
	}

	if !cs.cache.compress(s) {
		// Hand back the previous candidate and start over with s.
		evicted := cs.evict()
		cs.cache = s
		return evicted, true
	}

	// s now lives inside the composite, so it no longer counts as a span
	// created by the transaction.
	if s.tx != nil && !s.tx.ended() {
		s.tx.spansCreated--
	}
	return nil, true
}