libbeat/otelbeat/beatreceiver/beat_receiver.go (66 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package beatreceiver
import (
"fmt"
"github.com/elastic/beats/v7/libbeat/api"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
metricreport "github.com/elastic/elastic-agent-system-metrics/report"
"go.uber.org/zap"
)
// BaseReceiver holds common configurations for beatreceivers.
type BeatReceiver struct {
HttpConf *config.C
Beat *instance.Beat
Beater beat.Beater
Logger *zap.Logger
}
// BeatReceiver.Stop() starts the beat receiver.
func (b *BeatReceiver) Start() error {
if err := b.startMonitoring(); err != nil {
return fmt.Errorf("could not start the HTTP server for the monitoring API: %w", err)
}
if err := b.Beater.Run(&b.Beat.Beat); err != nil {
return fmt.Errorf("beat receiver run error: %w", err)
}
return nil
}
// BeatReceiver.Stop() stops beat receiver.
func (b *BeatReceiver) Shutdown() error {
b.Beater.Stop()
if err := b.stopMonitoring(); err != nil {
return fmt.Errorf("error stopping monitoring server: %w", err)
}
return nil
}
func (b *BeatReceiver) startMonitoring() error {
if b.HttpConf == nil || !b.HttpConf.Enabled() {
return nil
}
var err error
b.Beat.RegisterMetrics()
statsReg := b.Beat.Info.Monitoring.StatsRegistry
// stats.beat
processReg := statsReg.GetRegistry("beat")
if processReg == nil {
processReg = statsReg.NewRegistry("beat")
}
// stats.system
systemReg := statsReg.GetRegistry("system")
if systemReg == nil {
systemReg = statsReg.NewRegistry("system")
}
err = metricreport.SetupMetrics(logp.NewLogger("metrics"), b.Beat.Info.Beat, version.GetDefaultVersion(), metricreport.WithProcessRegistry(processReg), metricreport.WithSystemRegistry(systemReg))
if err != nil {
return err
}
b.Beat.API, err = api.NewWithDefaultRoutes(logp.NewLogger("metrics.http"), b.HttpConf, api.RegistryLookupFunc(b.Beat.Info.Monitoring.Namespace))
if err != nil {
return err
}
b.Beat.API.Start()
return nil
}
func (b *BeatReceiver) stopMonitoring() error {
if b.Beat.API != nil {
return b.Beat.API.Stop()
}
return nil
}