config.go (567 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package apm // import "go.elastic.co/apm/v2"
import (
"fmt"
"net/url"
"os"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"sync/atomic"
"time"
"unsafe"
"github.com/pkg/errors"
"go.elastic.co/apm/v2/internal/apmlog"
"go.elastic.co/apm/v2/internal/configutil"
"go.elastic.co/apm/v2/internal/wildcard"
"go.elastic.co/apm/v2/transport"
)
// Names of the environment variables read by the initial* helpers in this file,
// followed by the fallback values and size bounds those helpers use.
const (
envMetricsInterval = "ELASTIC_APM_METRICS_INTERVAL"
envMaxSpans = "ELASTIC_APM_TRANSACTION_MAX_SPANS"
envTransactionSampleRate = "ELASTIC_APM_TRANSACTION_SAMPLE_RATE"
envSanitizeFieldNames = "ELASTIC_APM_SANITIZE_FIELD_NAMES"
envCaptureHeaders = "ELASTIC_APM_CAPTURE_HEADERS"
envCaptureBody = "ELASTIC_APM_CAPTURE_BODY"
envServiceName = "ELASTIC_APM_SERVICE_NAME"
envServiceVersion = "ELASTIC_APM_SERVICE_VERSION"
envEnvironment = "ELASTIC_APM_ENVIRONMENT"
envSpanStackTraceMinDuration = "ELASTIC_APM_SPAN_STACK_TRACE_MIN_DURATION"
// Deprecated predecessor of envSpanStackTraceMinDuration. It is still read by
// initialSpanStackTraceMinDuration, which converts its value to the new meaning.
deprecatedEnvSpanFramesMinDuration = "ELASTIC_APM_SPAN_FRAMES_MIN_DURATION"
envActive = "ELASTIC_APM_ACTIVE"
envRecording = "ELASTIC_APM_RECORDING"
envAPIRequestSize = "ELASTIC_APM_API_REQUEST_SIZE"
envAPIRequestTime = "ELASTIC_APM_API_REQUEST_TIME"
envAPIBufferSize = "ELASTIC_APM_API_BUFFER_SIZE"
envMetricsBufferSize = "ELASTIC_APM_METRICS_BUFFER_SIZE"
envDisableMetrics = "ELASTIC_APM_DISABLE_METRICS"
envIgnoreURLs = "ELASTIC_APM_TRANSACTION_IGNORE_URLS"
// Deprecated predecessor of envIgnoreURLs. initialIgnoreTransactionURLs reads it
// only when envIgnoreURLs yields no patterns.
deprecatedEnvIgnoreURLs = "ELASTIC_APM_IGNORE_URLS"
envGlobalLabels = "ELASTIC_APM_GLOBAL_LABELS"
envStackTraceLimit = "ELASTIC_APM_STACK_TRACE_LIMIT"
envCentralConfig = "ELASTIC_APM_CENTRAL_CONFIG"
envBreakdownMetrics = "ELASTIC_APM_BREAKDOWN_METRICS"
envUseElasticTraceparentHeader = "ELASTIC_APM_USE_ELASTIC_TRACEPARENT_HEADER"
envCloudProvider = "ELASTIC_APM_CLOUD_PROVIDER"
envContinuationStrategy = "ELASTIC_APM_TRACE_CONTINUATION_STRATEGY"
// span_compression_enabled (default `true`)
envSpanCompressionEnabled = "ELASTIC_APM_SPAN_COMPRESSION_ENABLED"
// span_compression_exact_match_max_duration (default `50ms`)
envSpanCompressionExactMatchMaxDuration = "ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION"
// span_compression_same_kind_max_duration (default `0ms`)
envSpanCompressionSameKindMaxDuration = "ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION"
// exit_span_min_duration (default `1ms`)
envExitSpanMinDuration = "ELASTIC_APM_EXIT_SPAN_MIN_DURATION"
// NOTE(axw) profiling environment variables are experimental.
// They may be removed in a future minor version without being
// considered a breaking change.
envCPUProfileInterval = "ELASTIC_APM_CPU_PROFILE_INTERVAL"
envCPUProfileDuration = "ELASTIC_APM_CPU_PROFILE_DURATION"
envHeapProfileInterval = "ELASTIC_APM_HEAP_PROFILE_INTERVAL"
// Fallback values handed to the configutil parsers (or returned directly) by
// the initial* helpers.
defaultAPIRequestSize = 750 * configutil.KByte
defaultAPIRequestTime = 10 * time.Second
defaultAPIBufferSize = 1 * configutil.MByte
defaultMetricsBufferSize = 750 * configutil.KByte
defaultMetricsInterval = 30 * time.Second
defaultMaxSpans = 500
defaultCaptureHeaders = true
defaultCaptureBody = CaptureBodyOff
defaultSpanStackTraceMinDuration = 5 * time.Millisecond
defaultStackTraceLimit = 50
defaultContinuationStrategy = "continue"
defaultExitSpanMinDuration = time.Millisecond
// Bounds checked by initialAPIBufferSize, initialAPIRequestSize and
// initialMetricsBufferSize. Both ends are accepted: only values below the
// minimum or above the maximum are rejected.
minAPIBufferSize = 10 * configutil.KByte
maxAPIBufferSize = 100 * configutil.MByte
minAPIRequestSize = 1 * configutil.KByte
maxAPIRequestSize = 5 * configutil.MByte
minMetricsBufferSize = 10 * configutil.KByte
maxMetricsBufferSize = 100 * configutil.MByte
// Span Compressions default setting values
defaultSpanCompressionEnabled = true
defaultSpanCompressionExactMatchMaxDuration = 50 * time.Millisecond
defaultSpanCompressionSameKindMaxDuration = 0
)
// defaultSanitizedFieldNames holds the wildcard matchers used by
// initialSanitizedFieldNames as the fallback when ELASTIC_APM_SANITIZE_FIELD_NAMES
// does not supply its own patterns. The patterns are joined with commas and
// handed to configutil.ParseWildcardPatterns once, at package initialisation.
var defaultSanitizedFieldNames = configutil.ParseWildcardPatterns(
	strings.Join(
		[]string{
			"password", "passwd", "pwd", "secret",
			"*key", "*token*", "*session*",
			"*credit*", "*card*",
			"*auth*", "set-cookie", "*principal*",
		},
		",",
	),
)
// httpComment matches every character that may NOT appear verbatim inside an
// HTTP comment: the character class is negated, so anything outside tab, space,
// 0x21-0x27, 0x2a-0x5b, 0x5d-0x7e and 0x80-0xff is matched. initialTransport
// replaces each match with "_" when building the User-Agent header value.
//
// See https://httpwg.org/specs/rfc7230.html#field.components
var httpComment = regexp.MustCompile(`[^\t \x21-\x27\x2a-\x5b\x5d-\x7e\x80-\xff]`)
// initialTransport creates the HTTP transport used by the tracer, identifying
// the service in the User-Agent header.
//
// The header value has the form
// "apm-agent-go/<agent-version> (service-name service-version)"; the version is
// omitted when empty, and characters of serviceVersion matched by httpComment
// are replaced with "_". serviceName is used as given.
func initialTransport(serviceName, serviceVersion string) (transport.Transport, error) {
	comment := serviceName
	if serviceVersion != "" {
		comment = serviceName + " " + httpComment.ReplaceAllString(serviceVersion, "_")
	}
	opts := transport.HTTPTransportOptions{
		UserAgent: fmt.Sprintf("%s (%s)", transport.DefaultUserAgent(), comment),
	}
	httpTransport, err := transport.NewHTTPTransport(opts)
	if err != nil {
		// Return an untyped nil so callers never see a non-nil interface
		// wrapping a nil transport.
		return nil, err
	}
	return httpTransport, nil
}
// initialRequestDuration parses ELASTIC_APM_API_REQUEST_TIME, passing
// defaultAPIRequestTime as the fallback value.
func initialRequestDuration() (time.Duration, error) {
	requestTime, err := configutil.ParseDurationEnv(envAPIRequestTime, defaultAPIRequestTime)
	return requestTime, err
}
// initialMetricsInterval parses ELASTIC_APM_METRICS_INTERVAL, passing
// defaultMetricsInterval as the fallback value.
func initialMetricsInterval() (time.Duration, error) {
	interval, err := configutil.ParseDurationEnv(envMetricsInterval, defaultMetricsInterval)
	return interval, err
}
// initialMetricsBufferSize parses ELASTIC_APM_METRICS_BUFFER_SIZE (fallback
// defaultMetricsBufferSize) and returns it in bytes as an int.
//
// A parse failure is returned unchanged. A value below minMetricsBufferSize or
// above maxMetricsBufferSize yields an error; both bounds themselves are accepted.
func initialMetricsBufferSize() (int, error) {
	size, err := configutil.ParseSizeEnv(envMetricsBufferSize, defaultMetricsBufferSize)
	switch {
	case err != nil:
		return 0, err
	case size < minMetricsBufferSize, size > maxMetricsBufferSize:
		return 0, errors.Errorf(
			"%s must be at least %s and less than %s, got %s",
			envMetricsBufferSize, minMetricsBufferSize, maxMetricsBufferSize, size,
		)
	}
	return int(size), nil
}
// initialAPIBufferSize parses ELASTIC_APM_API_BUFFER_SIZE (fallback
// defaultAPIBufferSize) and returns it in bytes as an int.
//
// A parse failure is returned unchanged. A value below minAPIBufferSize or
// above maxAPIBufferSize yields an error; both bounds themselves are accepted.
func initialAPIBufferSize() (int, error) {
	size, err := configutil.ParseSizeEnv(envAPIBufferSize, defaultAPIBufferSize)
	switch {
	case err != nil:
		return 0, err
	case size < minAPIBufferSize, size > maxAPIBufferSize:
		return 0, errors.Errorf(
			"%s must be at least %s and less than %s, got %s",
			envAPIBufferSize, minAPIBufferSize, maxAPIBufferSize, size,
		)
	}
	return int(size), nil
}
// initialAPIRequestSize parses ELASTIC_APM_API_REQUEST_SIZE (fallback
// defaultAPIRequestSize) and returns it in bytes as an int.
//
// A parse failure is returned unchanged. A value below minAPIRequestSize or
// above maxAPIRequestSize yields an error; both bounds themselves are accepted.
func initialAPIRequestSize() (int, error) {
	size, err := configutil.ParseSizeEnv(envAPIRequestSize, defaultAPIRequestSize)
	switch {
	case err != nil:
		return 0, err
	case size < minAPIRequestSize, size > maxAPIRequestSize:
		return 0, errors.Errorf(
			"%s must be at least %s and less than %s, got %s",
			envAPIRequestSize, minAPIRequestSize, maxAPIRequestSize, size,
		)
	}
	return int(size), nil
}
// initialMaxSpans reads ELASTIC_APM_TRANSACTION_MAX_SPANS as a decimal integer.
// An unset or empty variable yields defaultMaxSpans; a value that strconv.Atoi
// cannot parse yields a wrapped error naming the variable.
func initialMaxSpans() (int, error) {
	raw := os.Getenv(envMaxSpans)
	if raw == "" {
		return defaultMaxSpans, nil
	}
	limit, err := strconv.Atoi(raw)
	if err == nil {
		return limit, nil
	}
	return 0, errors.Wrapf(err, "failed to parse %s", envMaxSpans)
}
// initialSampler builds the Sampler configured through
// ELASTIC_APM_TRANSACTION_SAMPLE_RATE by delegating to parseSampleRate, which
// treats an unset or empty variable as a rate of 1.
func initialSampler() (Sampler, error) {
	return parseSampleRate(envTransactionSampleRate, os.Getenv(envTransactionSampleRate))
}
// parseSampleRate parses a numeric sampling rate in the range [0,1.0], returning a Sampler.
//
// name identifies the setting in error messages (an environment variable name
// or a central config key). An empty value is treated as "1". An error is
// returned when value is not a valid float, or when the parsed number lies
// outside [0,1.0]; NaN counts as outside the range.
func parseSampleRate(name, value string) (Sampler, error) {
	if value == "" {
		value = "1"
	}
	ratio, err := strconv.ParseFloat(value, 64)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to parse %s", name)
	}
	// strconv.ParseFloat accepts "NaN", and every ordered comparison with NaN
	// is false. Testing for membership and negating it rejects NaN, whereas
	// `ratio < 0 || ratio > 1` would let it through to NewRatioSampler.
	if !(ratio >= 0.0 && ratio <= 1.0) {
		return nil, errors.Errorf(
			"invalid value for %s: %s (out of range [0,1.0])",
			name, value,
		)
	}
	return NewRatioSampler(ratio), nil
}
// initialSanitizedFieldNames returns the wildcard matchers parsed from
// ELASTIC_APM_SANITIZE_FIELD_NAMES, passing defaultSanitizedFieldNames as the
// fallback value.
func initialSanitizedFieldNames() wildcard.Matchers {
	matchers := configutil.ParseWildcardPatternsEnv(envSanitizeFieldNames, defaultSanitizedFieldNames)
	return matchers
}
// initContinuationStrategy reads ELASTIC_APM_TRACE_CONTINUATION_STRATEGY.
// An unset or empty variable yields defaultContinuationStrategy. Otherwise the
// raw value is returned together with the result of
// validateContinuationStrategy, so the value is returned even when it is invalid.
func initContinuationStrategy() (string, error) {
	strategy := os.Getenv(envContinuationStrategy)
	if strategy != "" {
		return strategy, validateContinuationStrategy(strategy)
	}
	return defaultContinuationStrategy, nil
}
// validateContinuationStrategy returns nil when value is exactly one of
// "continue", "restart" or "restart_external" (the match is case-sensitive),
// and an error naming the offending value otherwise.
func validateContinuationStrategy(value string) error {
	if value == "continue" || value == "restart" || value == "restart_external" {
		return nil
	}
	return fmt.Errorf("unknown continuation strategy: %s", value)
}
// initialCaptureHeaders parses ELASTIC_APM_CAPTURE_HEADERS as a boolean,
// passing defaultCaptureHeaders as the fallback value.
func initialCaptureHeaders() (bool, error) {
	capture, err := configutil.ParseBoolEnv(envCaptureHeaders, defaultCaptureHeaders)
	return capture, err
}
// initialCaptureBody reads ELASTIC_APM_CAPTURE_BODY. An unset or empty variable
// yields defaultCaptureBody; anything else is interpreted by parseCaptureBody.
func initialCaptureBody() (CaptureBodyMode, error) {
	if raw := os.Getenv(envCaptureBody); raw != "" {
		return parseCaptureBody(envCaptureBody, raw)
	}
	return defaultCaptureBody, nil
}
// parseCaptureBody converts value into a CaptureBodyMode. Matching ignores case
// and surrounding whitespace; the accepted words are "all", "errors",
// "transactions" and "off".
//
// name identifies the setting in the error message. For any other input the
// function returns -1 and an error quoting the original, unnormalised value.
func parseCaptureBody(name, value string) (CaptureBodyMode, error) {
	normalized := strings.TrimSpace(strings.ToLower(value))
	switch normalized {
	case "off":
		return CaptureBodyOff, nil
	case "errors":
		return CaptureBodyErrors, nil
	case "transactions":
		return CaptureBodyTransactions, nil
	case "all":
		return CaptureBodyAll, nil
	default:
		return -1, errors.Errorf("invalid %s value %q", name, value)
	}
}
func initialService() (name, version, environment string) {
name = os.Getenv(envServiceName)
version = os.Getenv(envServiceVersion)
environment = os.Getenv(envEnvironment)
if name == "" {
name = filepath.Base(os.Args[0])
if runtime.GOOS == "windows" {
name = strings.TrimSuffix(name, filepath.Ext(name))
}
}
name = sanitizeServiceName(name)
return name, version, environment
}
// initialSpanStackTraceMinDuration resolves the span stack trace minimum
// duration from ELASTIC_APM_SPAN_STACK_TRACE_MIN_DURATION, falling back to the
// deprecated ELASTIC_APM_SPAN_FRAMES_MIN_DURATION.
//
// The new variable wins whenever it is set to a non-empty value, fails to
// parse, or parses to something other than the default. The deprecated variable
// is consulted only otherwise, and its value is translated because the meaning
// changed between the two settings: an old 0 becomes -1 and an old -1 becomes 0;
// every other value is returned unchanged.
func initialSpanStackTraceMinDuration() (time.Duration, error) {
	v, err := configutil.ParseDurationEnv(envSpanStackTraceMinDuration, defaultSpanStackTraceMinDuration)
	// Comparing against the default alone cannot tell "unset" apart from
	// "explicitly set to the default", which would let the deprecated variable
	// override an explicit setting. Checking the raw variable closes that gap.
	provided := os.Getenv(envSpanStackTraceMinDuration) != ""
	if err != nil || provided || v != defaultSpanStackTraceMinDuration {
		// if envSpanStackTraceMinDuration was provided ignore the deprecated option
		return v, err
	}

	v, err = configutil.ParseDurationEnv(deprecatedEnvSpanFramesMinDuration, defaultSpanStackTraceMinDuration)
	if err != nil {
		return v, err
	}
	// The meaning of the value was changed.
	// convert the old value in span_stack_trace_min_duration
	switch v {
	case 0:
		return -1, nil
	case -1:
		return 0, nil
	}
	return v, nil
}
// initialActive parses ELASTIC_APM_ACTIVE as a boolean, passing true as the
// fallback value.
func initialActive() (bool, error) {
	active, err := configutil.ParseBoolEnv(envActive, true)
	return active, err
}
// initialRecording parses ELASTIC_APM_RECORDING as a boolean, passing true as
// the fallback value.
func initialRecording() (bool, error) {
	recording, err := configutil.ParseBoolEnv(envRecording, true)
	return recording, err
}
// initialDisabledMetrics returns the wildcard matchers parsed from
// ELASTIC_APM_DISABLE_METRICS, passing nil as the fallback value.
func initialDisabledMetrics() wildcard.Matchers {
	disabled := configutil.ParseWildcardPatternsEnv(envDisableMetrics, nil)
	return disabled
}
// initialIgnoreTransactionURLs returns the wildcard matchers parsed from
// ELASTIC_APM_TRANSACTION_IGNORE_URLS. When that yields no matchers, the
// deprecated ELASTIC_APM_IGNORE_URLS is parsed instead.
func initialIgnoreTransactionURLs() wildcard.Matchers {
	if current := configutil.ParseWildcardPatternsEnv(envIgnoreURLs, nil); len(current) != 0 {
		return current
	}
	return configutil.ParseWildcardPatternsEnv(deprecatedEnvIgnoreURLs, nil)
}
// initialStackTraceLimit reads ELASTIC_APM_STACK_TRACE_LIMIT as a decimal
// integer. An unset or empty variable yields defaultStackTraceLimit; a value
// that strconv.Atoi cannot parse yields a wrapped error naming the variable.
func initialStackTraceLimit() (int, error) {
	raw := os.Getenv(envStackTraceLimit)
	if raw == "" {
		return defaultStackTraceLimit, nil
	}
	n, err := strconv.Atoi(raw)
	if err == nil {
		return n, nil
	}
	return 0, errors.Wrapf(err, "failed to parse %s", envStackTraceLimit)
}
// initialCentralConfigEnabled parses ELASTIC_APM_CENTRAL_CONFIG as a boolean,
// passing true as the fallback value.
func initialCentralConfigEnabled() (bool, error) {
	enabled, err := configutil.ParseBoolEnv(envCentralConfig, true)
	return enabled, err
}
// initialBreakdownMetricsEnabled parses ELASTIC_APM_BREAKDOWN_METRICS as a
// boolean, passing true as the fallback value.
func initialBreakdownMetricsEnabled() (bool, error) {
	enabled, err := configutil.ParseBoolEnv(envBreakdownMetrics, true)
	return enabled, err
}
// initialUseElasticTraceparentHeader parses
// ELASTIC_APM_USE_ELASTIC_TRACEPARENT_HEADER as a boolean, passing true as the
// fallback value.
func initialUseElasticTraceparentHeader() (bool, error) {
	enabled, err := configutil.ParseBoolEnv(envUseElasticTraceparentHeader, true)
	return enabled, err
}
// initialSpanCompressionEnabled parses ELASTIC_APM_SPAN_COMPRESSION_ENABLED as
// a boolean, passing defaultSpanCompressionEnabled as the fallback value.
func initialSpanCompressionEnabled() (bool, error) {
	enabled, err := configutil.ParseBoolEnv(envSpanCompressionEnabled, defaultSpanCompressionEnabled)
	return enabled, err
}
func initialSpanCompressionExactMatchMaxDuration() (time.Duration, error) {
return configutil.ParseDurationEnv(
envSpanCompressionExactMatchMaxDuration,
defaultSpanCompressionExactMatchMaxDuration,
)
}
func initialSpanCompressionSameKindMaxDuration() (time.Duration, error) {
return configutil.ParseDurationEnv(
envSpanCompressionSameKindMaxDuration,
defaultSpanCompressionSameKindMaxDuration,
)
}
// initialCPUProfileIntervalDuration returns the CPU profiling interval and
// duration parsed from ELASTIC_APM_CPU_PROFILE_INTERVAL and
// ELASTIC_APM_CPU_PROFILE_DURATION, each with a fallback of 0.
//
// Both results are zero unless both settings parse to positive durations. The
// duration variable is not read at all when the interval is invalid or
// non-positive. Parse errors are returned unchanged.
func initialCPUProfileIntervalDuration() (time.Duration, time.Duration, error) {
	interval, err := configutil.ParseDurationEnv(envCPUProfileInterval, 0)
	if err != nil {
		return 0, 0, err
	}
	if interval <= 0 {
		return 0, 0, nil
	}

	duration, err := configutil.ParseDurationEnv(envCPUProfileDuration, 0)
	if err != nil {
		return 0, 0, err
	}
	if duration <= 0 {
		return 0, 0, nil
	}
	return interval, duration, nil
}
// initialHeapProfileInterval parses ELASTIC_APM_HEAP_PROFILE_INTERVAL, passing
// 0 as the fallback value.
func initialHeapProfileInterval() (time.Duration, error) {
	interval, err := configutil.ParseDurationEnv(envHeapProfileInterval, 0)
	return interval, err
}
// initialExitSpanMinDuration parses ELASTIC_APM_EXIT_SPAN_MIN_DURATION, passing
// defaultExitSpanMinDuration as the fallback value. The parser is configured
// with a minimum duration unit of one microsecond.
func initialExitSpanMinDuration() (time.Duration, error) {
	opts := configutil.DurationOptions{MinimumDurationUnit: time.Microsecond}
	return configutil.ParseDurationEnvOptions(envExitSpanMinDuration, defaultExitSpanMinDuration, opts)
}
// updateRemoteConfig updates t and cfg with changes held in "attrs", and reverts to local
// config for config attributes that have been removed (exist in old but not in attrs).
//
// Keys of old and attrs are central config names; each is mapped to its
// environment variable name by upper-casing it and prefixing "ELASTIC_APM_".
// Entries whose value is unchanged from old are skipped. Entries that are
// unsupported, fail to parse, or cannot be applied are logged and deleted from
// attrs, which also causes them to be reverted to local config if they were
// present in old.
//
// All changes are collected first and then applied in a single call to
// updateInstrumentationConfig, which also replaces cfg.remote with the set of
// keys remaining in attrs. A nil logger disables logging.
//
// On return from updateRemoteConfig, unapplied config will have been removed from attrs.
func (t *Tracer) updateRemoteConfig(logger Logger, old, attrs map[string]string) {
	warningf := func(string, ...interface{}) {}
	debugf := func(string, ...interface{}) {}
	errorf := func(string, ...interface{}) {}
	if logger != nil {
		warningf = logger.Warningf
		debugf = logger.Debugf
		errorf = logger.Errorf
	}
	envName := func(k string) string {
		return "ELASTIC_APM_" + strings.ToUpper(k)
	}

	var updates []func(cfg *instrumentationConfig)
	for k, v := range attrs {
		// The closures appended below run only after this loop has finished.
		// Take per-iteration copies so that each closure sees its own key and
		// value even when range variables are shared across iterations
		// (Go versions before 1.22).
		k, v := k, v
		if oldv, ok := old[k]; ok && oldv == v {
			continue
		}
		switch envName(k) {
		case envCaptureBody:
			value, err := parseCaptureBody(k, v)
			if err != nil {
				errorf("central config failure: %s", err)
				delete(attrs, k)
				continue
			}
			updates = append(updates, func(cfg *instrumentationConfig) {
				cfg.captureBody = value
			})
		case envMaxSpans:
			value, err := strconv.Atoi(v)
			if err != nil {
				errorf("central config failure: failed to parse %s: %s", k, err)
				delete(attrs, k)
				continue
			}
			updates = append(updates, func(cfg *instrumentationConfig) {
				cfg.maxSpans = value
			})
		case envExitSpanMinDuration:
			duration, err := configutil.ParseDurationOptions(v, configutil.DurationOptions{
				MinimumDurationUnit: time.Microsecond,
			})
			if err != nil {
				errorf("central config failure: failed to parse %s: %s", k, err)
				delete(attrs, k)
				continue
			}
			updates = append(updates, func(cfg *instrumentationConfig) {
				cfg.exitSpanMinDuration = duration
			})
		case envIgnoreURLs:
			matchers := configutil.ParseWildcardPatterns(v)
			updates = append(updates, func(cfg *instrumentationConfig) {
				cfg.ignoreTransactionURLs = matchers
			})
		case envRecording:
			recording, err := strconv.ParseBool(v)
			if err != nil {
				errorf("central config failure: failed to parse %s: %s", k, err)
				delete(attrs, k)
				continue
			}
			updates = append(updates, func(cfg *instrumentationConfig) {
				cfg.recording = recording
			})
		case envSanitizeFieldNames:
			matchers := configutil.ParseWildcardPatterns(v)
			updates = append(updates, func(cfg *instrumentationConfig) {
				cfg.sanitizedFieldNames = matchers
			})
		case envContinuationStrategy:
			if err := validateContinuationStrategy(v); err != nil {
				errorf("central config failure: failed to parse %s: %s", k, err)
				delete(attrs, k)
				continue
			}
			updates = append(updates, func(cfg *instrumentationConfig) {
				// v is this iteration's copy; see the top of the loop.
				cfg.continuationStrategy = v
			})
		case envSpanStackTraceMinDuration:
			duration, err := configutil.ParseDuration(v)
			if err != nil {
				errorf("central config failure: failed to parse %s: %s", k, err)
				delete(attrs, k)
				continue
			}
			updates = append(updates, func(cfg *instrumentationConfig) {
				cfg.spanStackTraceMinDuration = duration
			})
		case envStackTraceLimit:
			limit, err := strconv.Atoi(v)
			if err != nil {
				errorf("central config failure: failed to parse %s: %s", k, err)
				delete(attrs, k)
				continue
			}
			updates = append(updates, func(cfg *instrumentationConfig) {
				cfg.stackTraceLimit = limit
			})
		case envTransactionSampleRate:
			sampler, err := parseSampleRate(k, v)
			if err != nil {
				errorf("central config failure: %s", err)
				delete(attrs, k)
				continue
			}
			updates = append(updates, func(cfg *instrumentationConfig) {
				cfg.sampler = sampler
			})
		case apmlog.EnvLogLevel:
			level, err := apmlog.ParseLogLevel(v)
			if err != nil {
				errorf("central config failure: %s", err)
				delete(attrs, k)
				continue
			}
			// The level can only be changed on the default logger; a
			// caller-supplied logger is left alone.
			if dl := apmlog.DefaultLogger(); dl != nil && dl == logger {
				updates = append(updates, func(*instrumentationConfig) {
					dl.SetLevel(level)
				})
			} else {
				warningf("central config ignored: %s set to %s, but custom logger in use", k, v)
				delete(attrs, k)
				continue
			}
		case envSpanCompressionEnabled:
			val, err := strconv.ParseBool(v)
			if err != nil {
				errorf("central config failure: failed to parse %s: %s", k, err)
				delete(attrs, k)
				continue
			}
			updates = append(updates, func(cfg *instrumentationConfig) {
				cfg.compressionOptions.enabled = val
			})
		case envSpanCompressionExactMatchMaxDuration:
			duration, err := configutil.ParseDuration(v)
			if err != nil {
				errorf("central config failure: failed to parse %s: %s", k, err)
				delete(attrs, k)
				continue
			}
			updates = append(updates, func(cfg *instrumentationConfig) {
				cfg.compressionOptions.exactMatchMaxDuration = duration
			})
		case envSpanCompressionSameKindMaxDuration:
			duration, err := configutil.ParseDuration(v)
			if err != nil {
				errorf("central config failure: failed to parse %s: %s", k, err)
				delete(attrs, k)
				continue
			}
			updates = append(updates, func(cfg *instrumentationConfig) {
				cfg.compressionOptions.sameKindMaxDuration = duration
			})
		default:
			warningf("central config failure: unsupported config: %s", k)
			delete(attrs, k)
			continue
		}
		debugf("central config update: updated %s to %s", k, v)
	}

	for k := range old {
		if _, ok := attrs[k]; ok {
			continue
		}
		// Resolve the environment variable name now, so the deferred closure
		// does not depend on the range variable after the loop has moved on.
		localKey := envName(k)
		updates = append(updates, func(cfg *instrumentationConfig) {
			if f, ok := cfg.local[localKey]; ok {
				f(&cfg.instrumentationConfigValues)
			}
		})
		debugf("central config update: reverted %s to local config", k)
	}

	if updates != nil {
		remote := make(map[string]struct{})
		for k := range attrs {
			remote[envName(k)] = struct{}{}
		}
		t.updateInstrumentationConfig(func(cfg *instrumentationConfig) {
			cfg.remote = remote
			for _, update := range updates {
				update(cfg)
			}
		})
	}
}
// instrumentationConfig returns the current instrumentationConfig, read with an
// atomic pointer load from t.instrumentationConfigInternal.
//
// The returned value is immutable.
func (t *Tracer) instrumentationConfig() *instrumentationConfig {
	addr := (*unsafe.Pointer)(unsafe.Pointer(&t.instrumentationConfigInternal))
	return (*instrumentationConfig)(atomic.LoadPointer(addr))
}
// setLocalInstrumentationConfig sets local transaction configuration with
// the specified environment variable key.
//
// f is recorded as the local setter for envKey, so that the value can be
// restored when remote config for that key is later removed. f is applied
// immediately only when no remote config is currently in effect for envKey.
// Because the update runs inside updateInstrumentationConfig's retry loop, f
// may be invoked more than once.
func (t *Tracer) setLocalInstrumentationConfig(envKey string, f func(cfg *instrumentationConfigValues)) {
	t.updateInstrumentationConfig(func(cfg *instrumentationConfig) {
		// cfg is a shallow copy of the published config, so cfg.local is still
		// the map that published snapshot holds. Writing into it would mutate a
		// value documented as immutable and could race with readers of that
		// snapshot, so build a fresh map instead.
		local := make(map[string]func(*instrumentationConfigValues), len(cfg.local)+1)
		for key, setter := range cfg.local {
			local[key] = setter
		}
		local[envKey] = f
		cfg.local = local
		if _, ok := cfg.remote[envKey]; !ok {
			f(&cfg.instrumentationConfigValues)
		}
	})
}
// updateInstrumentationConfig publishes a modified copy of the current
// instrumentationConfig.
//
// Each attempt loads the current config, copies the struct by value, lets f
// modify the copy, and then installs the copy with an atomic compare-and-swap.
// If another update was installed in the meantime the swap fails and the whole
// attempt is repeated, so f may run more than once. The copy is shallow: map
// fields still refer to the maps of the config it was copied from.
func (t *Tracer) updateInstrumentationConfig(f func(cfg *instrumentationConfig)) {
	addr := (*unsafe.Pointer)(unsafe.Pointer(&t.instrumentationConfigInternal))
	for {
		current := t.instrumentationConfig()
		next := *current
		f(&next)
		swapped := atomic.CompareAndSwapPointer(addr, unsafe.Pointer(current), unsafe.Pointer(&next))
		if swapped {
			return
		}
	}
}
// IgnoredTransactionURL returns whether the given transaction URL should be
// ignored: it reports whether url.String() matches any of the currently
// configured ignoreTransactionURLs patterns.
func (t *Tracer) IgnoredTransactionURL(url *url.URL) bool {
	patterns := t.instrumentationConfig().ignoreTransactionURLs
	return patterns.MatchAny(url.String())
}
// instrumentationConfig holds current configuration values, as well as information
// required to revert from remote to local configuration.
//
// Tracer.updateInstrumentationConfig copies this struct by value before applying
// changes, so the local and remote maps are shared between a snapshot and its
// copy unless an update replaces them.
type instrumentationConfig struct {
instrumentationConfigValues
// local holds functions for setting instrumentationConfigValues to the most
// recently, locally specified configuration.
// Keys are environment variable names (see setLocalInstrumentationConfig).
local map[string]func(*instrumentationConfigValues)
// remote holds the environment variable keys for applied remote config.
// updateRemoteConfig replaces this map as a whole when it applies updates.
remote map[string]struct{}
}
// instrumentationConfigValues holds configuration that is accessible outside of the
// tracer loop, for instrumentation: StartTransaction, StartSpan, CaptureError, etc.
//
// NOTE(axw) when adding configuration here, you must also update `newTracer` to
// set the initial entry in instrumentationConfig.local, in order to properly reset
// to the local value, even if the default is the zero value.
type instrumentationConfigValues struct {
// recording is updated from central config by updateRemoteConfig (recording).
recording bool
// captureBody is updated from central config by updateRemoteConfig (capture_body).
captureBody CaptureBodyMode
// captureHeaders is not written in this file.
// NOTE(review): presumably seeded from ELASTIC_APM_CAPTURE_HEADERS — confirm in newTracer.
captureHeaders bool
// maxSpans is updated from central config by updateRemoteConfig (transaction_max_spans).
maxSpans int
// sampler is updated from central config by updateRemoteConfig (transaction_sample_rate).
sampler Sampler
// spanStackTraceMinDuration is updated from central config by updateRemoteConfig
// (span_stack_trace_min_duration).
spanStackTraceMinDuration time.Duration
// exitSpanMinDuration is updated from central config by updateRemoteConfig
// (exit_span_min_duration).
exitSpanMinDuration time.Duration
// continuationStrategy is updated from central config by updateRemoteConfig
// (trace_continuation_strategy), after validateContinuationStrategy accepts it.
continuationStrategy string
// stackTraceLimit is updated from central config by updateRemoteConfig (stack_trace_limit).
stackTraceLimit int
// propagateLegacyHeader is not written in this file.
// NOTE(review): presumably tied to ELASTIC_APM_USE_ELASTIC_TRACEPARENT_HEADER — confirm.
propagateLegacyHeader bool
// sanitizedFieldNames is updated from central config by updateRemoteConfig
// (sanitize_field_names).
sanitizedFieldNames wildcard.Matchers
// ignoreTransactionURLs is updated from central config by updateRemoteConfig
// (transaction_ignore_urls) and consulted by Tracer.IgnoredTransactionURL.
ignoreTransactionURLs wildcard.Matchers
// compressionOptions holds the span compression settings; updateRemoteConfig
// writes its enabled, exactMatchMaxDuration and sameKindMaxDuration fields.
compressionOptions compressionOptions
}