// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beater

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"hash"
	"net"
	"net/http"
	"os"
	"runtime"
	"strconv"
	"time"

	"github.com/cespare/xxhash/v2"
	"github.com/dustin/go-humanize"
	"go.elastic.co/apm/module/apmgrpc/v2"
	"go.elastic.co/apm/module/apmotel/v2"
	"go.elastic.co/apm/v2"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
	"google.golang.org/grpc"
	_ "google.golang.org/grpc/encoding/gzip"

	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
	"github.com/elastic/beats/v7/libbeat/instrumentation"
	"github.com/elastic/beats/v7/libbeat/licenser"
	"github.com/elastic/beats/v7/libbeat/outputs"
	esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
	"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
	"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
	agentconfig "github.com/elastic/elastic-agent-libs/config"
	"github.com/elastic/elastic-agent-libs/logp"
	"github.com/elastic/elastic-agent-libs/monitoring"
	"github.com/elastic/go-docappender/v2"
	"github.com/elastic/go-ucfg"

	"github.com/elastic/apm-data/model/modelpb"
	"github.com/elastic/apm-data/model/modelprocessor"
	"github.com/elastic/apm-server/internal/agentcfg"
	"github.com/elastic/apm-server/internal/beater/auth"
	"github.com/elastic/apm-server/internal/beater/config"
	"github.com/elastic/apm-server/internal/beater/interceptors"
	"github.com/elastic/apm-server/internal/beater/ratelimit"
	"github.com/elastic/apm-server/internal/elasticsearch"
	"github.com/elastic/apm-server/internal/fips140"
	"github.com/elastic/apm-server/internal/idxmgmt"
	"github.com/elastic/apm-server/internal/kibana"
	srvmodelprocessor "github.com/elastic/apm-server/internal/model/modelprocessor"
	"github.com/elastic/apm-server/internal/publish"
	"github.com/elastic/apm-server/internal/sourcemap"
	"github.com/elastic/apm-server/internal/version"
)

// Runner initialises, runs, and orchestrates the APM Server HTTP and gRPC
// servers, event processing pipeline, and output.
type Runner struct {
	wrapServer WrapServerFunc
	logger     *logp.Logger

	rawConfig                 *agentconfig.C
	config                    *config.Config
	outputConfig              agentconfig.Namespace
	elasticsearchOutputConfig *agentconfig.C

	tracerProvider trace.TracerProvider
	meterProvider  metric.MeterProvider
	metricGatherer *apmotel.Gatherer

	listener net.Listener
}

// RunnerParams holds parameters for NewRunner.
type RunnerParams struct {
	// Config holds the full, raw, configuration, including apm-server.*
	// and output.* attributes.
	Config *agentconfig.C

	// Logger holds a logger to use for logging throughout the APM Server.
	Logger *logp.Logger

	// TracerProvider holds a trace.TracerProvider that can be used for
	// creating traces.
	TracerProvider trace.TracerProvider

	// MeterProvider holds a metric.MeterProvider that can be used for
	// creating metrics.
	MeterProvider metric.MeterProvider

	// MetricsGatherer holds an apmotel.Gatherer.
	MetricsGatherer *apmotel.Gatherer

	// WrapServer holds an optional WrapServerFunc, for wrapping the
	// ServerParams and RunServerFunc used to run the APM Server.
	//
	// If WrapServer is nil, no wrapping will occur.
	WrapServer WrapServerFunc
}

// NewRunner returns a new Runner that runs APM Server with the given parameters.
func NewRunner(args RunnerParams) (*Runner, error) {
	fips140.CheckFips()
	var unpackedConfig struct {
		APMServer  *agentconfig.C        `config:"apm-server"`
		Output     agentconfig.Namespace `config:"output"`
		DataStream struct {
			Namespace string `config:"namespace"`
		} `config:"data_stream"`
	}
	if err := args.Config.Unpack(&unpackedConfig); err != nil {
		return nil, err
	}

	var elasticsearchOutputConfig *agentconfig.C
	if unpackedConfig.Output.Name() == "elasticsearch" {
		elasticsearchOutputConfig = unpackedConfig.Output.Config()
	}
	cfg, err := config.NewConfig(unpackedConfig.APMServer, elasticsearchOutputConfig, args.Logger)
	if err != nil {
		return nil, err
	}
	if unpackedConfig.DataStream.Namespace != "" {
		cfg.DataStreams.Namespace = unpackedConfig.DataStream.Namespace
	}

	// We start the listener in the constructor, before Run is invoked,
	// to ensure zero downtime while any existing Runner is stopped.
	logger := args.Logger.Named("beater")
	listener, err := listen(cfg, logger)
	if err != nil {
		return nil, err
	}
	return &Runner{
		wrapServer: args.WrapServer,
		logger:     logger,

		rawConfig:                 args.Config,
		config:                    cfg,
		outputConfig:              unpackedConfig.Output,
		elasticsearchOutputConfig: elasticsearchOutputConfig,

		tracerProvider: args.TracerProvider,
		meterProvider:  args.MeterProvider,
		metricGatherer: args.MetricsGatherer,

		listener: listener,
	}, nil
}
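
// A minimal usage sketch of the API above (hypothetical caller; the variable
// names and surrounding wiring are assumptions, not part of this file):
//
//	runner, err := NewRunner(RunnerParams{
//		Config:          rawConfig, // full apm-server.* and output.* config
//		Logger:          logger,
//		TracerProvider:  tracerProvider,
//		MeterProvider:   meterProvider,
//		MetricsGatherer: gatherer, // *apmotel.Gatherer
//	})
//	if err != nil {
//		return err
//	}
//	return runner.Run(ctx) // blocks until ctx is cancelled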

// Run runs the server, blocking until ctx is cancelled.
func (s *Runner) Run(ctx context.Context) error {
	defer s.listener.Close()
	g, ctx := errgroup.WithContext(ctx)

	meter := s.meterProvider.Meter("github.com/elastic/apm-server/internal/beater")

	// backgroundContext is a context to use in operations that should
	// block until shutdown, and will be cancelled after the shutdown
	// timeout.
	backgroundContext, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		<-ctx.Done()
		s.logger.Infof(
			"stopping apm-server... waiting maximum of %s for queues to drain",
			s.config.ShutdownTimeout,
		)
		time.AfterFunc(s.config.ShutdownTimeout, cancel)
	}()

	if s.config.Pprof.Enabled {
		// Profiling rates should be set once, early on in the program.
		runtime.SetBlockProfileRate(s.config.Pprof.BlockProfileRate)
		runtime.SetMutexProfileFraction(s.config.Pprof.MutexProfileRate)
		if s.config.Pprof.MemProfileRate > 0 {
			runtime.MemProfileRate = s.config.Pprof.MemProfileRate
		}
	}

	memLimitGB := processMemoryLimit(
		newCgroupReader(),
		sysMemoryReaderFunc(systemMemoryLimit),
		s.logger,
	)

	if s.config.MaxConcurrentDecoders == 0 {
		s.config.MaxConcurrentDecoders = maxConcurrentDecoders(memLimitGB)
		s.logger.Infof("MaxConcurrentDecoders set to %d based on 80 percent of %0.1fgb of memory",
			s.config.MaxConcurrentDecoders, memLimitGB,
		)
	}
	if s.config.Aggregation.MaxServices <= 0 {
		s.config.Aggregation.MaxServices = linearScaledValue(1_000, memLimitGB, 0)
		s.logger.Infof("Aggregation.MaxServices set to %d based on %0.1fgb of memory",
			s.config.Aggregation.MaxServices, memLimitGB,
		)
	}
	if s.config.Aggregation.ServiceTransactions.MaxGroups <= 0 {
		s.config.Aggregation.ServiceTransactions.MaxGroups = linearScaledValue(1_000, memLimitGB, 0)
		s.logger.Infof("Aggregation.ServiceTransactions.MaxGroups for service aggregation set to %d based on %0.1fgb of memory",
			s.config.Aggregation.ServiceTransactions.MaxGroups, memLimitGB,
		)
	}
	if s.config.Aggregation.Transactions.MaxGroups <= 0 {
		s.config.Aggregation.Transactions.MaxGroups = linearScaledValue(5_000, memLimitGB, 0)
		s.logger.Infof("Aggregation.Transactions.MaxGroups set to %d based on %0.1fgb of memory",
			s.config.Aggregation.Transactions.MaxGroups, memLimitGB,
		)
	}
	if s.config.Aggregation.ServiceDestinations.MaxGroups <= 0 {
		s.config.Aggregation.ServiceDestinations.MaxGroups = linearScaledValue(5_000, memLimitGB, 5_000)
		s.logger.Infof("Aggregation.ServiceDestinations.MaxGroups set to %d based on %0.1fgb of memory",
			s.config.Aggregation.ServiceDestinations.MaxGroups, memLimitGB,
		)
	}

	// Send config to telemetry.
	recordAPMServerConfig(s.config)

	var kibanaClient *kibana.Client
	if s.config.Kibana.Enabled {
		var err error
		kibanaClient, err = kibana.NewClient(s.config.Kibana.ClientConfig)
		if err != nil {
			return err
		}
	}

	// ELASTIC_AGENT_CLOUD is set when running in Elastic Cloud.
	inElasticCloud := os.Getenv("ELASTIC_AGENT_CLOUD") != ""
	if inElasticCloud {
		if s.config.Kibana.Enabled {
			go func() {
				if err := kibana.SendConfig(ctx, kibanaClient, (*ucfg.Config)(s.rawConfig)); err != nil {
					s.logger.Infof("failed to upload config to kibana: %v", err)
				}
			}()
		}
	}

	instrumentation, err := newInstrumentation(s.rawConfig, s.logger)
	if err != nil {
		return err
	}
	tracer := instrumentation.Tracer()
	tracerServerListener := instrumentation.Listener()
	if tracerServerListener != nil {
		defer tracerServerListener.Close()
	}
	defer tracer.Close()

	tracerProvider, err := apmotel.NewTracerProvider(apmotel.WithAPMTracer(tracer))
	if err != nil {
		return err
	}
	otel.SetTracerProvider(tracerProvider)
	s.tracerProvider = tracerProvider
	tracer.RegisterMetricsGatherer(s.metricGatherer)

	// Ensure the libbeat output and go-elasticsearch clients do not index
	// any events to Elasticsearch before the integration is ready.
	publishReady := make(chan struct{})
	drain := make(chan struct{})
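
	// publishReady is closed by the goroutine below once all preconditions
	// have been met, unblocking indexing; drain is closed instead if a
	// precondition fails, signalling (via waitReadyRoundTripper) that pending
	// events should be dropped rather than retried. Exactly one of the two
	// channels is closed.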
	g.Go(func() error {
		if err := s.waitReady(ctx, tracer); err != nil {
			// One or more preconditions failed; drop events.
			close(drain)
			return fmt.Errorf("error waiting for server to be ready: %w", err)
		}
		// All preconditions have been met; start indexing documents
		// into elasticsearch.
		close(publishReady)
		return nil
	})
	callbackUUID, err := esoutput.RegisterConnectCallback(func(*eslegclient.Connection) error {
		select {
		case <-publishReady:
			return nil
		default:
		}
		return errors.New("not ready for publishing events")
	})
	if err != nil {
		return err
	}
	defer esoutput.DeregisterConnectCallback(callbackUUID)
	newElasticsearchClient := func(cfg *elasticsearch.Config) (*elasticsearch.Client, error) {
		httpTransport, err := elasticsearch.NewHTTPTransport(cfg)
		if err != nil {
			return nil, err
		}
		transport := &waitReadyRoundTripper{Transport: httpTransport, ready: publishReady, drain: drain}
		return elasticsearch.NewClientParams(elasticsearch.ClientParams{
			Config:    cfg,
			Transport: transport,
			RetryOnError: func(_ *http.Request, err error) bool {
				return !errors.Is(err, errServerShuttingDown)
			},
		})
	}

	var sourcemapFetcher sourcemap.Fetcher
	if s.config.RumConfig.Enabled && s.config.RumConfig.SourceMapping.Enabled {
		fetcher, cancel, err := newSourcemapFetcher(
			s.config.RumConfig.SourceMapping,
			kibanaClient, newElasticsearchClient,
			tracer,
			s.logger,
		)
		if err != nil {
			return err
		}
		defer cancel()
		sourcemapFetcher = fetcher
	}

	// Create the runServer function. We start with newBaseRunServer, and then
	// wrap depending on the configuration in order to inject behaviour.
	runServer := newBaseRunServer(s.listener)
	authenticator, err := auth.NewAuthenticator(s.config.AgentAuth)
	if err != nil {
		return err
	}
	ratelimitStore, err := ratelimit.NewStore(
		s.config.AgentAuth.Anonymous.RateLimit.IPLimit,
		s.config.AgentAuth.Anonymous.RateLimit.EventLimit,
		3, // burst multiplier
	)
	if err != nil {
		return err
	}

	// Note that we intentionally do not use a grpc.Creds ServerOption
	// even if TLS is enabled, as TLS is handled by the net/http server.
	gRPCLogger := s.logger.Named("grpc")
	grpcServer := grpc.NewServer(grpc.ChainUnaryInterceptor(
		apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(tracer)),
		interceptors.ClientMetadata(),
		interceptors.Logging(gRPCLogger),
		interceptors.Metrics(gRPCLogger, s.meterProvider),
		interceptors.Timeout(),
		interceptors.Auth(authenticator),
		interceptors.AnonymousRateLimit(ratelimitStore),
	))

	// Create the BatchProcessor chain that is used to process all events,
	// including the metrics aggregated by APM Server.
	finalBatchProcessor, closeFinalBatchProcessor, err := s.newFinalBatchProcessor(
		tracer,
		newElasticsearchClient,
		memLimitGB,
		s.logger,
	)
	if err != nil {
		return err
	}
	transactionsDroppedCounter, err := meter.Int64Counter("apm-server.sampling.transactions_dropped")
	if err != nil {
		return err
	}
	batchProcessor := srvmodelprocessor.NewTracer("beater.ProcessBatch", modelprocessor.Chained{
		// Ensure all events have observer.*, ecs.*, and data_stream.* fields added,
		// and are counted in metrics. This is done in the final processors to ensure
		// aggregated metrics are also processed.
		newObserverBatchProcessor(),
		&modelprocessor.SetDataStream{Namespace: s.config.DataStreams.Namespace},
		srvmodelprocessor.NewEventCounter(s.meterProvider),

		// The server always drops non-RUM unsampled transactions. We store RUM unsampled
		// transactions as they are needed by the User Experience app, which performs
		// aggregations over dimensions that are not available in transaction metrics.
		//
		// It is important that this is done just before calling the publisher to
		// avoid affecting aggregations.
		modelprocessor.NewDropUnsampled(false /* don't drop RUM unsampled transactions */, func(i int64) {
			transactionsDroppedCounter.Add(context.Background(), i)
		}),
		finalBatchProcessor,
	})

	agentConfigFetcher, fetcherRunFunc, err := newAgentConfigFetcher(
		ctx,
		s.config,
		kibanaClient,
		newElasticsearchClient,
		tracer,
		s.meterProvider,
		s.logger,
	)
	if err != nil {
		return err
	}
	if fetcherRunFunc != nil {
		g.Go(func() error {
			return fetcherRunFunc(ctx)
		})
	}

	agentConfigReporter := agentcfg.NewReporter(
		agentConfigFetcher,
		batchProcessor, 30*time.Second,
		s.logger,
	)
	g.Go(func() error {
		return agentConfigReporter.Run(ctx)
	})

	serverParams := ServerParams{
		Config:                 s.config,
		Namespace:              s.config.DataStreams.Namespace,
		Logger:                 s.logger,
		Tracer:                 tracer,
		TracerProvider:         s.tracerProvider,
		MeterProvider:          s.meterProvider,
		Authenticator:          authenticator,
		RateLimitStore:         ratelimitStore,
		BatchProcessor:         batchProcessor,
		AgentConfig:            agentConfigReporter,
		SourcemapFetcher:       sourcemapFetcher,
		PublishReady:           publishReady,
		KibanaClient:           kibanaClient,
		NewElasticsearchClient: newElasticsearchClient,
		GRPCServer:             grpcServer,
		Semaphore:              semaphore.NewWeighted(int64(s.config.MaxConcurrentDecoders)),
	}
	if s.wrapServer != nil {
		// Wrap the serverParams and runServer function, enabling
		// injection of behaviour into the processing chain.
		serverParams, runServer, err = s.wrapServer(serverParams, runServer)
		if err != nil {
			return err
		}
	}

	// Add pre-processing batch processors to the beginning of the chain,
	// applying only to the events that are decoded from agent/client payloads.
	preBatchProcessors := modelprocessor.Chained{
		// Add a model processor that rate limits, and checks authorization for the
		// agent and service for each event. These must come at the beginning of the
		// processor chain.
		modelpb.ProcessBatchFunc(rateLimitBatchProcessor),
		modelpb.ProcessBatchFunc(authorizeEventIngestProcessor),

		// Add a model processor that removes `event.received`, which is added by
		// apm-data, but which we don't yet map.
		modelprocessor.RemoveEventReceived{},

		// Pre-process events before they are sent to the final processors for
		// aggregation, sampling, and indexing.
		modelprocessor.SetHostHostname{},
		modelprocessor.SetServiceNodeName{},
		modelprocessor.SetGroupingKey{
			NewHash: func() hash.Hash {
				return xxhash.New()
			},
		},
		modelprocessor.SetErrorMessage{},
	}
	if s.config.DefaultServiceEnvironment != "" {
		preBatchProcessors = append(preBatchProcessors, &modelprocessor.SetDefaultServiceEnvironment{
			DefaultServiceEnvironment: s.config.DefaultServiceEnvironment,
		})
	}
	serverParams.BatchProcessor = append(preBatchProcessors, serverParams.BatchProcessor)
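
	// At this point the processing chain for events decoded from agent and
	// client payloads is, in order: rate limiting and event-ingest
	// authorization; removal of event.received; host, service node name,
	// grouping key, and error message enrichment (plus the default service
	// environment, when configured); any processors injected by wrapServer;
	// and finally the chain built around finalBatchProcessor above (observer
	// and data_stream fields, event counting, dropping of non-RUM unsampled
	// transactions, and the output itself).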

	// Start the main server and the optional server for self-instrumentation.
	g.Go(func() error {
		return runServer(ctx, serverParams)
	})

	if tracerServerListener != nil {
		tracerServer, err := newTracerServer(s.config, tracerServerListener, s.logger, serverParams.BatchProcessor, serverParams.Semaphore, serverParams.MeterProvider)
		if err != nil {
			return fmt.Errorf("failed to create self-instrumentation server: %w", err)
		}
		g.Go(func() error {
			if err := tracerServer.Serve(tracerServerListener); err != http.ErrServerClosed {
				return err
			}
			return nil
		})
		go func() {
			<-ctx.Done()
			tracerServer.Shutdown(backgroundContext)
		}()
	}

	result := g.Wait()
	closeErr := closeFinalBatchProcessor(backgroundContext)
	return errors.Join(result, closeErr)
}

// newInstrumentation is a thin wrapper around libbeat instrumentation that
// sets missing tracer configuration from elastic agent.
func newInstrumentation(rawConfig *agentconfig.C, logger *logp.Logger) (instrumentation.Instrumentation, error) {
	// This config struct contains fields from the elastic agent APMConfig
	// (https://github.com/elastic/elastic-agent/blob/main/internal/pkg/core/monitoring/config/config.go#L127)
	// that are not directly handled by libbeat instrumentation below.
	//
	// Note that the original config keys were additionally marshalled by
	// https://github.com/elastic/elastic-agent/blob/main/pkg/component/runtime/apm_config_mapper.go#L18,
	// which is why some keys differ from the original APMConfig struct, including "api_key" and "secret_token".
	var apmCfg struct {
		APIKey       string `config:"apikey"`
		SecretToken  string `config:"secrettoken"`
		GlobalLabels string `config:"globallabels"`
		TLS          struct {
			SkipVerify        bool   `config:"skipverify"`
			ServerCertificate string `config:"servercert"`
			ServerCA          string `config:"serverca"`
		} `config:"tls"`
		SamplingRate *float32 `config:"samplingrate"`
	}
	cfg, err := rawConfig.Child("instrumentation", -1)
	if err != nil || !cfg.Enabled() {
		// Fall back to instrumentation.New if the config is not present or disabled.
		return instrumentation.New(rawConfig, "apm-server", version.VersionWithQualifier(), logger)
	}
	if err := cfg.Unpack(&apmCfg); err != nil {
		return nil, err
	}
	const (
		envAPIKey           = "ELASTIC_APM_API_KEY"
		envSecretToken      = "ELASTIC_APM_SECRET_TOKEN"
		envVerifyServerCert = "ELASTIC_APM_VERIFY_SERVER_CERT"
		envServerCert       = "ELASTIC_APM_SERVER_CERT"
		envCACert           = "ELASTIC_APM_SERVER_CA_CERT_FILE"
		envGlobalLabels     = "ELASTIC_APM_GLOBAL_LABELS"
		envSamplingRate     = "ELASTIC_APM_TRANSACTION_SAMPLE_RATE"
	)
	if apmCfg.APIKey != "" {
		os.Setenv(envAPIKey, apmCfg.APIKey)
		defer os.Unsetenv(envAPIKey)
	}
	if apmCfg.SecretToken != "" {
		os.Setenv(envSecretToken, apmCfg.SecretToken)
		defer os.Unsetenv(envSecretToken)
	}
	if apmCfg.TLS.SkipVerify {
		os.Setenv(envVerifyServerCert, "false")
		defer os.Unsetenv(envVerifyServerCert)
	}
	if apmCfg.TLS.ServerCertificate != "" {
		os.Setenv(envServerCert, apmCfg.TLS.ServerCertificate)
		defer os.Unsetenv(envServerCert)
	}
	if apmCfg.TLS.ServerCA != "" {
		os.Setenv(envCACert, apmCfg.TLS.ServerCA)
		defer os.Unsetenv(envCACert)
	}
	if len(apmCfg.GlobalLabels) > 0 {
		os.Setenv(envGlobalLabels, apmCfg.GlobalLabels)
		defer os.Unsetenv(envGlobalLabels)
	}
	if apmCfg.SamplingRate != nil {
		r := max(min(*apmCfg.SamplingRate, 1.0), 0.0)
		os.Setenv(envSamplingRate, strconv.FormatFloat(float64(r), 'f', -1, 32))
		defer os.Unsetenv(envSamplingRate)
	}
	return instrumentation.New(rawConfig, "apm-server", version.VersionWithQualifier(), logger)
}
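
// maxConcurrentDecoders returns the memory-scaled number of concurrent
// decoders to allow. For example (values follow directly from the formula
// below): a 4.0GB memory limit yields uint(128*4.0*0.8) = 409 decoders, and
// limits of 20GB or more hit the 2048 cap.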
func maxConcurrentDecoders(memLimitGB float64) uint {
	// Allow 128 concurrent decoders for each 1GB of memory, limited to at most 2048.
	const max = 2048
	// Use 80% of the total memory limit to calculate decoders.
	decoders := uint(128 * memLimitGB * 0.8)
	if decoders > max {
		return max
	}
	return decoders
}

// linearScaledValue calculates a linearly scaled value based on the memory
// limit, using the formula y = (perGBIncrement * memLimitGB) + constant.
func linearScaledValue(perGBIncrement, memLimitGB, constant float64) int {
	const maxMemGB = 64
	if memLimitGB > maxMemGB {
		memLimitGB = maxMemGB
	}
	return int(memLimitGB*perGBIncrement + constant)
}
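
// Worked examples of the scaling above (the values follow directly from the
// formula): with 8GB of memory, Aggregation.MaxServices and
// Aggregation.ServiceTransactions.MaxGroups scale to
// linearScaledValue(1_000, 8, 0) = 8_000, Aggregation.Transactions.MaxGroups
// to linearScaledValue(5_000, 8, 0) = 40_000, and
// Aggregation.ServiceDestinations.MaxGroups to
// linearScaledValue(5_000, 8, 5_000) = 45_000. Memory limits above 64GB are
// clamped to 64GB.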

// waitReady waits until the server is ready to index events.
func (s *Runner) waitReady(
	ctx context.Context,
	tracer *apm.Tracer,
) error {
	var preconditions []func(context.Context) error
	var esOutputClient *elasticsearch.Client
	if s.elasticsearchOutputConfig != nil {
		esConfig := elasticsearch.DefaultConfig()
		err := s.elasticsearchOutputConfig.Unpack(&esConfig)
		if err != nil {
			return err
		}
		esOutputClient, err = elasticsearch.NewClient(esConfig)
		if err != nil {
			return err
		}
	}

	// libbeat and go-elasticsearch both ensure a minimum license level of Basic.
	//
	// If any configured features require a higher license level, add a
	// precondition which checks this.
	if esOutputClient != nil {
		requiredLicenseLevel := licenser.Basic
		licensedFeature := ""
		if s.config.Sampling.Tail.Enabled {
			requiredLicenseLevel = licenser.Platinum
			licensedFeature = "tail-based sampling"
		}
		if requiredLicenseLevel > licenser.Basic {
			preconditions = append(preconditions, func(ctx context.Context) error {
				license, err := getElasticsearchLicense(ctx, esOutputClient)
				if err != nil {
					return fmt.Errorf("error getting Elasticsearch licensing information: %w", err)
				}
				if licenser.IsExpired(license) {
					return errors.New("the Elasticsearch license is expired")
				}
				if license.Type == licenser.Trial || license.Cover(requiredLicenseLevel) {
					return nil
				}
				return fmt.Errorf(
					"invalid license level %s: %s requires license level %s",
					license.Type, licensedFeature, requiredLicenseLevel,
				)
			})
		}
		preconditions = append(preconditions, func(ctx context.Context) error {
			return queryClusterUUID(ctx, esOutputClient)
		})
	}
	if len(preconditions) == 0 {
		return nil
	}
	check := func(ctx context.Context) error {
		for _, pre := range preconditions {
			if err := pre(ctx); err != nil {
				return err
			}
		}
		return nil
	}
	return waitReady(ctx, s.config.WaitReadyInterval, tracer, s.logger, check)
}

// newFinalBatchProcessor returns the final model.BatchProcessor that publishes events,
// and a cleanup function which should be called on server shutdown. If the output is
// "elasticsearch", then we use docappender; otherwise we use the libbeat publisher.
func (s *Runner) newFinalBatchProcessor(
	tracer *apm.Tracer,
	newElasticsearchClient func(cfg *elasticsearch.Config) (*elasticsearch.Client, error),
	memLimit float64,
	logger *logp.Logger,
) (modelpb.BatchProcessor, func(context.Context) error, error) {
	if s.elasticsearchOutputConfig == nil {
		monitoring.Default.Remove("libbeat")
		libbeatMonitoringRegistry := monitoring.Default.NewRegistry("libbeat")
		return s.newLibbeatFinalBatchProcessor(tracer, libbeatMonitoringRegistry, logger)
	}

	stateRegistry := monitoring.GetNamespace("state").GetRegistry()
	outputRegistry := stateRegistry.GetRegistry("output")
	if outputRegistry != nil {
		outputRegistry.Clear()
	} else {
		outputRegistry = stateRegistry.NewRegistry("output")
	}
	monitoring.NewString(outputRegistry, "name").Set("elasticsearch")

	// Create the docappender and Elasticsearch config.
	appenderCfg, esCfg, err := s.newDocappenderConfig(s.tracerProvider, s.meterProvider, memLimit)
	if err != nil {
		return nil, nil, err
	}
	client, err := newElasticsearchClient(esCfg)
	if err != nil {
		return nil, nil, err
	}
	appender, err := docappender.New(client, appenderCfg)
	if err != nil {
		return nil, nil, err
	}
	return newDocappenderBatchProcessor(appender), appender.Close, nil
}
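
// newDocappenderConfig unpacks the Elasticsearch output configuration into a
// docappender.Config, applying the memory-based defaults computed by
// docappenderConfig below.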
func (s *Runner) newDocappenderConfig(tp trace.TracerProvider, mp metric.MeterProvider, memLimit float64) (
	docappender.Config, *elasticsearch.Config, error,
) {
	esConfig := struct {
		*elasticsearch.Config `config:",inline"`
		FlushBytes            string        `config:"flush_bytes"`
		FlushInterval         time.Duration `config:"flush_interval"`
		MaxRequests           int           `config:"max_requests"`
		Scaling               struct {
			Enabled *bool `config:"enabled"`
		} `config:"autoscaling"`
	}{
		// Default to 1mib flushes, which is the default for go-docappender.
		FlushBytes:    "1 mib",
		FlushInterval: time.Second,
		Config:        elasticsearch.DefaultConfig(),
	}
	esConfig.MaxIdleConnsPerHost = 10

	if err := s.elasticsearchOutputConfig.Unpack(&esConfig); err != nil {
		return docappender.Config{}, nil, err
	}

	var flushBytes int
	if esConfig.FlushBytes != "" {
		b, err := humanize.ParseBytes(esConfig.FlushBytes)
		if err != nil {
			return docappender.Config{}, nil, fmt.Errorf("failed to parse flush_bytes: %w", err)
		}
		flushBytes = int(b)
	}
	minFlush := 24 * 1024
	if esConfig.CompressionLevel != 0 && flushBytes < minFlush {
		s.logger.Warnf("flush_bytes config value is too small (%d) and might be ignored by the indexer, increasing value to %d", flushBytes, minFlush)
		flushBytes = minFlush
	}
	var scalingCfg docappender.ScalingConfig
	if enabled := esConfig.Scaling.Enabled; enabled != nil {
		scalingCfg.Disabled = !*enabled
	}
	cfg := docappenderConfig(docappender.Config{
		CompressionLevel:     esConfig.CompressionLevel,
		FlushBytes:           flushBytes,
		FlushInterval:        esConfig.FlushInterval,
		TracerProvider:       tp,
		MeterProvider:        mp,
		MaxRequests:          esConfig.MaxRequests,
		Scaling:              scalingCfg,
		Logger:               zap.New(s.logger.Core(), zap.WithCaller(true)),
		RequireDataStream:    true,
		IncludeSourceOnError: docappender.False,
		// Use the output's max_retries to configure the go-docappender's
		// document-level retries.
		MaxDocumentRetries: esConfig.MaxRetries,
		// Only retry "safe" 429 responses.
		RetryOnDocumentStatus: []int{429},
	}, memLimit, s.logger)
	if cfg.MaxRequests != 0 {
		esConfig.MaxIdleConnsPerHost = cfg.MaxRequests
	}
	return cfg, esConfig.Config, nil
}

func docappenderConfig(
	opts docappender.Config, memLimit float64, logger *logp.Logger,
) docappender.Config {
	const logMessage = "%s set to %d based on %0.1fgb of memory"

	// Use 80% of the total memory limit to calculate the buffer size.
	opts.DocumentBufferSize = int(1024 * memLimit * 0.8)
	if opts.DocumentBufferSize >= 61440 {
		opts.DocumentBufferSize = 61440
	}
	logger.Infof(logMessage,
		"docappender.DocumentBufferSize", opts.DocumentBufferSize, memLimit,
	)
	if opts.MaxRequests > 0 {
		logger.Infof("docappender.MaxRequests set to %d based on config value",
			opts.MaxRequests,
		)
		return opts
	}
	// This formula yields the following max requests for APM Server sized:
	// 1   2   4   8   15  30
	// 10  12  14  19  28  46
	maxRequests := int(float64(10) + memLimit*1.5)
	if maxRequests > 60 {
		maxRequests = 60
	}
	opts.MaxRequests = maxRequests
	logger.Infof(logMessage,
		"docappender.MaxRequests", opts.MaxRequests, memLimit,
	)
	return opts
}
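
// For example (a worked sketch of the buffer formula above): a 4.0GB memory
// limit yields DocumentBufferSize = int(1024*4.0*0.8) = 3276, with the value
// capped at 61440.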

func (s *Runner) newLibbeatFinalBatchProcessor(
	tracer *apm.Tracer,
	libbeatMonitoringRegistry *monitoring.Registry,
	logger *logp.Logger,
) (modelpb.BatchProcessor, func(context.Context) error, error) {
	// When the publisher stops cleanly it will close its pipeline client,
	// calling the acker's Close method and unblocking Wait.
	acker := publish.NewWaitPublishedAcker()
	acker.Open()

	hostname, _ := os.Hostname()
	beatInfo := beat.Info{
		Beat:        "apm-server",
		IndexPrefix: "apm-server",
		Version:     version.VersionWithQualifier(),
		Hostname:    hostname,
		Name:        hostname,
		Logger:      logger,
	}

	stateRegistry := monitoring.GetNamespace("state").GetRegistry()
	stateRegistry.Remove("queue")
	monitors := pipeline.Monitors{
		Metrics:   libbeatMonitoringRegistry,
		Telemetry: stateRegistry,
		Logger:    logger.Named("publisher"),
		Tracer:    tracer,
	}
	outputFactory := func(stats outputs.Observer) (string, outputs.Group, error) {
		if !s.outputConfig.IsSet() {
			return "", outputs.Group{}, nil
		}
		indexSupporter := idxmgmt.NewSupporter(logger, s.rawConfig)
		outputName := s.outputConfig.Name()
		output, err := outputs.Load(indexSupporter, beatInfo, stats, outputName, s.outputConfig.Config())
		return outputName, output, err
	}
	var pipelineConfig pipeline.Config
	if err := s.rawConfig.Unpack(&pipelineConfig); err != nil {
		return nil, nil, fmt.Errorf("failed to unpack libbeat pipeline config: %w", err)
	}
	pipeline, err := pipeline.Load(beatInfo, monitors, pipelineConfig, nopProcessingSupporter{}, outputFactory)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create libbeat output pipeline: %w", err)
	}
	pipelineConnector := pipetool.WithACKer(pipeline, acker)
	publisher, err := publish.NewPublisher(pipelineConnector, tracer)
	if err != nil {
		return nil, nil, err
	}
	stop := func(ctx context.Context) error {
		// Clients need to be closed before running Close, so this
		// function needs to be called after the publisher has stopped.
		defer pipeline.Close()
		if err := publisher.Stop(ctx); err != nil {
			return err
		}
		if !s.outputConfig.IsSet() {
			// No output is defined, so the acker will never be closed.
			return nil
		}
		return acker.Wait(ctx)
	}
	return publisher, stop, nil
}

const sourcemapIndex = ".apm-source-map"

func newSourcemapFetcher(
	cfg config.SourceMapping,
	kibanaClient *kibana.Client,
	newElasticsearchClient func(*elasticsearch.Config) (*elasticsearch.Client, error),
	tracer *apm.Tracer,
	logger *logp.Logger,
) (sourcemap.Fetcher, context.CancelFunc, error) {
	esClient, err := newElasticsearchClient(cfg.ESConfig)
	if err != nil {
		return nil, nil, err
	}

	var fetchers []sourcemap.Fetcher

	// Start the background sync job.
	ctx, ctxCancel := context.WithCancel(context.Background())
	metadataFetcher, invalidationChan := sourcemap.NewMetadataFetcher(ctx, esClient, sourcemapIndex, tracer, logger)
	cancel := func() {
		ctxCancel()
		<-invalidationChan
	}

	esFetcher := sourcemap.NewElasticsearchFetcher(esClient, sourcemapIndex, logger)
	size := 128
	cachingFetcher, err := sourcemap.NewBodyCachingFetcher(esFetcher, size, invalidationChan, logger)
	if err != nil {
		cancel()
		return nil, nil, err
	}
	sourcemapFetcher := sourcemap.NewSourcemapFetcher(metadataFetcher, cachingFetcher, logger)

	fetchers = append(fetchers, sourcemapFetcher)

	if kibanaClient != nil {
		fetchers = append(fetchers, sourcemap.NewKibanaFetcher(kibanaClient, logger))
	}

	chained := sourcemap.NewChainedFetcher(fetchers, logger)

	return chained, cancel, nil
}

// TODO: This is copying behavior from libbeat:
// https://github.com/elastic/beats/blob/b9ced47dba8bb55faa3b2b834fd6529d3c4d0919/libbeat/cmd/instance/beat.go#L927-L950
// Remove this when cluster_uuid no longer needs to be queried from ES.
func queryClusterUUID(ctx context.Context, esClient *elasticsearch.Client) error {
	stateRegistry := monitoring.GetNamespace("state").GetRegistry()
	outputES := "outputs.elasticsearch"
	// Running under elastic-agent, the callback linked above is not
	// registered until later, meaning we need to check and instantiate the
	// registries if they don't exist.
	elasticsearchRegistry := stateRegistry.GetRegistry(outputES)
	if elasticsearchRegistry == nil {
		elasticsearchRegistry = stateRegistry.NewRegistry(outputES)
	}

	var (
		s  *monitoring.String
		ok bool
	)

	clusterUUID := "cluster_uuid"
	clusterUUIDRegVar := elasticsearchRegistry.Get(clusterUUID)
	if clusterUUIDRegVar != nil {
		s, ok = clusterUUIDRegVar.(*monitoring.String)
		if !ok {
			return fmt.Errorf("couldn't cast to String")
		}
	} else {
		s = monitoring.NewString(elasticsearchRegistry, clusterUUID)
	}

	var response struct {
		ClusterUUID string `json:"cluster_uuid"`
	}

	req, err := http.NewRequest("GET", "/", nil)
	if err != nil {
		return err
	}
	resp, err := esClient.Perform(req.WithContext(ctx))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode > 299 {
		return fmt.Errorf("error querying cluster_uuid: status_code=%d", resp.StatusCode)
	}
	err = json.NewDecoder(resp.Body).Decode(&response)
	if err != nil {
		return err
	}

	s.Set(response.ClusterUUID)
	return nil
}

// processMemoryLimit obtains the memory limit for the APM Server process. Certain config
// values will be sized according to the maximum memory set for the server.
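// For example (a worked sketch of the logic below): with a 4GiB cgroup limit
// the function returns 4.0; with no cgroup limit and 32GiB of system memory
// it falls back to 0.625*32GiB = 20GiB and returns 20.0, so that the "80% of
// memory" sizing applied elsewhere equals 50% of total system memory.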
func processMemoryLimit(cgroups cgroupReader, sys sysMemoryReader, logger *logp.Logger) (memLimitGB float64) {
	var memLimit uint64
	if cgroups != nil {
		if limit, err := cgroupMemoryLimit(cgroups); err != nil {
			logger.Warn(err)
		} else {
			memLimit = limit
		}
	}
	if limit, err := sys.Limit(); err != nil {
		logger.Warn(err)
	} else {
		var fallback bool
		if memLimit <= 0 {
			logger.Info("no cgroups detected, falling back to total system memory")
			fallback = true
		}
		if memLimit > limit {
			logger.Info("cgroup memory limit exceeds available memory, falling back to the total system memory")
			fallback = true
		}
		if fallback {
			// If no cgroup limit is set, return a fraction of the total memory
			// to leave a margin of safety for other processes. The fraction of
			// 0.625 is chosen so that 80% of the derived memory limit equals
			// 50% of total system memory when calculating the number of decoders.
			memLimit = uint64(float64(limit) * 0.625)
		}
	}
	// Convert the memory limit to gigabytes to calculate the config values.
	memLimitGB = float64(memLimit) / (1 << 30)
	if memLimitGB <= 0 {
		memLimitGB = 1
		logger.Infof(
			"failed to discover memory limit, defaulting to %0.1fgb of memory",
			memLimitGB,
		)
	}
	return
}

type nopProcessingSupporter struct{}

func (nopProcessingSupporter) Close() error {
	return nil
}

func (nopProcessingSupporter) Processors() []string {
	return nil
}

func (nopProcessingSupporter) Create(cfg beat.ProcessingConfig, _ bool) (beat.Processor, error) {
	return cfg.Processor, nil
}