func()

in internal/beater/beater.go [813:878]


// newLibbeatFinalBatchProcessor loads a libbeat publisher pipeline from the
// runner's raw config and output config, and wraps it in a publisher.
//
// It returns:
//   - the publisher, as the batch processor events should be sent to;
//   - a stop function which stops the publisher, waits for published events
//     to be acknowledged (only when an output is configured), and finally
//     closes the pipeline;
//   - an error if the pipeline config cannot be unpacked, or if the pipeline
//     or the publisher cannot be created. On error the other two results
//     are nil.
func (s *Runner) newLibbeatFinalBatchProcessor(
	tracer *apm.Tracer,
	libbeatMonitoringRegistry *monitoring.Registry,
	logger *logp.Logger,
) (modelpb.BatchProcessor, func(context.Context) error, error) {
	// When the publisher stops cleanly it will close its pipeline client,
	// calling the acker's Close method and unblock Wait.
	acker := publish.NewWaitPublishedAcker()
	acker.Open()

	// The hostname is only used to populate beat.Info; the lookup error is
	// deliberately ignored, leaving the hostname empty if it fails.
	hostname, _ := os.Hostname()
	beatInfo := beat.Info{
		Beat:        "apm-server",
		IndexPrefix: "apm-server",
		Version:     version.VersionWithQualifier(),
		Hostname:    hostname,
		Name:        hostname,
		Logger:      logger,
	}

	// Drop any existing "queue" entry from the state registry before the
	// registry is handed to the pipeline as its telemetry registry.
	stateRegistry := monitoring.GetNamespace("state").GetRegistry()
	stateRegistry.Remove("queue")
	monitors := pipeline.Monitors{
		Metrics:   libbeatMonitoringRegistry,
		Telemetry: stateRegistry,
		Logger:    logger.Named("publisher"),
		Tracer:    tracer,
	}

	// outputFactory is passed to pipeline.Load to create the output group.
	// With no output configured it yields an empty name and group.
	outputFactory := func(stats outputs.Observer) (string, outputs.Group, error) {
		if !s.outputConfig.IsSet() {
			return "", outputs.Group{}, nil
		}
		indexSupporter := idxmgmt.NewSupporter(logger, s.rawConfig)
		outputName := s.outputConfig.Name()
		output, err := outputs.Load(indexSupporter, beatInfo, stats, outputName, s.outputConfig.Config())
		return outputName, output, err
	}

	var pipelineConfig pipeline.Config
	if err := s.rawConfig.Unpack(&pipelineConfig); err != nil {
		return nil, nil, fmt.Errorf("failed to unpack libbeat pipeline config: %w", err)
	}
	// Named outputPipeline rather than pipeline so that the imported
	// pipeline package is not shadowed for the rest of the function.
	outputPipeline, err := pipeline.Load(beatInfo, monitors, pipelineConfig, nopProcessingSupporter{}, outputFactory)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create libbeat output pipeline: %w", err)
	}

	pipelineConnector := pipetool.WithACKer(outputPipeline, acker)
	publisher, err := publish.NewPublisher(pipelineConnector, tracer)
	if err != nil {
		// The pipeline was loaded but no stop function will be returned
		// to the caller, so close it here to avoid leaking it. This is
		// best-effort cleanup: the publisher error is the one reported.
		outputPipeline.Close()
		return nil, nil, err
	}

	stop := func(ctx context.Context) error {
		// clients need to be closed before running Close so
		// this method needs to be called after the publisher has
		// stopped
		defer outputPipeline.Close()
		if err := publisher.Stop(ctx); err != nil {
			return err
		}
		if !s.outputConfig.IsSet() {
			// No output defined, so the acker will never be closed.
			return nil
		}
		return acker.Wait(ctx)
	}
	return publisher, stop, nil
}