func (s *Runner) Run(ctx context.Context) error

in internal/beater/beater.go [173:517]


func (s *Runner) Run(ctx context.Context) error {
	defer s.listener.Close()
	g, ctx := errgroup.WithContext(ctx)
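	// meter is used to create the server's internal metric instruments,
	// such as the dropped-transactions counter registered further below.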
	meter := s.meterProvider.Meter("github.com/elastic/apm-server/internal/beater")

	// backgroundContext is a context to use in operations that should
	// block until shutdown, and will be cancelled after the shutdown
	// timeout.
	backgroundContext, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		<-ctx.Done()
		s.logger.Infof(
			"stopping apm-server... waiting maximum of %s for queues to drain",
			s.config.ShutdownTimeout,
		)
		time.AfterFunc(s.config.ShutdownTimeout, cancel)
	}()

	if s.config.Pprof.Enabled {
		// Profiling rates should be set once, early on in the program.
		runtime.SetBlockProfileRate(s.config.Pprof.BlockProfileRate)
		runtime.SetMutexProfileFraction(s.config.Pprof.MutexProfileRate)
		if s.config.Pprof.MemProfileRate > 0 {
			runtime.MemProfileRate = s.config.Pprof.MemProfileRate
		}
	}

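	// Determine the available memory, preferring the cgroup limit and falling
	// back to system memory, and use it to derive defaults for the settings below.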
	memLimitGB := processMemoryLimit(
		newCgroupReader(),
		sysMemoryReaderFunc(systemMemoryLimit),
		s.logger,
	)

	if s.config.MaxConcurrentDecoders == 0 {
		s.config.MaxConcurrentDecoders = maxConcurrentDecoders(memLimitGB)
		s.logger.Infof("MaxConcurrentDecoders set to %d based on 80 percent of %0.1fgb of memory",
			s.config.MaxConcurrentDecoders, memLimitGB,
		)
	}

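	// Aggregation limits that are not explicitly configured are scaled
	// linearly with the available memory.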
	if s.config.Aggregation.MaxServices <= 0 {
		s.config.Aggregation.MaxServices = linearScaledValue(1_000, memLimitGB, 0)
		s.logger.Infof("Aggregation.MaxServices set to %d based on %0.1fgb of memory",
			s.config.Aggregation.MaxServices, memLimitGB,
		)
	}

	if s.config.Aggregation.ServiceTransactions.MaxGroups <= 0 {
		s.config.Aggregation.ServiceTransactions.MaxGroups = linearScaledValue(1_000, memLimitGB, 0)
		s.logger.Infof("Aggregation.ServiceTransactions.MaxGroups for service aggregation set to %d based on %0.1fgb of memory",
			s.config.Aggregation.ServiceTransactions.MaxGroups, memLimitGB,
		)
	}

	if s.config.Aggregation.Transactions.MaxGroups <= 0 {
		s.config.Aggregation.Transactions.MaxGroups = linearScaledValue(5_000, memLimitGB, 0)
		s.logger.Infof("Aggregation.Transactions.MaxGroups set to %d based on %0.1fgb of memory",
			s.config.Aggregation.Transactions.MaxGroups, memLimitGB,
		)
	}

	if s.config.Aggregation.ServiceDestinations.MaxGroups <= 0 {
		s.config.Aggregation.ServiceDestinations.MaxGroups = linearScaledValue(5_000, memLimitGB, 5_000)
		s.logger.Infof("Aggregation.ServiceDestinations.MaxGroups set to %d based on %0.1fgb of memory",
			s.config.Aggregation.ServiceDestinations.MaxGroups, memLimitGB,
		)
	}

	// Send config to telemetry.
	recordAPMServerConfig(s.config)

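	// Create a Kibana client if configured; it is used below for uploading the
	// config in Elastic Cloud, fetching source maps, and fetching agent
	// central configuration.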
	var kibanaClient *kibana.Client
	if s.config.Kibana.Enabled {
		var err error
		kibanaClient, err = kibana.NewClient(s.config.Kibana.ClientConfig)
		if err != nil {
			return err
		}
	}

	// ELASTIC_AGENT_CLOUD is set when running in Elastic Cloud.
	inElasticCloud := os.Getenv("ELASTIC_AGENT_CLOUD") != ""
	if inElasticCloud {
		if s.config.Kibana.Enabled {
			go func() {
				if err := kibana.SendConfig(ctx, kibanaClient, (*ucfg.Config)(s.rawConfig)); err != nil {
					s.logger.Infof("failed to upload config to kibana: %v", err)
				}
			}()
		}
	}

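	// Set up self-instrumentation: an Elastic APM tracer and, optionally, a
	// listener for the self-instrumentation server started further below.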
	instrumentation, err := newInstrumentation(s.rawConfig, s.logger)
	if err != nil {
		return err
	}
	tracer := instrumentation.Tracer()
	tracerServerListener := instrumentation.Listener()
	if tracerServerListener != nil {
		defer tracerServerListener.Close()
	}
	defer tracer.Close()

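	// Bridge the Elastic APM tracer into OpenTelemetry so that
	// OTel-instrumented code reports through the same tracer.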
	tracerProvider, err := apmotel.NewTracerProvider(apmotel.WithAPMTracer(tracer))
	if err != nil {
		return err
	}
	otel.SetTracerProvider(tracerProvider)

	s.tracerProvider = tracerProvider

	tracer.RegisterMetricsGatherer(s.metricGatherer)

	// Ensure the libbeat output and go-elasticsearch clients do not index
	// any events to Elasticsearch before the integration is ready.
	publishReady := make(chan struct{})
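	// drain is closed if the preconditions fail, signalling that pending
	// events should be dropped rather than indexed.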
	drain := make(chan struct{})
	g.Go(func() error {
		if err := s.waitReady(ctx, tracer); err != nil {
			// One or more preconditions failed; drop events.
			close(drain)
			return fmt.Errorf("error waiting for server to be ready: %w", err)
		}
		// All preconditions have been met; start indexing documents
		// into elasticsearch.
		close(publishReady)
		return nil
	})
	callbackUUID, err := esoutput.RegisterConnectCallback(func(*eslegclient.Connection) error {
		select {
		case <-publishReady:
			return nil
		default:
		}
		return errors.New("not ready for publishing events")
	})
	if err != nil {
		return err
	}
	defer esoutput.DeregisterConnectCallback(callbackUUID)
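	// newElasticsearchClient wraps the HTTP transport with waitReadyRoundTripper,
	// gating requests on publishReady/drain, and disables retries once the
	// server is shutting down.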
	newElasticsearchClient := func(cfg *elasticsearch.Config) (*elasticsearch.Client, error) {
		httpTransport, err := elasticsearch.NewHTTPTransport(cfg)
		if err != nil {
			return nil, err
		}
		transport := &waitReadyRoundTripper{Transport: httpTransport, ready: publishReady, drain: drain}
		return elasticsearch.NewClientParams(elasticsearch.ClientParams{
			Config:    cfg,
			Transport: transport,
			RetryOnError: func(_ *http.Request, err error) bool {
				return !errors.Is(err, errServerShuttingDown)
			},
		})
	}

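	// Set up the source map fetcher for RUM, when both RUM and source mapping
	// are enabled.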
	var sourcemapFetcher sourcemap.Fetcher
	if s.config.RumConfig.Enabled && s.config.RumConfig.SourceMapping.Enabled {
		fetcher, cancel, err := newSourcemapFetcher(
			s.config.RumConfig.SourceMapping,
			kibanaClient, newElasticsearchClient,
			tracer,
			s.logger,
		)
		if err != nil {
			return err
		}
		defer cancel()
		sourcemapFetcher = fetcher
	}

	// Create the runServer function. We start with newBaseRunServer, and then
	// wrap depending on the configuration in order to inject behaviour.
	runServer := newBaseRunServer(s.listener)
	authenticator, err := auth.NewAuthenticator(s.config.AgentAuth)
	if err != nil {
		return err
	}

	ratelimitStore, err := ratelimit.NewStore(
		s.config.AgentAuth.Anonymous.RateLimit.IPLimit,
		s.config.AgentAuth.Anonymous.RateLimit.EventLimit,
		3, // burst multiplier
	)
	if err != nil {
		return err
	}

	// Note that we intentionally do not use a grpc.Creds ServerOption
	// even if TLS is enabled, as TLS is handled by the net/http server.
	gRPCLogger := s.logger.Named("grpc")
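	// Interceptors run in the order given: panic recovery and tracing, client
	// metadata, logging, metrics, timeouts, authentication, then anonymous
	// rate limiting.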
	grpcServer := grpc.NewServer(grpc.ChainUnaryInterceptor(
		apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(tracer)),
		interceptors.ClientMetadata(),
		interceptors.Logging(gRPCLogger),
		interceptors.Metrics(gRPCLogger, s.meterProvider),
		interceptors.Timeout(),
		interceptors.Auth(authenticator),
		interceptors.AnonymousRateLimit(ratelimitStore),
	))

	// Create the BatchProcessor chain that is used to process all events,
	// including the metrics aggregated by APM Server.
	finalBatchProcessor, closeFinalBatchProcessor, err := s.newFinalBatchProcessor(
		tracer, newElasticsearchClient, memLimitGB, s.logger,
	)
	if err != nil {
		return err
	}
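	// Counter for transactions dropped by the unsampled-transaction processor
	// in the chain below.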
	transactionsDroppedCounter, err := meter.Int64Counter("apm-server.sampling.transactions_dropped")
	if err != nil {
		return err
	}
	batchProcessor := srvmodelprocessor.NewTracer("beater.ProcessBatch", modelprocessor.Chained{
		// Ensure all events have observer.*, ecs.*, and data_stream.* fields added,
		// and are counted in metrics. This is done in the final processors to ensure
		// aggregated metrics are also processed.
		newObserverBatchProcessor(),
		&modelprocessor.SetDataStream{Namespace: s.config.DataStreams.Namespace},
		srvmodelprocessor.NewEventCounter(s.meterProvider),

		// The server always drops non-RUM unsampled transactions. We store RUM unsampled
		// transactions as they are needed by the User Experience app, which performs
		// aggregations over dimensions that are not available in transaction metrics.
		//
		// It is important that this is done just before calling the publisher to
		// avoid affecting aggregations.
		modelprocessor.NewDropUnsampled(false /* don't drop RUM unsampled transactions */, func(i int64) {
			transactionsDroppedCounter.Add(context.Background(), i)
		}),
		finalBatchProcessor,
	})

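	// Set up fetching and reporting of agent central configuration, backed by
	// Kibana and/or Elasticsearch; the fetcher's run loop (if any) and the
	// reporter both run in the errgroup.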
	agentConfigFetcher, fetcherRunFunc, err := newAgentConfigFetcher(
		ctx,
		s.config,
		kibanaClient,
		newElasticsearchClient,
		tracer,
		s.meterProvider,
		s.logger,
	)
	if err != nil {
		return err
	}
	if fetcherRunFunc != nil {
		g.Go(func() error {
			return fetcherRunFunc(ctx)
		})
	}

	agentConfigReporter := agentcfg.NewReporter(
		agentConfigFetcher,
		batchProcessor, 30*time.Second,
		s.logger,
	)
	g.Go(func() error {
		return agentConfigReporter.Run(ctx)
	})

	// Assemble the parameters passed to runServer and to the optional
	// self-instrumentation server below.
	serverParams := ServerParams{
		Config:                 s.config,
		Namespace:              s.config.DataStreams.Namespace,
		Logger:                 s.logger,
		Tracer:                 tracer,
		TracerProvider:         s.tracerProvider,
		MeterProvider:          s.meterProvider,
		Authenticator:          authenticator,
		RateLimitStore:         ratelimitStore,
		BatchProcessor:         batchProcessor,
		AgentConfig:            agentConfigReporter,
		SourcemapFetcher:       sourcemapFetcher,
		PublishReady:           publishReady,
		KibanaClient:           kibanaClient,
		NewElasticsearchClient: newElasticsearchClient,
		GRPCServer:             grpcServer,
		Semaphore:              semaphore.NewWeighted(int64(s.config.MaxConcurrentDecoders)),
	}
	if s.wrapServer != nil {
		// Wrap the serverParams and runServer function, enabling
		// injection of behaviour into the processing chain.
		serverParams, runServer, err = s.wrapServer(serverParams, runServer)
		if err != nil {
			return err
		}
	}

	// Add pre-processing batch processors to the beginning of the chain,
	// applying only to the events that are decoded from agent/client payloads.
	preBatchProcessors := modelprocessor.Chained{
		// Add a model processor that rate limits, and checks authorization for the
		// agent and service for each event. These must come at the beginning of the
		// processor chain.
		modelpb.ProcessBatchFunc(rateLimitBatchProcessor),
		modelpb.ProcessBatchFunc(authorizeEventIngestProcessor),

		// Add a model processor that removes `event.received`, which is added by
		// apm-data, but which we don't yet map.
		modelprocessor.RemoveEventReceived{},

		// Pre-process events before they are sent to the final processors for
		// aggregation, sampling, and indexing.
		modelprocessor.SetHostHostname{},
		modelprocessor.SetServiceNodeName{},
		modelprocessor.SetGroupingKey{
			NewHash: func() hash.Hash {
				return xxhash.New()
			},
		},
		modelprocessor.SetErrorMessage{},
	}
	if s.config.DefaultServiceEnvironment != "" {
		preBatchProcessors = append(preBatchProcessors, &modelprocessor.SetDefaultServiceEnvironment{
			DefaultServiceEnvironment: s.config.DefaultServiceEnvironment,
		})
	}
	serverParams.BatchProcessor = append(preBatchProcessors, serverParams.BatchProcessor)

	// Start the main server and the optional server for self-instrumentation.
	g.Go(func() error {
		return runServer(ctx, serverParams)
	})
	if tracerServerListener != nil {
		tracerServer, err := newTracerServer(s.config, tracerServerListener, s.logger, serverParams.BatchProcessor, serverParams.Semaphore, serverParams.MeterProvider)
		if err != nil {
			return fmt.Errorf("failed to create self-instrumentation server: %w", err)
		}
		g.Go(func() error {
			if err := tracerServer.Serve(tracerServerListener); err != http.ErrServerClosed {
				return err
			}
			return nil
		})
		go func() {
			<-ctx.Done()
			tracerServer.Shutdown(backgroundContext)
		}()
	}

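	// Wait for all goroutines in the group to finish, then flush and close the
	// final batch processor within the shutdown grace period.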
	result := g.Wait()
	closeErr := closeFinalBatchProcessor(backgroundContext)
	return errors.Join(result, closeErr)
}