tracer.go (1,127 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package apm // import "go.elastic.co/apm/v2"
import (
"bytes"
"compress/zlib"
"context"
"io"
"log"
"math/rand"
"strings"
"sync"
"sync/atomic"
"time"
"go.elastic.co/apm/v2/apmconfig"
"go.elastic.co/apm/v2/internal/apmlog"
"go.elastic.co/apm/v2/internal/configutil"
"go.elastic.co/apm/v2/internal/iochan"
"go.elastic.co/apm/v2/internal/ringbuffer"
"go.elastic.co/apm/v2/internal/wildcard"
"go.elastic.co/apm/v2/model"
"go.elastic.co/apm/v2/transport"
"go.elastic.co/fastjson"
)
const (
// gracePeriodJitter is the relative jitter applied to the send-stream
// grace period (see jitterDuration in the tracer loop).
gracePeriodJitter = 0.1 // +/- 10%
// tracerEventChannelCap is the capacity of the tracer's buffered
// events channel.
tracerEventChannelCap = 1000
)
// tracerMu guards defaultTracer, the lazily-initialised global tracer
// returned by DefaultTracer.
var (
tracerMu sync.RWMutex
defaultTracer *Tracer
)
// DefaultTracer returns the default global Tracer, set the first time the
// function is called, or after calling SetDefaultTracer(nil).
//
// The default tracer is configured via environment variables, and if those
// are invalid the tracer will be disabled.
func DefaultTracer() *Tracer {
// Fast path: take only the read lock when the tracer already exists.
tracerMu.RLock()
if defaultTracer != nil {
tracer := defaultTracer
tracerMu.RUnlock()
return tracer
}
tracerMu.RUnlock()
// Slow path: re-check under the write lock, since another goroutine
// may have initialised the tracer between RUnlock and Lock.
tracerMu.Lock()
defer tracerMu.Unlock()
if defaultTracer != nil {
return defaultTracer
}
var opts TracerOptions
// continueOnError=true: invalid environment configuration falls back
// to defaults (and may deactivate the tracer) rather than failing.
opts.initDefaults(true)
defaultTracer = newTracer(opts)
return defaultTracer
}
// SetDefaultTracer sets the tracer returned by DefaultTracer.
//
// If a default tracer has already been initialized, it is closed.
// Any queued events are not flushed; it is the responsibility of the
// caller to call the default tracer's Flush method first, if needed.
//
// Calling SetDefaultTracer(nil) will clear the default tracer,
// causing DefaultTracer to initialize a new default tracer.
func SetDefaultTracer(t *Tracer) {
	tracerMu.Lock()
	defer tracerMu.Unlock()
	if prev := defaultTracer; prev != nil {
		prev.Close()
	}
	defaultTracer = t
}
// TracerOptions holds initial tracer options, for passing to NewTracerOptions.
type TracerOptions struct {
// ServiceName holds the service name.
//
// If ServiceName is empty, the service name will be defined using the
// ELASTIC_APM_SERVICE_NAME environment variable, or if that is not set,
// the executable name.
ServiceName string
// ServiceVersion holds the service version.
//
// If ServiceVersion is empty, the service version will be defined using
// the ELASTIC_APM_SERVICE_VERSION environment variable.
ServiceVersion string
// ServiceEnvironment holds the service environment.
//
// If ServiceEnvironment is empty, the service environment will be defined
// using the ELASTIC_APM_ENVIRONMENT environment variable.
ServiceEnvironment string
// Transport holds the transport to use for sending events.
//
// If Transport is nil, a new HTTP transport will be created from environment
// variables.
//
// If Transport implements apmconfig.Watcher, the tracer will begin watching
// for remote changes immediately. This behaviour can be disabled by setting
// the environment variable ELASTIC_APM_CENTRAL_CONFIG=false.
// If Transport implements the interface below, the tracer will query the
// APM Server "/" endpoint to obtain the remote major version. Implementers
// of this interface must cache the remote server version and only refresh
// on subsequent calls that have `refreshStale` set to true. Implementations
// must be concurrently safe.
// MajorServerVersion(ctx context.Context, refreshStale bool) uint32
Transport transport.Transport
// The unexported fields below are populated by initDefaults, primarily
// from environment variables, and consumed by newTracer.
requestDuration time.Duration
metricsInterval time.Duration
maxSpans int
requestSize int
bufferSize int
metricsBufferSize int
sampler Sampler
sanitizedFieldNames wildcard.Matchers
disabledMetrics wildcard.Matchers
ignoreTransactionURLs wildcard.Matchers
continuationStrategy string
captureHeaders bool
captureBody CaptureBodyMode
spanStackTraceMinDuration time.Duration
stackTraceLimit int
active bool
recording bool
configWatcher apmconfig.Watcher
breakdownMetrics bool
propagateLegacyHeader bool
profileSender profileSender
versionGetter majorVersionGetter
cpuProfileInterval time.Duration
cpuProfileDuration time.Duration
heapProfileInterval time.Duration
exitSpanMinDuration time.Duration
compressionOptions compressionOptions
globalLabels model.StringMap
}
// initDefaults updates opts with default values, reading environment
// variables for settings not already specified. Configuration errors are
// collected as they occur: if continueOnError is false, the first error is
// returned; otherwise each error is logged, the corresponding default is
// used, and nil is returned.
func (opts *TracerOptions) initDefaults(continueOnError bool) error {
	var errs []error
	// failed records a non-nil error and reports whether err was non-nil,
	// so each setting can fall back to its default value on failure.
	failed := func(err error) bool {
		if err == nil {
			return false
		}
		errs = append(errs, err)
		return true
	}
	requestDuration, err := initialRequestDuration()
	if failed(err) {
		requestDuration = defaultAPIRequestTime
	}
	metricsInterval, err := initialMetricsInterval()
	if failed(err) {
		metricsInterval = defaultMetricsInterval
	}
	requestSize, err := initialAPIRequestSize()
	if failed(err) {
		requestSize = int(defaultAPIRequestSize)
	}
	bufferSize, err := initialAPIBufferSize()
	if failed(err) {
		bufferSize = int(defaultAPIBufferSize)
	}
	metricsBufferSize, err := initialMetricsBufferSize()
	if failed(err) {
		metricsBufferSize = int(defaultMetricsBufferSize)
	}
	maxSpans, err := initialMaxSpans()
	if failed(err) {
		maxSpans = defaultMaxSpans
	}
	spanCompressionEnabled, err := initialSpanCompressionEnabled()
	if failed(err) {
		spanCompressionEnabled = defaultSpanCompressionEnabled
	}
	spanCompressionExactMatchMaxDuration, err := initialSpanCompressionExactMatchMaxDuration()
	if failed(err) {
		spanCompressionExactMatchMaxDuration = defaultSpanCompressionExactMatchMaxDuration
	}
	spanCompressionSameKindMaxDuration, err := initialSpanCompressionSameKindMaxDuration()
	if failed(err) {
		spanCompressionSameKindMaxDuration = defaultSpanCompressionSameKindMaxDuration
	}
	sampler, err := initialSampler()
	if failed(err) {
		sampler = nil
	}
	captureHeaders, err := initialCaptureHeaders()
	if failed(err) {
		captureHeaders = defaultCaptureHeaders
	}
	captureBody, err := initialCaptureBody()
	if failed(err) {
		captureBody = CaptureBodyOff
	}
	spanStackTraceMinDuration, err := initialSpanStackTraceMinDuration()
	if failed(err) {
		spanStackTraceMinDuration = defaultSpanStackTraceMinDuration
	}
	stackTraceLimit, err := initialStackTraceLimit()
	if failed(err) {
		stackTraceLimit = defaultStackTraceLimit
	}
	active, err := initialActive()
	if failed(err) {
		active = true
	}
	recording, err := initialRecording()
	if failed(err) {
		recording = true
	}
	centralConfigEnabled, err := initialCentralConfigEnabled()
	if failed(err) {
		centralConfigEnabled = true
	}
	breakdownMetricsEnabled, err := initialBreakdownMetricsEnabled()
	if failed(err) {
		breakdownMetricsEnabled = true
	}
	propagateLegacyHeader, err := initialUseElasticTraceparentHeader()
	if failed(err) {
		propagateLegacyHeader = true
	}
	cpuProfileInterval, cpuProfileDuration, err := initialCPUProfileIntervalDuration()
	if failed(err) {
		cpuProfileInterval = 0
		cpuProfileDuration = 0
	}
	heapProfileInterval, err := initialHeapProfileInterval()
	if failed(err) {
		heapProfileInterval = 0
	}
	exitSpanMinDuration, err := initialExitSpanMinDuration()
	if failed(err) {
		exitSpanMinDuration = defaultExitSpanMinDuration
	}
	continuationStrategy, err := initContinuationStrategy()
	if failed(err) {
		continuationStrategy = defaultContinuationStrategy
	}
	// A caller-supplied service name must be valid; on failure, fall
	// back to the environment/executable-derived name below.
	if opts.ServiceName != "" {
		err := validateServiceName(opts.ServiceName)
		if failed(err) {
			opts.ServiceName = ""
		}
	}
	serviceName, serviceVersion, serviceEnvironment := initialService()
	if opts.ServiceName == "" {
		opts.ServiceName = serviceName
	}
	if opts.ServiceVersion == "" {
		opts.ServiceVersion = serviceVersion
	}
	if opts.ServiceEnvironment == "" {
		opts.ServiceEnvironment = serviceEnvironment
	}
	if opts.Transport == nil {
		initialTransport, err := initialTransport(opts.ServiceName, opts.ServiceVersion)
		if failed(err) {
			// Without a transport, the tracer cannot send events.
			active = false
		} else {
			opts.Transport = initialTransport
		}
	}
	if len(errs) != 0 && !continueOnError {
		return errs[0]
	}
	for _, err := range errs {
		log.Printf("[apm]: %s", err)
	}
	opts.globalLabels = parseGlobalLabels()
	opts.requestDuration = requestDuration
	opts.metricsInterval = metricsInterval
	opts.requestSize = requestSize
	opts.bufferSize = bufferSize
	opts.metricsBufferSize = metricsBufferSize
	opts.maxSpans = maxSpans
	opts.compressionOptions = compressionOptions{
		enabled:               spanCompressionEnabled,
		exactMatchMaxDuration: spanCompressionExactMatchMaxDuration,
		sameKindMaxDuration:   spanCompressionSameKindMaxDuration,
	}
	opts.sampler = sampler
	opts.sanitizedFieldNames = initialSanitizedFieldNames()
	opts.disabledMetrics = initialDisabledMetrics()
	opts.ignoreTransactionURLs = initialIgnoreTransactionURLs()
	opts.breakdownMetrics = breakdownMetricsEnabled
	opts.captureHeaders = captureHeaders
	opts.captureBody = captureBody
	opts.spanStackTraceMinDuration = spanStackTraceMinDuration
	opts.stackTraceLimit = stackTraceLimit
	opts.active = active
	opts.recording = recording
	opts.propagateLegacyHeader = propagateLegacyHeader
	opts.exitSpanMinDuration = exitSpanMinDuration
	opts.continuationStrategy = continuationStrategy
	if centralConfigEnabled {
		if cw, ok := opts.Transport.(apmconfig.Watcher); ok {
			opts.configWatcher = cw
		}
	}
	if ps, ok := opts.Transport.(profileSender); ok {
		opts.profileSender = ps
		opts.cpuProfileInterval = cpuProfileInterval
		opts.cpuProfileDuration = cpuProfileDuration
		opts.heapProfileInterval = heapProfileInterval
	}
	if vg, ok := opts.Transport.(majorVersionGetter); ok {
		opts.versionGetter = vg
	}
	return nil
}
// compressionOptions holds the span compression configuration: whether
// compression is enabled, and the maximum durations for the exact-match
// and same-kind compression strategies.
type compressionOptions struct {
enabled bool
exactMatchMaxDuration time.Duration
sameKindMaxDuration time.Duration
}
// Tracer manages the sampling and sending of transactions to
// Elastic APM.
//
// Transactions are buffered until they are flushed (forcibly
// with a Flush call, or when the flush timer expires), or when
// the maximum transaction queue size is reached. Failure to
// send will be periodically retried. Once the queue limit has
// been reached, new transactions will replace older ones in
// the queue.
//
// Errors are sent as soon as possible, but will buffered and
// later sent in bulk if the tracer is busy, or otherwise cannot
// send to the server, e.g. due to network failure. There is
// a limit to the number of errors that will be buffered, and
// once that limit has been reached, new errors will be dropped
// until the queue is drained.
type Tracer struct {
transport transport.Transport
service model.Service
process *model.Process
system *model.System
// active is 1 while the event loop is running, 0 otherwise; it is
// accessed atomically (see Active).
active int32
bufferSize int
metricsBufferSize int
// closing is closed by Close to signal shutdown; closed is closed by
// the event loop when it has exited.
closing chan struct{}
closed chan struct{}
// forceFlush and forceSendMetrics carry per-request acknowledgement
// channels for Flush and SendMetrics respectively.
forceFlush chan chan<- struct{}
forceSendMetrics chan chan<- struct{}
configCommands chan tracerConfigCommand
configWatcher chan apmconfig.Watcher
// events carries transaction/span/error events to the event loop.
events chan tracerEvent
breakdownMetrics *breakdownMetrics
profileSender profileSender
versionGetter majorVersionGetter
globalLabels model.StringMap
// stats is heap-allocated to ensure correct alignment for atomic access.
stats *TracerStats
// instrumentationConfigInternal must only be accessed and mutated
// using Tracer.instrumentationConfig() and Tracer.setInstrumentationConfig().
instrumentationConfigInternal *instrumentationConfig
// Pools for recycling event data structures between uses.
errorDataPool sync.Pool
spanDataPool sync.Pool
transactionDataPool sync.Pool
}
// NewTracer returns a new Tracer, using the default transport,
// and with the specified service name and version if specified.
// This is equivalent to calling NewTracerOptions with a
// TracerOptions having ServiceName and ServiceVersion set to
// the provided arguments.
func NewTracer(serviceName, serviceVersion string) (*Tracer, error) {
	opts := TracerOptions{
		ServiceName:    serviceName,
		ServiceVersion: serviceVersion,
	}
	return NewTracerOptions(opts)
}
// NewTracerOptions returns a new Tracer using the provided options.
// See TracerOptions for details on the options, and their default
// values. An error is returned if the environment-derived
// configuration is invalid.
func NewTracerOptions(opts TracerOptions) (*Tracer, error) {
	err := opts.initDefaults(false)
	if err != nil {
		return nil, err
	}
	return newTracer(opts), nil
}
func newTracer(opts TracerOptions) *Tracer {
t := &Tracer{
transport: opts.Transport,
service: makeService(
opts.ServiceName,
opts.ServiceVersion,
opts.ServiceEnvironment,
),
process: ¤tProcess,
system: &localSystem,
closing: make(chan struct{}),
closed: make(chan struct{}),
forceFlush: make(chan chan<- struct{}),
forceSendMetrics: make(chan chan<- struct{}),
configCommands: make(chan tracerConfigCommand),
configWatcher: make(chan apmconfig.Watcher),
events: make(chan tracerEvent, tracerEventChannelCap),
active: 1,
breakdownMetrics: newBreakdownMetrics(),
stats: &TracerStats{},
bufferSize: opts.bufferSize,
metricsBufferSize: opts.metricsBufferSize,
profileSender: opts.profileSender,
versionGetter: opts.versionGetter,
instrumentationConfigInternal: &instrumentationConfig{
local: make(map[string]func(*instrumentationConfigValues)),
},
globalLabels: opts.globalLabels,
}
t.breakdownMetrics.enabled = opts.breakdownMetrics
// Initialise local transaction config.
t.setLocalInstrumentationConfig(envRecording, func(cfg *instrumentationConfigValues) {
cfg.recording = opts.recording
})
t.setLocalInstrumentationConfig(envCaptureBody, func(cfg *instrumentationConfigValues) {
cfg.captureBody = opts.captureBody
})
t.setLocalInstrumentationConfig(envCaptureHeaders, func(cfg *instrumentationConfigValues) {
cfg.captureHeaders = opts.captureHeaders
})
t.setLocalInstrumentationConfig(envMaxSpans, func(cfg *instrumentationConfigValues) {
cfg.maxSpans = opts.maxSpans
})
t.setLocalInstrumentationConfig(envSpanCompressionEnabled, func(cfg *instrumentationConfigValues) {
cfg.compressionOptions.enabled = opts.compressionOptions.enabled
})
t.setLocalInstrumentationConfig(envSpanCompressionExactMatchMaxDuration, func(cfg *instrumentationConfigValues) {
cfg.compressionOptions.exactMatchMaxDuration = opts.compressionOptions.exactMatchMaxDuration
})
t.setLocalInstrumentationConfig(envSpanCompressionSameKindMaxDuration, func(cfg *instrumentationConfigValues) {
cfg.compressionOptions.sameKindMaxDuration = opts.compressionOptions.sameKindMaxDuration
})
t.setLocalInstrumentationConfig(envTransactionSampleRate, func(cfg *instrumentationConfigValues) {
cfg.sampler = opts.sampler
})
t.setLocalInstrumentationConfig(envSpanStackTraceMinDuration, func(cfg *instrumentationConfigValues) {
cfg.spanStackTraceMinDuration = opts.spanStackTraceMinDuration
})
t.setLocalInstrumentationConfig(envStackTraceLimit, func(cfg *instrumentationConfigValues) {
cfg.stackTraceLimit = opts.stackTraceLimit
})
t.setLocalInstrumentationConfig(envUseElasticTraceparentHeader, func(cfg *instrumentationConfigValues) {
cfg.propagateLegacyHeader = opts.propagateLegacyHeader
})
t.setLocalInstrumentationConfig(envSanitizeFieldNames, func(cfg *instrumentationConfigValues) {
cfg.sanitizedFieldNames = opts.sanitizedFieldNames
})
t.setLocalInstrumentationConfig(envIgnoreURLs, func(cfg *instrumentationConfigValues) {
cfg.ignoreTransactionURLs = opts.ignoreTransactionURLs
})
t.setLocalInstrumentationConfig(envExitSpanMinDuration, func(cfg *instrumentationConfigValues) {
cfg.exitSpanMinDuration = opts.exitSpanMinDuration
})
t.setLocalInstrumentationConfig(envContinuationStrategy, func(cfg *instrumentationConfigValues) {
cfg.continuationStrategy = opts.continuationStrategy
})
if logger := apmlog.DefaultLogger(); logger != nil {
defaultLogLevel := logger.Level()
t.setLocalInstrumentationConfig(apmlog.EnvLogLevel, func(cfg *instrumentationConfigValues) {
// Revert to the original, local, log level when
// the centrally defined log level is removed.
logger.SetLevel(defaultLogLevel)
})
}
if !opts.active {
t.active = 0
close(t.closed)
return t
}
go t.loop()
t.configCommands <- func(cfg *tracerConfig) {
cfg.recording = opts.recording
cfg.cpuProfileInterval = opts.cpuProfileInterval
cfg.cpuProfileDuration = opts.cpuProfileDuration
cfg.heapProfileInterval = opts.heapProfileInterval
cfg.metricsInterval = opts.metricsInterval
cfg.requestDuration = opts.requestDuration
cfg.requestSize = opts.requestSize
cfg.disabledMetrics = opts.disabledMetrics
cfg.metricsGatherers = []MetricsGatherer{newBuiltinMetricsGatherer(t)}
if logger := apmlog.DefaultLogger(); logger != nil {
cfg.logger = logger
}
}
if opts.configWatcher != nil {
t.configWatcher <- opts.configWatcher
}
return t
}
// tracerConfig holds the tracer's runtime configuration, which may be modified
// by sending a tracerConfigCommand to the tracer's configCommands channel.
type tracerConfig struct {
recording bool
requestSize int
requestDuration time.Duration
metricsInterval time.Duration
logger Logger
metricsGatherers []MetricsGatherer
disabledMetrics wildcard.Matchers
cpuProfileDuration time.Duration
cpuProfileInterval time.Duration
heapProfileInterval time.Duration
}
// tracerConfigCommand mutates the tracer's runtime configuration.
// Commands are applied by the tracer's event loop goroutine, which
// owns the tracerConfig.
type tracerConfigCommand func(*tracerConfig)
// Close closes the Tracer, preventing transactions from being
// sent to the APM server.
func (t *Tracer) Close() {
// Signal shutdown unless it has already been signalled.
// NOTE(review): two goroutines calling Close concurrently could both
// take the default branch and double-close t.closing, which panics —
// confirm callers serialise Close, or consider guarding with sync.Once.
select {
case <-t.closing:
default:
close(t.closing)
}
// Block until the event loop has exited.
<-t.closed
}
// Flush waits for the Tracer to flush any transactions and errors it currently
// has queued to the APM server, the tracer is stopped, or the abort channel
// is signaled.
func (t *Tracer) Flush(abort <-chan struct{}) {
// Buffered so the event loop's acknowledgement never blocks,
// even if this caller has already returned via abort.
flushed := make(chan struct{}, 1)
select {
case t.forceFlush <- flushed:
// Request delivered; wait for completion, abort, or shutdown.
select {
case <-abort:
case <-flushed:
case <-t.closed:
}
case <-t.closed:
// Tracer already stopped; nothing to flush.
}
}
// Recording reports whether the tracer is recording events. Instrumentation
// may use this to avoid creating transactions, spans, and metrics when the
// tracer is configured to not record.
//
// Recording will also return false if the tracer is inactive.
func (t *Tracer) Recording() bool {
	recording := t.instrumentationConfig().recording
	return recording && t.Active()
}
// Active reports whether the tracer is active. If the tracer is inactive,
// no transactions or errors will be sent to the Elastic APM server.
func (t *Tracer) Active() bool {
	state := atomic.LoadInt32(&t.active)
	return state == 1
}
// ShouldPropagateLegacyHeader reports whether instrumentation should
// propagate the legacy "Elastic-Apm-Traceparent" header in addition to
// the standard W3C "traceparent" header.
//
// This method will be removed in a future major version when we remove
// support for propagating the legacy header.
func (t *Tracer) ShouldPropagateLegacyHeader() bool {
	cfg := t.instrumentationConfig()
	return cfg.propagateLegacyHeader
}
// SetRequestDuration sets the maximum amount of time to keep a request open
// to the APM server for streaming data before closing the stream and starting
// a new request.
func (t *Tracer) SetRequestDuration(d time.Duration) {
	update := func(cfg *tracerConfig) {
		cfg.requestDuration = d
	}
	t.sendConfigCommand(update)
}
// SetMetricsInterval sets the metrics interval -- the amount of time in
// between metrics samples being gathered.
func (t *Tracer) SetMetricsInterval(d time.Duration) {
	update := func(cfg *tracerConfig) {
		cfg.metricsInterval = d
	}
	t.sendConfigCommand(update)
}
// SetLogger sets the Logger to be used for logging the operation of
// the tracer.
//
// The tracer is initialized with a default logger configured with the
// environment variables ELASTIC_APM_LOG_FILE and ELASTIC_APM_LOG_LEVEL.
// Calling SetLogger will replace the default logger.
func (t *Tracer) SetLogger(logger Logger) {
	setLogger := func(cfg *tracerConfig) {
		cfg.logger = logger
	}
	t.sendConfigCommand(setLogger)
}
// SetSanitizedFieldNames sets the wildcard patterns that will be used to
// match cookie and form field names for sanitization. Fields matching any
// of the supplied patterns will have their values redacted. If
// SetSanitizedFieldNames is called with no arguments, then no fields
// will be redacted.
//
// Configuration via Kibana takes precedence over local configuration, so
// if sanitized_field_names has been configured via Kibana, this call will
// not have any effect until/unless that configuration has been removed.
func (t *Tracer) SetSanitizedFieldNames(patterns ...string) error {
	var matchers wildcard.Matchers
	if len(patterns) != 0 {
		matchers = make(wildcard.Matchers, 0, len(patterns))
		for _, pattern := range patterns {
			matchers = append(matchers, configutil.ParseWildcardPattern(pattern))
		}
	}
	t.setLocalInstrumentationConfig(envSanitizeFieldNames, func(cfg *instrumentationConfigValues) {
		cfg.sanitizedFieldNames = matchers
	})
	return nil
}
// SetIgnoreTransactionURLs sets the wildcard patterns that will be used to
// ignore transactions with matching URLs.
func (t *Tracer) SetIgnoreTransactionURLs(pattern string) error {
	update := func(values *instrumentationConfigValues) {
		values.ignoreTransactionURLs = configutil.ParseWildcardPatterns(pattern)
	}
	t.setLocalInstrumentationConfig(envIgnoreURLs, update)
	return nil
}
// RegisterMetricsGatherer registers g for periodic (or forced) metrics
// gathering by t.
//
// RegisterMetricsGatherer returns a function which will deregister g.
// It may safely be called multiple times.
func (t *Tracer) RegisterMetricsGatherer(g MetricsGatherer) func() {
	// Wrap g in a pointer-to-struct, so we can safely compare.
	wrapped := &struct{ MetricsGatherer }{MetricsGatherer: g}
	t.sendConfigCommand(func(cfg *tracerConfig) {
		cfg.metricsGatherers = append(cfg.metricsGatherers, wrapped)
	})
	deregister := func(cfg *tracerConfig) {
		for i, g := range cfg.metricsGatherers {
			if g != wrapped {
				continue
			}
			cfg.metricsGatherers = append(cfg.metricsGatherers[:i], cfg.metricsGatherers[i+1:]...)
			// wrapped is unique per registration, so there can be no
			// further match; returning also avoids continuing to range
			// over the slice we just mutated in place.
			return
		}
	}
	var once sync.Once
	return func() {
		once.Do(func() {
			t.sendConfigCommand(deregister)
		})
	}
}
// SetConfigWatcher sets w as the config watcher.
//
// By default, the tracer will be configured to use the transport for
// watching config, if the transport implements apmconfig.Watcher. This
// can be overridden by calling SetConfigWatcher.
//
// If w is nil, config watching will be stopped.
//
// Calling SetConfigWatcher will discard any previously observed remote
// config, reverting to local config until a config change from w is
// observed.
func (t *Tracer) SetConfigWatcher(w apmconfig.Watcher) {
	// Deliver w to the event loop unless the tracer is shutting down.
	select {
	case <-t.closing:
	case <-t.closed:
	case t.configWatcher <- w:
	}
}
// sendConfigCommand delivers cmd to the event loop for application to the
// tracer's runtime configuration. The command is silently discarded if the
// tracer is closing or already closed.
func (t *Tracer) sendConfigCommand(cmd tracerConfigCommand) {
	select {
	case <-t.closing:
	case <-t.closed:
	case t.configCommands <- cmd:
	}
}
// SetRecording enables or disables recording of future events.
//
// SetRecording does not affect in-flight events.
func (t *Tracer) SetRecording(r bool) {
	// First update the local instrumentation config, which gates the
	// creation of new transactions and errors.
	updateLocal := func(cfg *instrumentationConfigValues) {
		cfg.recording = r
	}
	t.setLocalInstrumentationConfig(envRecording, updateLocal)
	// Then propagate the effective value to the event loop. Consult
	// t.instrumentationConfig() rather than r directly: local config may
	// not be in effect, or there may have been a concurrent change to
	// instrumentation config.
	t.sendConfigCommand(func(cfg *tracerConfig) {
		cfg.recording = t.instrumentationConfig().recording
	})
}
// SetSampler sets the sampler the tracer.
//
// It is valid to pass nil, in which case all transactions will be sampled.
//
// Configuration via Kibana takes precedence over local configuration, so
// if sampling has been configured via Kibana, this call will not have any
// effect until/unless that configuration has been removed.
func (t *Tracer) SetSampler(s Sampler) {
	update := func(values *instrumentationConfigValues) {
		values.sampler = s
	}
	t.setLocalInstrumentationConfig(envTransactionSampleRate, update)
}
// SetMaxSpans sets the maximum number of spans that will be added
// to a transaction before dropping spans.
//
// Passing in zero will disable all spans, while negative values will
// permit an unlimited number of spans.
func (t *Tracer) SetMaxSpans(n int) {
	update := func(values *instrumentationConfigValues) {
		values.maxSpans = n
	}
	t.setLocalInstrumentationConfig(envMaxSpans, update)
}
// SetSpanCompressionEnabled enables/disables the span compression feature.
func (t *Tracer) SetSpanCompressionEnabled(v bool) {
	update := func(values *instrumentationConfigValues) {
		values.compressionOptions.enabled = v
	}
	t.setLocalInstrumentationConfig(envSpanCompressionEnabled, update)
}
// SetSpanCompressionExactMatchMaxDuration sets the maximum duration for a span
// to be compressed with `compression_strategy` == `exact_match`.
func (t *Tracer) SetSpanCompressionExactMatchMaxDuration(v time.Duration) {
	update := func(values *instrumentationConfigValues) {
		values.compressionOptions.exactMatchMaxDuration = v
	}
	t.setLocalInstrumentationConfig(envSpanCompressionExactMatchMaxDuration, update)
}
// SetSpanCompressionSameKindMaxDuration sets the maximum duration for a span
// to be compressed with `compression_strategy` == `same_kind`.
func (t *Tracer) SetSpanCompressionSameKindMaxDuration(v time.Duration) {
	update := func(values *instrumentationConfigValues) {
		values.compressionOptions.sameKindMaxDuration = v
	}
	t.setLocalInstrumentationConfig(envSpanCompressionSameKindMaxDuration, update)
}
// SetSpanStackTraceMinDuration sets the minimum duration for a span after which
// we will capture its stack frames.
func (t *Tracer) SetSpanStackTraceMinDuration(d time.Duration) {
	// Key the local config entry by its own environment variable. It was
	// previously keyed by envMaxSpans, which made this setting clobber
	// (and be clobbered by) other settings stored under that key, and
	// masked it under remote transaction_max_spans config.
	t.setLocalInstrumentationConfig(envSpanStackTraceMinDuration, func(cfg *instrumentationConfigValues) {
		cfg.spanStackTraceMinDuration = d
	})
}
// SetStackTraceLimit sets the maximum number of stack frames to collect
// for each stack trace. If limit is negative, then all frames will be collected.
func (t *Tracer) SetStackTraceLimit(limit int) {
	// Use envStackTraceLimit as the local-config key; the previous
	// envMaxSpans key collided with other settings stored under it.
	t.setLocalInstrumentationConfig(envStackTraceLimit, func(cfg *instrumentationConfigValues) {
		cfg.stackTraceLimit = limit
	})
}
// SetCaptureHeaders enables or disables capturing of HTTP headers.
func (t *Tracer) SetCaptureHeaders(capture bool) {
	// Use envCaptureHeaders as the local-config key; the previous
	// envMaxSpans key collided with other settings stored under it.
	t.setLocalInstrumentationConfig(envCaptureHeaders, func(cfg *instrumentationConfigValues) {
		cfg.captureHeaders = capture
	})
}
// SetCaptureBody sets the HTTP request body capture mode.
func (t *Tracer) SetCaptureBody(mode CaptureBodyMode) {
	// Use envCaptureBody as the local-config key; the previous
	// envMaxSpans key collided with other settings stored under it.
	t.setLocalInstrumentationConfig(envCaptureBody, func(cfg *instrumentationConfigValues) {
		cfg.captureBody = mode
	})
}
// SetExitSpanMinDuration sets the minimum duration for an exit span to not be
// dropped.
func (t *Tracer) SetExitSpanMinDuration(v time.Duration) {
	update := func(values *instrumentationConfigValues) {
		values.exitSpanMinDuration = v
	}
	t.setLocalInstrumentationConfig(envExitSpanMinDuration, update)
}
// SetContinuationStrategy sets the continuation strategy.
func (t *Tracer) SetContinuationStrategy(v string) {
	update := func(values *instrumentationConfigValues) {
		values.continuationStrategy = v
	}
	t.setLocalInstrumentationConfig(envContinuationStrategy, update)
}
// SendMetrics forces the tracer to gather and send metrics immediately,
// blocking until the metrics have been sent or the abort channel is
// signalled.
func (t *Tracer) SendMetrics(abort <-chan struct{}) {
	// Buffered so the event loop's acknowledgement never blocks, even if
	// this caller has already returned via abort.
	done := make(chan struct{}, 1)
	select {
	case <-t.closed:
		// Tracer already stopped; nothing to gather.
	case t.forceSendMetrics <- done:
		select {
		case <-done:
		case <-abort:
		case <-t.closed:
		}
	}
}
// Stats returns the current TracerStats. This will return the most
// recent values even after the tracer has been closed.
func (t *Tracer) Stats() TracerStats {
	snapshot := t.stats.copy()
	return snapshot
}
func (t *Tracer) loop() {
ctx, cancelContext := context.WithCancel(context.Background())
defer cancelContext()
defer close(t.closed)
defer atomic.StoreInt32(&t.active, 0)
var req iochan.ReadRequest
var requestBuf bytes.Buffer
var metadata []byte
var gracePeriod time.Duration = -1
var flushed chan<- struct{}
var requestBufTransactions, requestBufSpans, requestBufErrors, requestBufMetricsets uint64
zlibWriter, _ := zlib.NewWriterLevel(&requestBuf, zlib.BestSpeed)
zlibFlushed := true
zlibClosed := false
iochanReader := iochan.NewReader()
requestBytesRead := 0
requestActive := false
closeRequest := false
flushRequest := false
requestResult := make(chan error, 1)
requestTimer := time.NewTimer(0)
requestTimerActive := false
if !requestTimer.Stop() {
<-requestTimer.C
}
// Run another goroutine to perform the blocking requests,
// communicating with the tracer loop to obtain stream data.
sendStreamRequest := make(chan time.Duration)
done := make(chan struct{})
defer func() {
close(sendStreamRequest)
<-done
}()
go func() {
defer close(done)
jitterRand := rand.New(rand.NewSource(time.Now().UnixNano()))
for gracePeriod := range sendStreamRequest {
if gracePeriod > 0 {
select {
case <-time.After(jitterDuration(gracePeriod, jitterRand, gracePeriodJitter)):
case <-ctx.Done():
}
}
requestResult <- t.transport.SendStream(ctx, iochanReader)
}
}()
refreshServerVersionDeadline := 10 * time.Second
refreshVersionTicker := time.NewTicker(refreshServerVersionDeadline)
defer refreshVersionTicker.Stop()
if t.versionGetter != nil {
go t.maybeRefreshServerVersion(ctx, refreshServerVersionDeadline)
} else {
// If versionGetter is nil, stop the timer.
refreshVersionTicker.Stop()
}
var breakdownMetricsLimitWarningLogged bool
var stats TracerStats
var metrics Metrics
var sentMetrics chan<- struct{}
var gatheringMetrics bool
var metricsTimerStart time.Time
metricsBuffer := ringbuffer.New(t.metricsBufferSize)
gatheredMetrics := make(chan struct{}, 1)
metricsTimer := time.NewTimer(0)
if !metricsTimer.Stop() {
<-metricsTimer.C
}
var lastConfigChange map[string]string
var configChanges <-chan apmconfig.Change
var stopConfigWatcher func()
defer func() {
if stopConfigWatcher != nil {
stopConfigWatcher()
}
}()
cpuProfilingState := newCPUProfilingState(t.profileSender)
heapProfilingState := newHeapProfilingState(t.profileSender)
var cfg tracerConfig
buffer := ringbuffer.New(t.bufferSize)
buffer.Evicted = func(h ringbuffer.BlockHeader) {
switch h.Tag {
case errorBlockTag:
stats.ErrorsDropped++
case spanBlockTag:
stats.SpansDropped++
case transactionBlockTag:
stats.TransactionsDropped++
}
}
modelWriter := modelWriter{
buffer: buffer,
metricsBuffer: metricsBuffer,
cfg: &cfg,
stats: &stats,
}
handleTracerConfigCommand := func(cmd tracerConfigCommand) {
var oldMetricsInterval time.Duration
if cfg.recording {
oldMetricsInterval = cfg.metricsInterval
}
cmd(&cfg)
var metricsInterval, cpuProfileInterval, cpuProfileDuration, heapProfileInterval time.Duration
if cfg.recording {
metricsInterval = cfg.metricsInterval
cpuProfileInterval = cfg.cpuProfileInterval
cpuProfileDuration = cfg.cpuProfileDuration
heapProfileInterval = cfg.heapProfileInterval
}
cpuProfilingState.updateConfig(cpuProfileInterval, cpuProfileDuration)
heapProfilingState.updateConfig(heapProfileInterval, 0)
if !gatheringMetrics && metricsInterval != oldMetricsInterval {
if metricsTimerStart.IsZero() {
if metricsInterval > 0 {
metricsTimer.Reset(metricsInterval)
metricsTimerStart = time.Now()
}
} else {
if metricsInterval <= 0 {
metricsTimerStart = time.Time{}
if !metricsTimer.Stop() {
<-metricsTimer.C
}
} else {
alreadyPassed := time.Since(metricsTimerStart)
if alreadyPassed >= metricsInterval {
metricsTimer.Reset(0)
} else {
metricsTimer.Reset(metricsInterval - alreadyPassed)
}
}
}
}
}
for {
var gatherMetrics bool
select {
case <-t.closing:
cancelContext() // informs transport that EOF is expected
iochanReader.CloseRead(io.EOF)
return
case cmd := <-t.configCommands:
handleTracerConfigCommand(cmd)
continue
case cw := <-t.configWatcher:
if configChanges != nil {
stopConfigWatcher()
t.updateRemoteConfig(cfg.logger, lastConfigChange, nil)
lastConfigChange = nil
configChanges = nil
}
if cw == nil {
continue
}
var configWatcherContext context.Context
var watchParams apmconfig.WatchParams
watchParams.Service.Name = t.service.Name
watchParams.Service.Environment = t.service.Environment
configWatcherContext, stopConfigWatcher = context.WithCancel(ctx)
configChanges = cw.WatchConfig(configWatcherContext, watchParams)
// Silence go vet's "possible context leak" false positive.
// We call a previous stopConfigWatcher before reassigning
// the variable, and we have a defer at the top level of the
// loop method that will call the final stopConfigWatcher
// value on method exit.
_ = stopConfigWatcher
continue
case change, ok := <-configChanges:
if !ok {
configChanges = nil
continue
}
if change.Err != nil {
if cfg.logger != nil {
cfg.logger.Errorf("config request failed: %s", change.Err)
}
} else {
t.updateRemoteConfig(cfg.logger, lastConfigChange, change.Attrs)
lastConfigChange = change.Attrs
handleTracerConfigCommand(func(cfg *tracerConfig) {
cfg.recording = t.instrumentationConfig().recording
})
}
continue
case <-refreshVersionTicker.C:
go t.maybeRefreshServerVersion(ctx, refreshServerVersionDeadline)
case event := <-t.events:
switch event.eventType {
case transactionEvent:
if !t.breakdownMetrics.recordTransaction(event.tx.TransactionData) {
if !breakdownMetricsLimitWarningLogged && cfg.logger != nil {
cfg.logger.Warningf("%s", breakdownMetricsLimitWarning)
breakdownMetricsLimitWarningLogged = true
}
}
// Drop unsampled transactions when the APM Server is >= 8.0
drop := t.maybeDropTransaction(
ctx, event.tx.TransactionData, event.tx.Sampled(),
)
if !drop {
modelWriter.writeTransaction(event.tx.Transaction, event.tx.TransactionData)
}
case spanEvent:
modelWriter.writeSpan(event.span.Span, event.span.SpanData)
case errorEvent:
modelWriter.writeError(event.err)
// Flush the buffer to transmit the error immediately.
flushRequest = true
}
case <-requestTimer.C:
requestTimerActive = false
closeRequest = true
case <-metricsTimer.C:
metricsTimerStart = time.Time{}
gatherMetrics = !gatheringMetrics
case sentMetrics = <-t.forceSendMetrics:
if cfg.recording {
if !metricsTimerStart.IsZero() {
if !metricsTimer.Stop() {
<-metricsTimer.C
}
metricsTimerStart = time.Time{}
}
gatherMetrics = !gatheringMetrics
}
case <-gatheredMetrics:
modelWriter.writeMetrics(&metrics)
gatheringMetrics = false
flushRequest = true
if cfg.recording && cfg.metricsInterval > 0 {
metricsTimerStart = time.Now()
metricsTimer.Reset(cfg.metricsInterval)
}
case <-cpuProfilingState.timer.C:
cpuProfilingState.start(ctx, cfg.logger, t.metadataReader())
case <-cpuProfilingState.finished:
cpuProfilingState.resetTimer()
case <-heapProfilingState.timer.C:
heapProfilingState.start(ctx, cfg.logger, t.metadataReader())
case <-heapProfilingState.finished:
heapProfilingState.resetTimer()
case flushed = <-t.forceFlush:
// Drain any objects buffered in the channels.
for n := len(t.events); n > 0; n-- {
event := <-t.events
switch event.eventType {
case transactionEvent:
if !t.breakdownMetrics.recordTransaction(event.tx.TransactionData) {
if !breakdownMetricsLimitWarningLogged && cfg.logger != nil {
cfg.logger.Warningf("%s", breakdownMetricsLimitWarning)
breakdownMetricsLimitWarningLogged = true
}
}
// Drop unsampled transactions when the APM Server is >= 8.0
drop := t.maybeDropTransaction(
ctx, event.tx.TransactionData, event.tx.Sampled(),
)
if !drop {
modelWriter.writeTransaction(event.tx.Transaction, event.tx.TransactionData)
}
case spanEvent:
modelWriter.writeSpan(event.span.Span, event.span.SpanData)
case errorEvent:
modelWriter.writeError(event.err)
}
}
if !requestActive && buffer.Len() == 0 && metricsBuffer.Len() == 0 {
flushed <- struct{}{}
continue
}
closeRequest = true
case req = <-iochanReader.C:
case err := <-requestResult:
if err != nil {
stats.Errors.SendStream++
gracePeriod = nextGracePeriod(gracePeriod)
if cfg.logger != nil {
logf := cfg.logger.Debugf
if err, ok := err.(*transport.HTTPError); ok && err.Response.StatusCode == 404 {
// 404 typically means the server is too old, meaning
// the error is due to a misconfigured environment.
logf = cfg.logger.Errorf
}
logf("request failed: %s (next request in ~%s)", err, gracePeriod)
}
} else {
gracePeriod = -1 // Reset grace period after success.
stats.TransactionsSent += requestBufTransactions
stats.SpansSent += requestBufSpans
stats.ErrorsSent += requestBufErrors
if cfg.logger != nil {
s := func(n uint64) string {
if n != 1 {
return "s"
}
return ""
}
cfg.logger.Debugf(
"sent request with %d transaction%s, %d span%s, %d error%s, %d metricset%s",
requestBufTransactions, s(requestBufTransactions),
requestBufSpans, s(requestBufSpans),
requestBufErrors, s(requestBufErrors),
requestBufMetricsets, s(requestBufMetricsets),
)
}
}
if !stats.isZero() {
t.stats.accumulate(stats)
stats = TracerStats{}
}
if sentMetrics != nil && requestBufMetricsets > 0 {
sentMetrics <- struct{}{}
sentMetrics = nil
}
if flushed != nil {
flushed <- struct{}{}
flushed = nil
}
if req.Buf != nil {
// req will be canceled by CloseRead below.
req.Buf = nil
}
iochanReader.CloseRead(io.EOF)
iochanReader = iochan.NewReader()
flushRequest = false
closeRequest = false
requestActive = false
requestBytesRead = 0
requestBuf.Reset()
requestBufTransactions = 0
requestBufSpans = 0
requestBufErrors = 0
requestBufMetricsets = 0
if requestTimerActive {
if !requestTimer.Stop() {
<-requestTimer.C
}
requestTimerActive = false
}
}
if !stats.isZero() {
t.stats.accumulate(stats)
stats = TracerStats{}
}
if gatherMetrics {
gatheringMetrics = true
metrics.disabled = cfg.disabledMetrics
t.gatherMetrics(ctx, cfg.metricsGatherers, &metrics, cfg.logger, gatheredMetrics)
if cfg.logger != nil {
cfg.logger.Debugf("gathering metrics")
}
}
if !requestActive {
if buffer.Len() == 0 && metricsBuffer.Len() == 0 {
continue
}
sendStreamRequest <- gracePeriod
if metadata == nil {
metadata = t.jsonRequestMetadata()
}
zlibWriter.Reset(&requestBuf)
zlibWriter.Write(metadata)
zlibFlushed = false
zlibClosed = false
requestActive = true
requestTimer.Reset(cfg.requestDuration)
requestTimerActive = true
}
if !closeRequest || !zlibClosed {
for requestBytesRead+requestBuf.Len() < cfg.requestSize {
if metricsBuffer.Len() > 0 {
if _, _, err := metricsBuffer.WriteBlockTo(zlibWriter); err == nil {
requestBufMetricsets++
zlibWriter.Write([]byte("\n"))
zlibFlushed = false
if sentMetrics != nil {
// SendMetrics was called: close the request
// off so we can inform the user when the
// metrics have been processed.
closeRequest = true
}
}
continue
}
if buffer.Len() == 0 {
break
}
if h, _, err := buffer.WriteBlockTo(zlibWriter); err == nil {
switch h.Tag {
case transactionBlockTag:
requestBufTransactions++
case spanBlockTag:
requestBufSpans++
case errorBlockTag:
requestBufErrors++
}
zlibWriter.Write([]byte("\n"))
zlibFlushed = false
}
}
if !closeRequest {
closeRequest = requestBytesRead+requestBuf.Len() >= cfg.requestSize
}
}
if closeRequest {
if !zlibClosed {
zlibWriter.Close()
zlibClosed = true
}
} else if flushRequest && !zlibFlushed {
zlibWriter.Flush()
flushRequest = false
zlibFlushed = true
}
if req.Buf == nil || requestBuf.Len() == 0 {
continue
}
const zlibHeaderLen = 2
if requestBytesRead+requestBuf.Len() > zlibHeaderLen {
n, err := requestBuf.Read(req.Buf)
if closeRequest && err == nil && requestBuf.Len() == 0 {
err = io.EOF
}
req.Respond(n, err)
req.Buf = nil
if n > 0 {
requestBytesRead += n
}
}
}
}
// jsonRequestMetadata returns the JSON-encoded metadata object that is
// written at the head of every request body. It is invoked exactly once,
// when the first request is made, and the result is cached by the caller.
func (t *Tracer) jsonRequestMetadata() []byte {
	var w fastjson.Writer
	w.RawString(`{"metadata":`)
	t.encodeRequestMetadata(&w)
	w.RawString("}\n")
	return w.Bytes()
}
// metadataReader returns an io.Reader over the JSON-encoded tracer
// metadata, suitable for including in a profile request.
func (t *Tracer) metadataReader() io.Reader {
	var w fastjson.Writer
	t.encodeRequestMetadata(&w)
	return bytes.NewReader(w.Bytes())
}
// encodeRequestMetadata writes the tracer's metadata (system, process,
// service, and optionally cloud and global labels) as a JSON object to
// json. The field order here defines the wire format of the metadata
// object sent to the APM Server.
func (t *Tracer) encodeRequestMetadata(json *fastjson.Writer) {
json.RawString(`{"system":`)
t.system.MarshalFastJSON(json)
json.RawString(`,"process":`)
t.process.MarshalFastJSON(json)
json.RawString(`,"service":`)
t.service.MarshalFastJSON(json)
// Cloud metadata is optional: it is only present when it could be
// fetched from the environment.
if cloud := getCloudMetadata(); cloud != nil {
json.RawString(`,"cloud":`)
cloud.MarshalFastJSON(json)
}
// Global labels are optional: omit the key entirely when none are set.
if len(t.globalLabels) > 0 {
json.RawString(`,"labels":`)
t.globalLabels.MarshalFastJSON(json)
}
json.RawByte('}')
}
// gatherMetrics asynchronously gathers metrics from each of the registered
// metrics gatherers. Once every gatherer has returned, all gathered
// metricsets are stamped with a single timestamp and a value is sent on
// the "gathered" channel.
func (t *Tracer) gatherMetrics(ctx context.Context, gatherers []MetricsGatherer, m *Metrics, l Logger, gathered chan<- struct{}) {
	timestamp := model.Time(time.Now().UTC())
	var wg sync.WaitGroup
	wg.Add(len(gatherers))
	for _, g := range gatherers {
		// g is passed as an argument so each goroutine gets its own copy.
		go func(g MetricsGatherer) {
			defer wg.Done()
			gatherMetrics(ctx, g, m, l)
		}(g)
	}
	go func() {
		wg.Wait()
		// Give every gathered metricset the same, consistent timestamp.
		for _, tgm := range m.transactionGroupMetrics {
			tgm.Timestamp = timestamp
		}
		for _, ms := range m.metrics {
			ms.Timestamp = timestamp
		}
		gathered <- struct{}{}
	}()
}
// maybeDropTransaction may drop a transaction, for example when the
// transaction is non-sampled and the target server version is 8.0 or
// greater. It returns true if the transaction was dropped (in which case
// its data has been reset and returned to the pool), false otherwise.
func (t *Tracer) maybeDropTransaction(ctx context.Context, td *TransactionData, sampled bool) bool {
	if sampled || t.versionGetter == nil {
		return false
	}
	// Unsampled transactions are only dropped for APM Server >= 8.0.
	if t.versionGetter.MajorServerVersion(ctx, false) < 8 {
		return false
	}
	td.reset(t)
	return true
}
// maybeRefreshServerVersion refreshes the remote APM Server version if the
// version has been marked as stale. The refresh request is bounded by
// deadline when it is positive; a non-positive deadline applies no timeout.
func (t *Tracer) maybeRefreshServerVersion(ctx context.Context, deadline time.Duration) {
	if t.versionGetter == nil {
		return
	}
	// Fast path: when the version has been cached, there's nothing to do.
	if v := t.versionGetter.MajorServerVersion(ctx, false); v > 0 {
		return
	}
	// There isn't a cached version; try to refresh it.
	if deadline > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, deadline)
		defer cancel()
	}
	// Result intentionally ignored: this call only warms the cache.
	_ = t.versionGetter.MajorServerVersion(ctx, true)
}
// tracerEventType identifies the kind of event sent on the tracer's
// internal events channel.
type tracerEventType int
const (
// transactionEvent signals a completed transaction.
transactionEvent tracerEventType = iota
// spanEvent signals a completed span.
spanEvent
// errorEvent signals a captured error.
errorEvent
)
// tracerEvent is the payload sent on the tracer's internal events channel.
// Exactly one of err, tx, or span is populated, according to eventType.
type tracerEvent struct {
eventType tracerEventType
// err is set only if eventType == errorEvent.
err *ErrorData
// tx is set only if eventType == transactionEvent.
tx struct {
*Transaction
// Transaction.TransactionData is nil at the
// point tracerEvent is created (to signify
// that the transaction is ended), so we pass
// it along side.
*TransactionData
}
// span is set only if eventType == spanEvent.
span struct {
*Span
// Span.SpanData is nil at the point tracerEvent
// is created (to signify that the span is ended),
// so we pass it along side.
*SpanData
}
}
// majorVersionGetter provides cached access to the remote APM Server's
// major version number.
type majorVersionGetter interface {
// MajorServerVersion returns the APM Server's major version. When
// refreshStale is true it will request the remote APM Server's version
// from "/", otherwise it will return the cached version. If the returned
// value is 0, the cache is stale.
MajorServerVersion(ctx context.Context, refreshStale bool) uint32
}
// parseGlobalLabels parses the global-labels environment variable into a
// model.StringMap. Entries are comma-separated "key=value" pairs; entries
// without a non-empty key are ignored. Keys are cleaned and values
// truncated before being added.
func parseGlobalLabels() model.StringMap {
	var out model.StringMap
	for _, pair := range configutil.ParseListEnv(envGlobalLabels, ",", nil) {
		parts := strings.SplitN(pair, "=", 2)
		if len(parts) != 2 || parts[0] == "" {
			continue
		}
		key := strings.TrimSpace(parts[0])
		value := strings.TrimSpace(parts[1])
		out = append(out, model.StringMapItem{
			Key:   cleanLabelKey(key),
			Value: truncateString(value),
		})
	}
	return out
}