packetbeat/beater/packetbeat.go (177 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package beater import ( "context" "flag" "fmt" "sync" "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/service" "github.com/elastic/beats/v7/packetbeat/config" "github.com/elastic/beats/v7/packetbeat/module" "github.com/elastic/beats/v7/packetbeat/protos" // Add packetbeat default processors _ "github.com/elastic/beats/v7/packetbeat/processor/add_kubernetes_metadata" ) // this is mainly a limitation to ensure that we never deadlock // after exiting the main select loop in centrally managed packetbeat // in order to ensure we don't block on a channel write we make sure // that the errors channel propagated back from the sniffers has a buffer // that's equal to the number of sniffers that we can run, that way, if // exiting and we throw a whole bunch of errors for some reason, each // sniffer can write out the error even though the main loop has already // exited with the result of the first error var maxSniffers = 100 type flags struct { file *string loop *int oneAtAtime *bool topSpeed *bool dumpfile *string } var cmdLineArgs = flags{ file: flag.String("I", "", "Read packet data from specified file"), loop: flag.Int("l", 1, "Loop file. 0 - loop forever"), oneAtAtime: flag.Bool("O", false, "Read packets one at a time (press Enter)"), topSpeed: flag.Bool("t", false, "Read packets as fast as possible, without sleeping"), dumpfile: flag.String("dump", "", "Write all captured packets to libpcap files with this prefix - a timestamp and pcap extension are added"), } func initialConfig() config.Config { c := config.Config{ Interfaces: []config.InterfaceConfig{{ File: *cmdLineArgs.file, Loop: *cmdLineArgs.loop, TopSpeed: *cmdLineArgs.topSpeed, OneAtATime: *cmdLineArgs.oneAtAtime, Dumpfile: *cmdLineArgs.dumpfile, }}, } c.Interface = &c.Interfaces[0] return c } // Beater object. Contains all objects needed to run the beat type packetbeat struct { config *conf.C factory *processorFactory overwritePipelines bool done chan struct{} stopOnce sync.Once } // New returns a new Packetbeat beat.Beater. func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) { configurator := config.NewAgentConfig if !b.Manager.Enabled() { configurator = initialConfig().FromStatic } factory := newProcessorFactory(b.Info.Name, make(chan error, maxSniffers), b, configurator) if err := factory.CheckConfig(rawConfig); err != nil { return nil, err } var overwritePipelines bool if !b.Manager.Enabled() { // Pipeline overwrite is only enabled on standalone packetbeat // since pipelines are managed by fleet otherwise. config, err := configurator(rawConfig) if err != nil { return nil, err } overwritePipelines = config.OverwritePipelines b.OverwritePipelinesCallback = func(esConfig *conf.C) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Packetbeat") if err != nil { return err } _, err = module.UploadPipelines(b.Info, esClient, overwritePipelines) return err } } return &packetbeat{ config: rawConfig, factory: factory, overwritePipelines: overwritePipelines, done: make(chan struct{}), }, nil } // Run starts the packetbeat network capture, decoding and event publication, sending // events to b.Publisher. If b is managed, packetbeat is registered with the // reload.Registry and handled by fleet. Otherwise it is run until cancelled or a // fatal error. func (pb *packetbeat) Run(b *beat.Beat) error { defer func() { if service.ProfileEnabled() { logp.Debug("main", "Waiting for streams and transactions to expire...") time.Sleep(time.Duration(float64(protos.DefaultTransactionExpiration) * 1.2)) logp.Debug("main", "Streams and transactions should all be expired now.") } }() if b.API != nil { err := inputmon.AttachHandler(b.API.Router(), nil) if err != nil { return fmt.Errorf("failed attach inputs api to monitoring endpoint server: %w", err) } } if b.Manager != nil { b.Manager.RegisterDiagnosticHook("input_metrics", "Metrics from active inputs.", "input_metrics.json", "application/json", func() []byte { data, err := inputmon.MetricSnapshotJSON(nil) if err != nil { logp.L().Warnw("Failed to collect input metric snapshot for Agent diagnostics.", "error", err) return []byte(err.Error()) } return data }) } if !b.Manager.Enabled() { if b.Config.Output.Name() == "elasticsearch" { _, err := elasticsearch.RegisterConnectCallback(func(esClient *eslegclient.Connection) error { _, err := module.UploadPipelines(b.Info, esClient, pb.overwritePipelines) return err }) if err != nil { return err } } else { logp.L().Warn(pipelinesWarning) } return pb.runStatic(b, pb.factory) } return pb.runManaged(b, pb.factory) } const pipelinesWarning = "Packetbeat is unable to load the ingest pipelines for the configured" + " modules because the Elasticsearch output is not configured/enabled. If you have" + " already loaded the ingest pipelines or are using Logstash pipelines, you" + " can ignore this warning." // runStatic constructs a packetbeat runner and starts it, returning on cancellation // or the first fatal error. func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error { runner, err := factory.Create(b.Publisher, pb.config) if err != nil { return err } runner.Start() defer runner.Stop() logp.Debug("main", "Waiting for the runner to finish") select { case <-pb.done: case err := <-factory.err: pb.stopOnce.Do(func() { close(pb.done) }) return err } return nil } // runManaged registers a packetbeat runner with the reload.Registry and starts // the runner by starting the beat's manager. It returns on the first fatal error. func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error { runner := newReloader(management.DebugK, factory, b.Publisher, b.Info.Logger) b.Registry.MustRegisterInput(runner) logp.Debug("main", "Waiting for the runner to finish") // Start the manager after all the hooks are registered and terminates when // the function return. if err := b.Manager.Start(); err != nil { return err } defer func() { runner.Stop() b.Manager.Stop() }() for { select { case <-pb.done: return nil case err := <-factory.err: // when we're managed we don't want // to stop if the sniffer(s) exited without an error // this would happen during a configuration reload if err != nil { pb.stopOnce.Do(func() { close(pb.done) }) return err } } } } // Called by the Beat stop function func (pb *packetbeat) Stop() { logp.Info("Packetbeat send stop signal") pb.stopOnce.Do(func() { close(pb.done) }) }