transaction.go (309 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package apm // import "go.elastic.co/apm/v2"
import (
cryptorand "crypto/rand"
"encoding/binary"
"math/rand"
"sync"
"time"
)
// maxDroppedSpanStats is the hard cap on how many distinct dropped-span
// stat groups a single transaction keeps; once reached, timings for new
// groups are discarded (see droppedSpanTimingsMap.add).
const maxDroppedSpanStats = 128
// StartTransaction begins a Transaction named name with the given
// transactionType, using the current time as its start time.
//
// It is shorthand for StartTransactionOptions with an empty
// TransactionOptions value.
func (t *Tracer) StartTransaction(name, transactionType string) *Transaction {
	var opts TransactionOptions
	return t.StartTransactionOptions(name, transactionType, opts)
}
// StartTransactionOptions returns a new Transaction with the
// specified name, type, and options.
//
// TransactionData is taken from the tracer's transactionDataPool when
// available. Otherwise a fresh one is allocated with its own random number
// generator for ID generation. That generator is seeded from crypto/rand,
// or from the current time if crypto/rand fails.
//
// If the instrumentation config has recording disabled, or the tracer is
// not active, the Transaction is returned immediately. Its name, type,
// IDs and start time are then left unset.
//
// Otherwise the trace context is chosen as follows:
//   - With the "restart_external" continuation strategy, the trace is
//     continued if the incoming tracestate has an Elastic entry, and
//     restarted otherwise.
//   - With the "restart" strategy, if the incoming trace and span IDs are
//     both non-zero, a new trace is started. The incoming trace/span is
//     then recorded as a span link instead of as the parent.
//   - If opts.TraceContext.Trace is valid and no restart was triggered,
//     the incoming trace is continued. Its trace ID and options are
//     adopted, along with its span ID (as parent) and tracestate when
//     those are valid.
//   - Otherwise a new root trace is started with a random trace ID.
//
// For root transactions, the configured sampler (if any) decides the
// recorded flag and the Elastic tracestate sample rate; without a sampler,
// every root transaction is sampled. Non-root transactions take the
// incoming trace options as-is.
func (t *Tracer) StartTransactionOptions(name, transactionType string, opts TransactionOptions) *Transaction {
	// Reuse pooled TransactionData when possible; only a freshly allocated
	// one needs its maps and random source set up.
	td, _ := t.transactionDataPool.Get().(*TransactionData)
	if td == nil {
		td = &TransactionData{
			Duration: -1,
			Context: Context{
				captureBodyMask: CaptureBodyTransactions,
			},
			spanTimings:       make(spanTimingsMap),
			droppedSpansStats: make(droppedSpanTimingsMap),
		}
		// Seed the per-TransactionData PRNG from crypto/rand, falling back
		// to the current time if reading random bytes fails.
		var seed int64
		if err := binary.Read(cryptorand.Reader, binary.LittleEndian, &seed); err != nil {
			seed = time.Now().UnixNano()
		}
		td.rand = rand.New(rand.NewSource(seed))
	}
	tx := &Transaction{tracer: t, TransactionData: td}

	// Take a snapshot of config that should apply to all spans within the
	// transaction.
	instrumentationConfig := t.instrumentationConfig()
	tx.recording = instrumentationConfig.recording
	if !tx.recording || !t.Active() {
		// Not recording, or tracer inactive: hand back a bare transaction
		// without IDs, name, type or start time.
		return tx
	}
	tx.maxSpans = instrumentationConfig.maxSpans
	tx.compressedSpan.options = instrumentationConfig.compressionOptions
	tx.exitSpanMinDuration = instrumentationConfig.exitSpanMinDuration
	tx.spanStackTraceMinDuration = instrumentationConfig.spanStackTraceMinDuration
	tx.stackTraceLimit = instrumentationConfig.stackTraceLimit
	tx.Context.captureHeaders = instrumentationConfig.captureHeaders
	tx.propagateLegacyHeader = instrumentationConfig.propagateLegacyHeader
	tx.Context.sanitizedFieldNames = instrumentationConfig.sanitizedFieldNames
	tx.breakdownMetricsEnabled = t.breakdownMetrics.enabled

	// Resolve the continuation strategy. "restart_external" becomes
	// "continue" when the incoming tracestate has an Elastic entry
	// (presumably meaning the caller is Elastic-instrumented; confirm) and
	// "restart" otherwise.
	continuationStrategy := instrumentationConfig.continuationStrategy
	shouldRestartTrace := false
	if continuationStrategy == "restart_external" {
		if opts.TraceContext.State.haveElastic {
			continuationStrategy = "continue"
		} else {
			continuationStrategy = "restart"
		}
	}
	// On restart, keep the incoming trace/span as a span link rather than
	// as the parent. A restart only happens if both IDs are non-zero.
	if continuationStrategy == "restart" {
		if !opts.TraceContext.Trace.isZero() && !opts.TraceContext.Span.isZero() {
			link := SpanLink{
				Trace: opts.TraceContext.Trace,
				Span:  opts.TraceContext.Span,
			}
			tx.links = append(tx.links, link)
			shouldRestartTrace = true
		}
	}

	var root bool
	if opts.TraceContext.Trace.Validate() == nil && !shouldRestartTrace {
		// Continue the incoming trace.
		tx.traceContext.Trace = opts.TraceContext.Trace
		tx.traceContext.Options = opts.TraceContext.Options
		if opts.TraceContext.Span.Validate() == nil {
			tx.parentID = opts.TraceContext.Span
		}
		if opts.TransactionID.Validate() == nil {
			tx.traceContext.Span = opts.TransactionID
		} else {
			// No usable TransactionID supplied: generate a random one.
			binary.LittleEndian.PutUint64(tx.traceContext.Span[:], tx.rand.Uint64())
		}
		if opts.TraceContext.State.Validate() == nil {
			tx.traceContext.State = opts.TraceContext.State
		}
	} else {
		// Start a new trace. We reuse the trace ID for the root transaction's ID
		// if one is not specified in the options.
		root = true
		binary.LittleEndian.PutUint64(tx.traceContext.Trace[:8], tx.rand.Uint64())
		binary.LittleEndian.PutUint64(tx.traceContext.Trace[8:], tx.rand.Uint64())
		if opts.TransactionID.Validate() == nil {
			tx.traceContext.Span = opts.TransactionID
		} else {
			// copy stops at len(Span), so this takes the leading bytes
			// of the trace ID.
			copy(tx.traceContext.Span[:], tx.traceContext.Trace[:])
		}
	}

	if root {
		// Root transactions make their own sampling decision.
		var result SampleResult
		if instrumentationConfig.sampler != nil {
			result = instrumentationConfig.sampler.Sample(SampleParams{
				TraceContext: tx.traceContext,
			})
			if !result.Sampled {
				// Special case: for unsampled transactions we
				// report a sample rate of 0, so that we do not
				// count them in aggregations in the server.
				// This is necessary to avoid overcounting, as
				// we will scale the sampled transactions.
				result.SampleRate = 0
			}
			sampleRate := roundSampleRate(result.SampleRate)
			tx.traceContext.State = NewTraceState(TraceStateEntry{
				Key:   elasticTracestateVendorKey,
				Value: formatElasticTracestateValue(sampleRate),
			})
		} else {
			// No sampler configured: sample everything.
			result.Sampled = true
		}
		if result.Sampled {
			o := tx.traceContext.Options.WithRecorded(true)
			tx.traceContext.Options = o
		}
	} else {
		// TODO(axw) make this behaviour configurable. In some cases
		// it may not be a good idea to honour the recorded flag, as
		// it may open up the application to DoS by forced sampling.
		// Even ignoring bad actors, a service that has many feeder
		// applications may end up being sampled at a very high rate.
		tx.traceContext.Options = opts.TraceContext.Options
	}

	tx.Name = name
	tx.Type = transactionType
	tx.timestamp = opts.Start
	if tx.timestamp.IsZero() {
		tx.timestamp = time.Now()
	}
	// Caller-supplied links come after any link added by a trace restart.
	tx.links = append(tx.links, opts.Links...)
	return tx
}
// TransactionOptions holds options for Tracer.StartTransactionOptions.
type TransactionOptions struct {
	// TraceContext holds the TraceContext for a new transaction. If this is
	// zero (more precisely, if its Trace ID is not valid), a new trace will
	// be started.
	//
	// Under the "restart" continuation strategy (or "restart_external"
	// without an Elastic tracestate entry), an incoming trace with non-zero
	// trace and span IDs is not continued. Instead, it is recorded as a
	// span link on a new trace.
	TraceContext TraceContext

	// TransactionID holds the ID to assign to the transaction. If this is
	// zero (not valid), a new ID will be generated and used instead.
	TransactionID SpanID

	// Start is the start time of the transaction. If this has the
	// zero value, time.Now() will be used instead.
	Start time.Time

	// Links, if non-nil, holds a list of spans linked to the transaction.
	// They are appended after any link created by a trace restart, and are
	// ignored if the transaction is not recording.
	Links []SpanLink
}
// Transaction describes an event occurring in the monitored service.
type Transaction struct {
	// tracer is the Tracer that started the transaction. It is used to
	// enqueue the transaction on End and to return its data to the pool.
	tracer *Tracer

	// traceContext holds the trace ID, this transaction's own ID (in its
	// Span field), the trace options and the tracestate.
	traceContext TraceContext

	// parentID is the ID of the parent span. It is zero if none was
	// supplied, until EnsureParent generates one.
	parentID SpanID

	// mu guards TransactionData, which End and Discard set to nil while
	// holding it, and parentID, which EnsureParent writes and ParentID
	// reads under it.
	mu sync.RWMutex

	// TransactionData holds the transaction data. This field is set to
	// nil when either of the transaction's End or Discard methods are called.
	*TransactionData
}
// Sampled reports whether tx is sampled, i.e. whether the recorded flag is
// set in its trace options. A nil tx is never sampled.
func (tx *Transaction) Sampled() bool {
	return tx != nil && tx.traceContext.Options.Recorded()
}
// TraceContext returns the transaction's TraceContext; its Span field
// carries the transaction's own ID.
//
// A nil tx yields the zero (invalid) TraceContext.
func (tx *Transaction) TraceContext() TraceContext {
	var tc TraceContext
	if tx != nil {
		tc = tx.traceContext
	}
	return tc
}
// ShouldPropagateLegacyHeader reports whether instrumentation should
// propagate the legacy "Elastic-Apm-Traceparent" header in addition to
// the standard W3C "traceparent" header.
//
// It returns false if tx is nil or has already been ended. This matches
// the nil-safety of the other read-only accessors on Transaction.
//
// This method will be removed in a future major version when we remove
// support for propagating the legacy header.
func (tx *Transaction) ShouldPropagateLegacyHeader() bool {
	if tx == nil {
		return false
	}
	// Nothing is modified here, so a read lock is enough (as in ParentID);
	// End and Discard take the write lock before clearing TransactionData.
	tx.mu.RLock()
	defer tx.mu.RUnlock()
	if tx.ended() {
		return false
	}
	return tx.propagateLegacyHeader
}
// EnsureParent returns the span ID of tx's parent, generating one if none
// has been set yet and tx has not been ended. If tx is nil or has been
// ended, a zero (invalid) SpanID is returned.
//
// A parent ID is unset for root transactions, and also when the incoming
// trace context carried no valid span ID. In that case the second half of
// the trace ID is used as the parent ID. For root transactions without an
// explicit TransactionID, the first half of the trace ID is already the
// transaction ID, so the two IDs come from distinct halves.
//
// This method can be used for generating a span ID for the RUM
// (Real User Monitoring) agent, where the RUM agent is initialized
// after the backend service returns.
func (tx *Transaction) EnsureParent() SpanID {
	var id SpanID
	if tx == nil {
		return id
	}
	tx.mu.Lock()
	defer tx.mu.Unlock()
	if !tx.ended() {
		if tx.parentID.isZero() {
			copy(tx.parentID[:], tx.traceContext.Trace[8:])
		}
		id = tx.parentID
	}
	return id
}
// AddLink records l as a span link on tx. Links added after tx has been
// ended or discarded are silently ignored. tx must be non-nil.
func (tx *Transaction) AddLink(l SpanLink) {
	tx.mu.Lock()
	defer tx.mu.Unlock()
	if !tx.ended() {
		tx.links = append(tx.links, l)
	}
}
// ParentID returns the ID of the transaction's parent span. It returns the
// zero (invalid) SpanID if tx is nil or no parent ID has been set.
func (tx *Transaction) ParentID() SpanID {
	var id SpanID
	if tx != nil {
		tx.mu.RLock()
		id = tx.parentID
		tx.mu.RUnlock()
	}
	return id
}
// Discard drops a previously started transaction without reporting it.
// Its TransactionData is reset and returned to the tracer's pool.
// Discarding an already ended or discarded transaction does nothing.
//
// Because Discard sets tx's TransactionData field to nil, callers must not
// update tx once Discard has returned.
func (tx *Transaction) Discard() {
	tx.mu.Lock()
	defer tx.mu.Unlock()
	if td := tx.TransactionData; td != nil {
		td.reset(tx.tracer)
		tx.TransactionData = nil
	}
}
// End enqueues tx for sending to the Elastic APM server.
//
// Calling End will set tx's TransactionData field to nil, so callers
// must ensure tx is not updated after End returns.
//
// If tx.Duration has not been set, End will set it to the elapsed time
// since the transaction's start time.
//
// Calling End on an already ended or discarded transaction is a no-op. An
// empty Type is replaced with "custom". For recording transactions, the
// steps are:
//   - An empty Outcome is first taken from tx.Context.outcome().
//   - If that is also empty, Outcome becomes "failure" when an error was
//     captured and "success" otherwise.
//   - Any pending compressed span is evicted and ended.
//   - The transaction is enqueued.
//
// Non-recording transactions are instead reset and returned to the pool.
func (tx *Transaction) End() {
	tx.mu.Lock()
	defer tx.mu.Unlock()
	if tx.ended() {
		return
	}
	if tx.Type == "" {
		tx.Type = "custom"
	}
	if tx.recording {
		// A negative Duration (initially -1) means it was never set.
		if tx.Duration < 0 {
			tx.Duration = time.Since(tx.timestamp)
		}
		if tx.Outcome == "" {
			tx.Outcome = tx.Context.outcome()
			if tx.Outcome == "" {
				if tx.errorCaptured {
					tx.Outcome = "failure"
				} else {
					tx.Outcome = "success"
				}
			}
		}
		// Hold the transaction data lock to check if the transaction has any
		// compressed spans in its cache, if so, evict cache and end the span.
		tx.TransactionData.mu.Lock()
		if evictedSpan := tx.compressedSpan.evict(); evictedSpan != nil {
			evictedSpan.end()
		}
		tx.TransactionData.mu.Unlock()
		tx.enqueue()
	} else {
		tx.reset(tx.tracer)
	}
	tx.TransactionData = nil
}
// enqueue sends tx and its TransactionData to the tracer's events channel
// without blocking. If the channel cannot accept the event immediately,
// the transaction is dropped:
//   - it is still passed to breakdownMetrics.recordTransaction;
//   - the TransactionsDropped stat is incremented;
//   - its data is reset and returned to the pool.
func (tx *Transaction) enqueue() {
	var ev tracerEvent
	ev.eventType = transactionEvent
	ev.tx.Transaction = tx
	ev.tx.TransactionData = tx.TransactionData

	select {
	case tx.tracer.events <- ev:
		return
	default:
		// Fall through to the drop path: enqueuing must never block.
	}

	tx.tracer.breakdownMetrics.recordTransaction(tx.TransactionData)
	tx.tracer.stats.accumulate(TracerStats{TransactionsDropped: 1})
	tx.reset(tx.tracer)
}
// ended reports whether End or Discard has already run on tx. Both of them
// signal completion by clearing TransactionData.
//
// The caller must hold tx.mu.
func (tx *Transaction) ended() bool {
	td := tx.TransactionData
	return td == nil
}
// TransactionData holds the details for a transaction, and is embedded
// inside Transaction. When a transaction is ended, its TransactionData
// field will be set to nil.
type TransactionData struct {
	// Name holds the transaction name, initialized with the value
	// passed to StartTransaction.
	Name string

	// Type holds the transaction type, initialized with the value
	// passed to StartTransaction.
	Type string

	// Duration holds the transaction duration, initialized to -1.
	//
	// If you do not update Duration, calling Transaction.End will
	// calculate the duration based on the elapsed time since the
	// transaction's start time.
	Duration time.Duration

	// Context describes the context in which the transaction occurs.
	Context Context

	// Result holds the transaction result.
	Result string

	// Outcome holds the transaction outcome: success, failure, or
	// unknown (the default). If Outcome is set to something else,
	// it will be replaced with "unknown".
	//
	// Outcome is used for error rate calculations. A value of "success"
	// indicates that a transaction succeeded, while "failure" indicates
	// that the transaction failed. If Outcome is set to "unknown" (or
	// some other value), then the transaction will not be included in
	// error rate calculations.
	Outcome string

	// recording is copied from the instrumentation config at start time.
	// Only recording transactions are enqueued by End.
	recording bool

	// The following are a snapshot of instrumentation config, taken at
	// start time, that applies to spans within the transaction.
	maxSpans                  int
	exitSpanMinDuration       time.Duration
	spanStackTraceMinDuration time.Duration
	stackTraceLimit           int

	// breakdownMetricsEnabled mirrors the tracer's breakdown metrics
	// setting at start time.
	breakdownMetricsEnabled bool

	// propagateLegacyHeader is reported by
	// Transaction.ShouldPropagateLegacyHeader.
	propagateLegacyHeader bool

	// timestamp is the transaction's start time.
	timestamp time.Time

	// links holds span links: a link to the incoming trace on restart,
	// plus TransactionOptions.Links and links added via AddLink.
	links []SpanLink

	// mu is held by End while evicting compressedSpan. It presumably also
	// guards the span bookkeeping fields below (confirm against span code).
	mu sync.Mutex

	// errorCaptured is consulted by End to derive a "failure" outcome.
	errorCaptured bool

	// spansCreated and spansDropped are presumably per-transaction span
	// counters maintained by span code (not visible here; confirm).
	spansCreated int
	spansDropped int

	// childrenTimer is not used in this file; presumably it tracks child
	// span timing for breakdown metrics (confirm).
	childrenTimer childrenTimer

	// spanTimings is allocated once and cleared/reused across pooled
	// transactions.
	spanTimings spanTimingsMap

	// droppedSpansStats aggregates dropped-span timings. It holds at most
	// maxDroppedSpanStats groups and is reused across pooled transactions.
	droppedSpansStats droppedSpanTimingsMap

	rand *rand.Rand // for ID generation

	// compressedSpan holds a pending compressed span. Its options come from
	// the instrumentation config, and it is evicted and ended by End.
	compressedSpan compressedSpan
}
// reset resets the TransactionData back to its zero state and places it back
// into the transaction pool.
//
// Context, rand, spanTimings and droppedSpansStats are carried over so that
// their allocations can be reused by the next transaction taken from the
// pool. Context is then reset via its reset method, and both maps are
// emptied in place. Duration is restored to -1, the "not set" sentinel
// checked by Transaction.End.
//
// Because td is put back into the pool, callers must not use it after reset
// returns.
func (td *TransactionData) reset(tracer *Tracer) {
	// The right-hand side is evaluated before *td is overwritten, so the
	// retained fields are read from the old value.
	*td = TransactionData{
		Context:           td.Context,
		Duration:          -1,
		rand:              td.rand,
		spanTimings:       td.spanTimings,
		droppedSpansStats: td.droppedSpansStats,
	}
	td.Context.reset()
	td.spanTimings.reset()
	td.droppedSpansStats.reset()
	tracer.transactionDataPool.Put(td)
}
// droppedSpanTimingsKey identifies a group of dropped spans. Dropped spans
// that agree on all four fields accumulate into the same
// droppedSpanTimingsMap entry.
type droppedSpanTimingsKey struct {
	serviceTargetType string
	serviceTargetName string
	destination       string
	outcome           string
}
// droppedSpanTimingsMap records span timings for groups of dropped spans,
// keyed by droppedSpanTimingsKey. Its add method caps it at
// maxDroppedSpanStats entries.
type droppedSpanTimingsMap map[droppedSpanTimingsKey]spanTiming
// add accumulates count spans with total duration d into the group
// identified by {service target type, service target name, destination,
// outcome}.
//
// The map never grows beyond maxDroppedSpanStats groups. Once that limit is
// reached, timings for previously unseen groups are silently discarded,
// while existing groups keep accumulating.
func (m droppedSpanTimingsMap) add(targetType, targetName, dst, outcome string, count int, d time.Duration) {
	k := droppedSpanTimingsKey{
		serviceTargetType: targetType,
		serviceTargetName: targetName,
		destination:       dst,
		outcome:           outcome,
	}
	timing, ok := m[k]
	if !ok && len(m) >= maxDroppedSpanStats {
		// At capacity: do not create a new group.
		return
	}
	timing.count += uint64(count)
	timing.duration += int64(d)
	m[k] = timing
}
// reset empties m in place, keeping the map itself so that a pooled
// TransactionData can reuse it.
func (m droppedSpanTimingsMap) reset() {
	for key := range m {
		delete(m, key)
	}
}