metricbeat/module/docker/event/event.go (95 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. //go:build linux || darwin || windows package event import ( "context" "fmt" "time" "github.com/docker/docker/api/types/events" "github.com/docker/docker/client" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/module/docker" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) // init registers the MetricSet with the central registry as soon as the program // starts. The New function will be called later to instantiate an instance of // the MetricSet for each host defined in the module's configuration. After the // MetricSet has been created then Fetch will begin to be called periodically. func init() { mb.Registry.MustAddMetricSet("docker", "event", New, mb.WithHostParser(docker.HostParser), mb.DefaultMetricSet(), ) } // MetricSet holds any configuration or state information. It must implement // the mb.MetricSet interface. And this is best achieved by embedding // mb.BaseMetricSet because it implements all of the required mb.MetricSet // interface methods except for Fetch. type MetricSet struct { mb.BaseMetricSet dockerClient *client.Client dedot bool logger *logp.Logger } // New creates a new instance of the MetricSet. New is responsible for unpacking // any MetricSet specific configuration options if there are any. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { config := docker.DefaultConfig() if err := base.Module().UnpackConfig(&config); err != nil { return nil, err } client, err := docker.NewDockerClient(base.HostData().URI, config) if err != nil { return nil, err } return &MetricSet{ BaseMetricSet: base, dockerClient: client, dedot: config.DeDot, logger: base.Logger().Named("docker"), }, nil } // Run listens for docker events and reports them func (m *MetricSet) Run(ctx context.Context, reporter mb.ReporterV2) { options := events.ListOptions{ Since: fmt.Sprintf("%d", time.Now().Unix()), } defer m.dockerClient.Close() for { events, errors := m.dockerClient.Events(ctx, options) WATCH: for { select { case event := <-events: m.logger.Debug("Got a new docker event: %v", event) m.reportEvent(reporter, event) case err := <-errors: // An error can be received on context cancellation, don't reconnect // if context is done. select { case <-ctx.Done(): m.logger.Debug("docker", "Event watcher stopped") return default: } // Restart watch call m.logger.Errorf("Error watching for docker events: %v", err) time.Sleep(1 * time.Second) break WATCH case <-ctx.Done(): m.logger.Debug("docker", "Event watcher stopped") return } } } } func (m *MetricSet) reportEvent(reporter mb.ReporterV2, event events.Message) { time := time.Unix(event.Time, 0) attributes := make(map[string]string, len(event.Actor.Attributes)) for k, v := range event.Actor.Attributes { if m.dedot { k = common.DeDot(k) } attributes[k] = v } reporter.Event(mb.Event{ Timestamp: time, MetricSetFields: mapstr.M{ "id": event.ID, "type": event.Type, "action": event.Action, "status": event.Status, "from": event.From, "actor": mapstr.M{ "id": event.Actor.ID, "attributes": attributes, }, }, }) }