in internal/beater/beater.go [813:878]
// newLibbeatFinalBatchProcessor loads a libbeat publisher pipeline from the
// runner's raw config and wraps it in a publisher, which is returned as the
// batch processor.
//
// The second return value is a stop function: it stops the publisher, then,
// if an output is configured, waits on the acker, and finally closes the
// pipeline. An error is returned if the pipeline config cannot be unpacked,
// or if the pipeline or publisher cannot be created.
func (s *Runner) newLibbeatFinalBatchProcessor(
	tracer *apm.Tracer,
	libbeatMonitoringRegistry *monitoring.Registry,
	logger *logp.Logger,
) (modelpb.BatchProcessor, func(context.Context) error, error) {
	// A clean publisher shutdown closes its pipeline client, which in turn
	// invokes Close on the acker and releases anything blocked in Wait.
	acker := publish.NewWaitPublishedAcker()
	acker.Open()

	// The error from os.Hostname is deliberately discarded; whatever name it
	// returned is used for both Hostname and Name.
	host, _ := os.Hostname()
	info := beat.Info{
		Beat:        "apm-server",
		IndexPrefix: "apm-server",
		Version:     version.VersionWithQualifier(),
		Hostname:    host,
		Name:        host,
		Logger:      logger,
	}

	// Remove any existing "queue" entry from the state registry before it is
	// handed to the pipeline as its telemetry registry.
	// NOTE(review): presumably the pipeline registers "queue" itself — confirm.
	stateNamespace := monitoring.GetNamespace("state")
	telemetry := stateNamespace.GetRegistry()
	telemetry.Remove("queue")

	pipelineMonitors := pipeline.Monitors{
		Metrics:   libbeatMonitoringRegistry,
		Telemetry: telemetry,
		Logger:    logger.Named("publisher"),
		Tracer:    tracer,
	}

	// loadOutput is the output factory given to the pipeline. With no output
	// configured it yields an empty name and an empty group without error.
	loadOutput := func(observer outputs.Observer) (string, outputs.Group, error) {
		if !s.outputConfig.IsSet() {
			return "", outputs.Group{}, nil
		}
		supporter := idxmgmt.NewSupporter(logger, s.rawConfig)
		name := s.outputConfig.Name()
		group, err := outputs.Load(supporter, info, observer, name, s.outputConfig.Config())
		return name, group, err
	}

	var cfg pipeline.Config
	if err := s.rawConfig.Unpack(&cfg); err != nil {
		return nil, nil, fmt.Errorf("failed to unpack libbeat pipeline config: %w", err)
	}

	outputPipeline, err := pipeline.Load(info, pipelineMonitors, cfg, nopProcessingSupporter{}, loadOutput)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create libbeat output pipeline: %w", err)
	}

	publisher, err := publish.NewPublisher(pipetool.WithACKer(outputPipeline, acker), tracer)
	if err != nil {
		return nil, nil, err
	}

	stop := func(ctx context.Context) error {
		// The pipeline's clients have to be closed before the pipeline
		// itself, so Close is deferred until the publisher has been stopped.
		defer outputPipeline.Close()

		if err := publisher.Stop(ctx); err != nil {
			return err
		}
		if s.outputConfig.IsSet() {
			return acker.Wait(ctx)
		}
		// Without an output the acker is never closed, so waiting on it
		// would not return; report success immediately instead.
		return nil
	}
	return publisher, stop, nil
}