in internal/beater/beater.go [173:517]
// Run starts the APM Server and blocks until ctx is cancelled or a fatal
// error occurs. On cancellation it allows up to config.ShutdownTimeout for
// queued events to drain before cancelling background operations. The
// returned error joins the result of the server goroutines with any error
// from closing the final batch processor.
func (s *Runner) Run(ctx context.Context) error {
	defer s.listener.Close()
	g, ctx := errgroup.WithContext(ctx)
	meter := s.meterProvider.Meter("github.com/elastic/apm-server/internal/beater")

	// backgroundContext is a context to use in operations that should
	// block until shutdown, and will be cancelled after the shutdown
	// timeout.
	backgroundContext, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		<-ctx.Done()
		s.logger.Infof(
			"stopping apm-server... waiting maximum of %s for queues to drain",
			s.config.ShutdownTimeout,
		)
		// Start the shutdown-timeout clock only once ctx is done;
		// when it fires, backgroundContext is cancelled, forcing any
		// still-draining operations to stop.
		time.AfterFunc(s.config.ShutdownTimeout, cancel)
	}()

	if s.config.Pprof.Enabled {
		// Profiling rates should be set once, early on in the program.
		runtime.SetBlockProfileRate(s.config.Pprof.BlockProfileRate)
		runtime.SetMutexProfileFraction(s.config.Pprof.MutexProfileRate)
		if s.config.Pprof.MemProfileRate > 0 {
			runtime.MemProfileRate = s.config.Pprof.MemProfileRate
		}
	}

	// Determine the effective memory limit (from the cgroup reader, falling
	// back to system memory) and use it to derive defaults for any limits
	// the user did not configure explicitly.
	memLimitGB := processMemoryLimit(
		newCgroupReader(),
		sysMemoryReaderFunc(systemMemoryLimit),
		s.logger,
	)
	if s.config.MaxConcurrentDecoders == 0 {
		s.config.MaxConcurrentDecoders = maxConcurrentDecoders(memLimitGB)
		s.logger.Infof("MaxConcurrentDecoders set to %d based on 80 percent of %0.1fgb of memory",
			s.config.MaxConcurrentDecoders, memLimitGB,
		)
	}
	// Each aggregation limit below is only defaulted when unset (<= 0);
	// explicit configuration always wins.
	if s.config.Aggregation.MaxServices <= 0 {
		s.config.Aggregation.MaxServices = linearScaledValue(1_000, memLimitGB, 0)
		s.logger.Infof("Aggregation.MaxServices set to %d based on %0.1fgb of memory",
			s.config.Aggregation.MaxServices, memLimitGB,
		)
	}
	if s.config.Aggregation.ServiceTransactions.MaxGroups <= 0 {
		s.config.Aggregation.ServiceTransactions.MaxGroups = linearScaledValue(1_000, memLimitGB, 0)
		s.logger.Infof("Aggregation.ServiceTransactions.MaxGroups for service aggregation set to %d based on %0.1fgb of memory",
			s.config.Aggregation.ServiceTransactions.MaxGroups, memLimitGB,
		)
	}
	if s.config.Aggregation.Transactions.MaxGroups <= 0 {
		s.config.Aggregation.Transactions.MaxGroups = linearScaledValue(5_000, memLimitGB, 0)
		s.logger.Infof("Aggregation.Transactions.MaxGroups set to %d based on %0.1fgb of memory",
			s.config.Aggregation.Transactions.MaxGroups, memLimitGB,
		)
	}
	if s.config.Aggregation.ServiceDestinations.MaxGroups <= 0 {
		s.config.Aggregation.ServiceDestinations.MaxGroups = linearScaledValue(5_000, memLimitGB, 5_000)
		s.logger.Infof("Aggregation.ServiceDestinations.MaxGroups set to %d based on %0.1fgb of memory",
			s.config.Aggregation.ServiceDestinations.MaxGroups, memLimitGB,
		)
	}

	// Send config to telemetry.
	recordAPMServerConfig(s.config)

	// The Kibana client is optional; when disabled it stays nil and the
	// downstream consumers (config upload, agent config, sourcemaps)
	// receive the nil client.
	var kibanaClient *kibana.Client
	if s.config.Kibana.Enabled {
		var err error
		kibanaClient, err = kibana.NewClient(s.config.Kibana.ClientConfig)
		if err != nil {
			return err
		}
	}

	// ELASTIC_AGENT_CLOUD is set when running in Elastic Cloud.
	inElasticCloud := os.Getenv("ELASTIC_AGENT_CLOUD") != ""
	if inElasticCloud {
		if s.config.Kibana.Enabled {
			// Best-effort upload of the raw config to Kibana;
			// failure is logged but does not stop the server.
			go func() {
				if err := kibana.SendConfig(ctx, kibanaClient, (*ucfg.Config)(s.rawConfig)); err != nil {
					s.logger.Infof("failed to upload config to kibana: %v", err)
				}
			}()
		}
	}

	// Set up self-instrumentation: an Elastic APM tracer, an optional
	// listener for receiving the tracer's own events (closed on return),
	// and an OpenTelemetry TracerProvider bridged to the tracer.
	instrumentation, err := newInstrumentation(s.rawConfig, s.logger)
	if err != nil {
		return err
	}
	tracer := instrumentation.Tracer()
	tracerServerListener := instrumentation.Listener()
	if tracerServerListener != nil {
		defer tracerServerListener.Close()
	}
	defer tracer.Close()
	tracerProvider, err := apmotel.NewTracerProvider(apmotel.WithAPMTracer(tracer))
	if err != nil {
		return err
	}
	otel.SetTracerProvider(tracerProvider)
	s.tracerProvider = tracerProvider
	tracer.RegisterMetricsGatherer(s.metricGatherer)

	// Ensure the libbeat output and go-elasticsearch clients do not index
	// any events to Elasticsearch before the integration is ready.
	// Exactly one of these channels is closed by the goroutine below:
	// publishReady on success, drain when a precondition fails.
	publishReady := make(chan struct{})
	drain := make(chan struct{})
	g.Go(func() error {
		if err := s.waitReady(ctx, tracer); err != nil {
			// One or more preconditions failed; drop events.
			close(drain)
			return fmt.Errorf("error waiting for server to be ready: %w", err)
		}
		// All preconditions have been met; start indexing documents
		// into elasticsearch.
		close(publishReady)
		return nil
	})
	// Reject libbeat output connections (non-blocking check) until
	// publishReady is closed; deregistered on return.
	callbackUUID, err := esoutput.RegisterConnectCallback(func(*eslegclient.Connection) error {
		select {
		case <-publishReady:
			return nil
		default:
		}
		return errors.New("not ready for publishing events")
	})
	if err != nil {
		return err
	}
	defer esoutput.DeregisterConnectCallback(callbackUUID)

	// newElasticsearchClient builds clients whose transport blocks
	// requests until publishReady is closed (or drops them once drain
	// is closed), and which do not retry once the server is shutting down.
	newElasticsearchClient := func(cfg *elasticsearch.Config) (*elasticsearch.Client, error) {
		httpTransport, err := elasticsearch.NewHTTPTransport(cfg)
		if err != nil {
			return nil, err
		}
		transport := &waitReadyRoundTripper{Transport: httpTransport, ready: publishReady, drain: drain}
		return elasticsearch.NewClientParams(elasticsearch.ClientParams{
			Config:    cfg,
			Transport: transport,
			RetryOnError: func(_ *http.Request, err error) bool {
				return !errors.Is(err, errServerShuttingDown)
			},
		})
	}

	// Sourcemap fetching is only wired up when both RUM and sourcemapping
	// are enabled; the returned cancel func is deferred for cleanup.
	var sourcemapFetcher sourcemap.Fetcher
	if s.config.RumConfig.Enabled && s.config.RumConfig.SourceMapping.Enabled {
		fetcher, cancel, err := newSourcemapFetcher(
			s.config.RumConfig.SourceMapping,
			kibanaClient, newElasticsearchClient,
			tracer,
			s.logger,
		)
		if err != nil {
			return err
		}
		defer cancel()
		sourcemapFetcher = fetcher
	}

	// Create the runServer function. We start with newBaseRunServer, and then
	// wrap depending on the configuration in order to inject behaviour.
	runServer := newBaseRunServer(s.listener)

	authenticator, err := auth.NewAuthenticator(s.config.AgentAuth)
	if err != nil {
		return err
	}

	ratelimitStore, err := ratelimit.NewStore(
		s.config.AgentAuth.Anonymous.RateLimit.IPLimit,
		s.config.AgentAuth.Anonymous.RateLimit.EventLimit,
		3, // burst multiplier
	)
	if err != nil {
		return err
	}

	// Note that we intentionally do not use a grpc.Creds ServerOption
	// even if TLS is enabled, as TLS is handled by the net/http server.
	gRPCLogger := s.logger.Named("grpc")
	grpcServer := grpc.NewServer(grpc.ChainUnaryInterceptor(
		apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(tracer)),
		interceptors.ClientMetadata(),
		interceptors.Logging(gRPCLogger),
		interceptors.Metrics(gRPCLogger, s.meterProvider),
		interceptors.Timeout(),
		interceptors.Auth(authenticator),
		interceptors.AnonymousRateLimit(ratelimitStore),
	))

	// Create the BatchProcessor chain that is used to process all events,
	// including the metrics aggregated by APM Server.
	finalBatchProcessor, closeFinalBatchProcessor, err := s.newFinalBatchProcessor(
		tracer, newElasticsearchClient, memLimitGB, s.logger,
	)
	if err != nil {
		return err
	}
	transactionsDroppedCounter, err := meter.Int64Counter("apm-server.sampling.transactions_dropped")
	if err != nil {
		return err
	}
	batchProcessor := srvmodelprocessor.NewTracer("beater.ProcessBatch", modelprocessor.Chained{
		// Ensure all events have observer.*, ecs.*, and data_stream.* fields added,
		// and are counted in metrics. This is done in the final processors to ensure
		// aggregated metrics are also processed.
		newObserverBatchProcessor(),
		&modelprocessor.SetDataStream{Namespace: s.config.DataStreams.Namespace},
		srvmodelprocessor.NewEventCounter(s.meterProvider),

		// The server always drops non-RUM unsampled transactions. We store RUM unsampled
		// transactions as they are needed by the User Experience app, which performs
		// aggregations over dimensions that are not available in transaction metrics.
		//
		// It is important that this is done just before calling the publisher to
		// avoid affecting aggregations.
		modelprocessor.NewDropUnsampled(false /* don't drop RUM unsampled transactions*/, func(i int64) {
			transactionsDroppedCounter.Add(context.Background(), i)
		}),
		finalBatchProcessor,
	})

	// Agent central configuration: fetcher (optionally with its own run
	// loop) plus a reporter that periodically reports applied configs
	// through the batch processor.
	agentConfigFetcher, fetcherRunFunc, err := newAgentConfigFetcher(
		ctx,
		s.config,
		kibanaClient,
		newElasticsearchClient,
		tracer,
		s.meterProvider,
		s.logger,
	)
	if err != nil {
		return err
	}
	if fetcherRunFunc != nil {
		g.Go(func() error {
			return fetcherRunFunc(ctx)
		})
	}

	agentConfigReporter := agentcfg.NewReporter(
		agentConfigFetcher,
		batchProcessor, 30*time.Second,
		s.logger,
	)
	g.Go(func() error {
		return agentConfigReporter.Run(ctx)
	})

	// Create the runServer function. We start with newBaseRunServer, and then
	// wrap depending on the configuration in order to inject behaviour.
	serverParams := ServerParams{
		Config:                 s.config,
		Namespace:              s.config.DataStreams.Namespace,
		Logger:                 s.logger,
		Tracer:                 tracer,
		TracerProvider:         s.tracerProvider,
		MeterProvider:          s.meterProvider,
		Authenticator:          authenticator,
		RateLimitStore:         ratelimitStore,
		BatchProcessor:         batchProcessor,
		AgentConfig:            agentConfigReporter,
		SourcemapFetcher:       sourcemapFetcher,
		PublishReady:           publishReady,
		KibanaClient:           kibanaClient,
		NewElasticsearchClient: newElasticsearchClient,
		GRPCServer:             grpcServer,
		Semaphore:              semaphore.NewWeighted(int64(s.config.MaxConcurrentDecoders)),
	}
	if s.wrapServer != nil {
		// Wrap the serverParams and runServer function, enabling
		// injection of behaviour into the processing chain.
		serverParams, runServer, err = s.wrapServer(serverParams, runServer)
		if err != nil {
			return err
		}
	}

	// Add pre-processing batch processors to the beginning of the chain,
	// applying only to the events that are decoded from agent/client payloads.
	preBatchProcessors := modelprocessor.Chained{
		// Add a model processor that rate limits, and checks authorization for the
		// agent and service for each event. These must come at the beginning of the
		// processor chain.
		modelpb.ProcessBatchFunc(rateLimitBatchProcessor),
		modelpb.ProcessBatchFunc(authorizeEventIngestProcessor),

		// Add a model processor that removes `event.received`, which is added by
		// apm-data, but which we don't yet map.
		modelprocessor.RemoveEventReceived{},

		// Pre-process events before they are sent to the final processors for
		// aggregation, sampling, and indexing.
		modelprocessor.SetHostHostname{},
		modelprocessor.SetServiceNodeName{},
		modelprocessor.SetGroupingKey{
			NewHash: func() hash.Hash {
				return xxhash.New()
			},
		},
		modelprocessor.SetErrorMessage{},
	}
	if s.config.DefaultServiceEnvironment != "" {
		preBatchProcessors = append(preBatchProcessors, &modelprocessor.SetDefaultServiceEnvironment{
			DefaultServiceEnvironment: s.config.DefaultServiceEnvironment,
		})
	}
	// Note: this prepends to the (possibly wrapped) processor, so the
	// pre-processors run only for decoded payloads, not for events emitted
	// internally through batchProcessor above.
	serverParams.BatchProcessor = append(preBatchProcessors, serverParams.BatchProcessor)

	// Start the main server and the optional server for self-instrumentation.
	g.Go(func() error {
		return runServer(ctx, serverParams)
	})
	if tracerServerListener != nil {
		tracerServer, err := newTracerServer(s.config, tracerServerListener, s.logger, serverParams.BatchProcessor, serverParams.Semaphore, serverParams.MeterProvider)
		if err != nil {
			return fmt.Errorf("failed to create self-instrumentation server: %w", err)
		}
		g.Go(func() error {
			// http.ErrServerClosed is the expected result of Shutdown
			// below, so it is not treated as a failure.
			if err := tracerServer.Serve(tracerServerListener); err != http.ErrServerClosed {
				return err
			}
			return nil
		})
		go func() {
			<-ctx.Done()
			// Graceful shutdown, bounded by backgroundContext's
			// shutdown-timeout cancellation above.
			tracerServer.Shutdown(backgroundContext)
		}()
	}

	// Wait for all errgroup goroutines, then flush/close the final batch
	// processor (bounded by backgroundContext) and report both outcomes.
	result := g.Wait()
	closeErr := closeFinalBatchProcessor(backgroundContext)
	return errors.Join(result, closeErr)
}