winlogbeat/beater/winlogbeat.go (157 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
/*
Package beater provides the implementation of the libbeat Beater interface for
Winlogbeat. The main event loop is implemented in this package.
*/
package beater
import (
"context"
"fmt"
"sync"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/winlogbeat/module"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
"github.com/elastic/beats/v7/winlogbeat/checkpoint"
"github.com/elastic/beats/v7/winlogbeat/config"
"github.com/elastic/beats/v7/winlogbeat/eventlog"
)
const pipelinesWarning = "Winlogbeat is unable to load the ingest pipelines" +
" because the Elasticsearch output is not configured/enabled. If you have" +
" already loaded the ingest pipelines, you can ignore this warning."
// Winlogbeat is used to conform to the beat interface
type Winlogbeat struct {
beat *beat.Beat // Common beat information.
config config.WinlogbeatConfig // Configuration settings.
eventLogs []*eventLogger // List of all event logs being monitored.
done chan struct{} // Channel to initiate shutdown of main event loop.
pipeline beat.Pipeline // Interface to publish event.
checkpoint *checkpoint.Checkpoint // Persists event log state to disk.
log *logp.Logger
}
// New returns a new Winlogbeat.
func New(b *beat.Beat, _ *conf.C) (beat.Beater, error) {
// Read configuration.
config := config.DefaultSettings
if err := b.BeatConfig.Unpack(&config); err != nil {
return nil, fmt.Errorf("error reading configuration file: %w", err)
}
log := logp.NewLogger("winlogbeat")
// resolve registry file path
config.RegistryFile = paths.Resolve(paths.Data, config.RegistryFile)
log.Infof("State will be read from and persisted to %s",
config.RegistryFile)
eb := &Winlogbeat{
beat: b,
config: config,
done: make(chan struct{}),
log: log,
}
if err := eb.init(b); err != nil {
return nil, err
}
return eb, nil
}
func (eb *Winlogbeat) init(b *beat.Beat) error {
config := &eb.config
if !eb.beat.InSetupCmd {
// Create the event logs. This will validate the event log specific
// configuration.
eb.eventLogs = make([]*eventLogger, 0, len(config.EventLogs))
for _, config := range config.EventLogs {
eventLog, err := eventlog.New(config)
if err != nil {
return fmt.Errorf("failed to create new event log: %w", err)
}
eb.log.Debugf("initialized WinEventLog[%s]", eventLog.Name())
logger, err := newEventLogger(b.Info, eventLog, config, eb.log)
if err != nil {
return fmt.Errorf("failed to create new event log: %w", err)
}
eb.eventLogs = append(eb.eventLogs, logger)
}
}
b.OverwritePipelinesCallback = func(esConfig *conf.C) error {
overwritePipelines := config.OverwritePipelines
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Winlogbeat")
if err != nil {
return err
}
_, err = module.UploadPipelines(b.Info, esClient, overwritePipelines)
return err
}
return nil
}
// Setup uses the loaded config and creates necessary markers and environment
// settings to allow the beat to be used.
func (eb *Winlogbeat) setup(b *beat.Beat) error {
config := &eb.config
var err error
eb.checkpoint, err = checkpoint.NewCheckpoint(config.RegistryFile, config.RegistryFlush)
if err != nil {
return fmt.Errorf("failed to initialize checkpoint registry: %w", err)
}
eb.pipeline = b.Publisher
return nil
}
// Run is used within the beats interface to execute the Winlogbeat workers.
func (eb *Winlogbeat) Run(b *beat.Beat) error {
if err := eb.setup(b); err != nil {
return err
}
if b.Config.Output.Name() == "elasticsearch" {
callback := func(esClient *eslegclient.Connection) error {
_, err := module.UploadPipelines(b.Info, esClient, eb.config.OverwritePipelines)
return err
}
_, err := elasticsearch.RegisterConnectCallback(callback)
if err != nil {
return err
}
} else {
eb.log.Warn(pipelinesWarning)
}
acker := newEventACKer(eb.checkpoint)
persistedState := eb.checkpoint.States()
// Initialize metrics.
initMetrics("total")
if b.API != nil {
err := inputmon.AttachHandler(b.API.Router(), nil)
if err != nil {
return fmt.Errorf("failed attach inputs api to monitoring endpoint server: %w", err)
}
}
if b.Manager != nil {
b.Manager.RegisterDiagnosticHook("input_metrics", "Metrics from active inputs.",
"input_metrics.json", "application/json", func() []byte {
data, err := inputmon.MetricSnapshotJSON(nil)
if err != nil {
logp.L().Warnw("Failed to collect input metric snapshot for Agent diagnostics.", "error", err)
return []byte(err.Error())
}
return data
})
}
var wg sync.WaitGroup
for _, log := range eb.eventLogs {
state := persistedState[log.source.Name()]
// Start a goroutine for each event log.
wg.Add(1)
go eb.processEventLog(&wg, log, state, acker)
}
wg.Wait()
defer eb.checkpoint.Shutdown()
if eb.config.ShutdownTimeout > 0 {
eb.log.Infof("Shutdown will wait max %v for the remaining %v events to publish.",
eb.config.ShutdownTimeout, acker.Active())
ctx, cancel := context.WithTimeout(context.Background(), eb.config.ShutdownTimeout)
defer cancel()
acker.Wait(ctx)
}
return nil
}
// Stop is used to tell the winlogbeat that it should cease executing.
func (eb *Winlogbeat) Stop() {
eb.log.Info("Stopping Winlogbeat")
if eb.done != nil {
close(eb.done)
}
}
func (eb *Winlogbeat) processEventLog(
wg *sync.WaitGroup,
logger *eventLogger,
state checkpoint.EventLogState,
acker *eventACKer,
) {
defer wg.Done()
logger.run(eb.done, eb.pipeline, state, acker)
}