libbeat/processors/add_docker_metadata/add_docker_metadata.go (244 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. //go:build linux || darwin || windows package add_docker_metadata import ( "errors" "fmt" "io/fs" "os" "regexp" "strings" "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/processors/actions" "github.com/elastic/elastic-agent-autodiscover/docker" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/safemapstr" "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup" "github.com/elastic/elastic-agent-system-metrics/metric/system/resolve" ) const ( processorName = "add_docker_metadata" dockerContainerIDKey = "container.id" cgroupCacheExpiration = 5 * time.Minute ) // initCgroupPaths initializes a new cgroup reader. This enables // unit testing by allowing us to stub the OS interface. var initCgroupPaths processors.InitCgroupHandler = func(rootfsMountpoint resolve.Resolver, ignoreRootCgroups bool) (processors.CGReader, error) { return cgroup.NewReader(rootfsMountpoint, ignoreRootCgroups) } func init() { processors.RegisterPlugin(processorName, New) } type addDockerMetadata struct { log *logp.Logger watcher docker.Watcher fields []string sourceProcessor beat.Processor pidFields []string // Field names that contain PIDs. cgroups *common.Cache // Cache of PID (int) to container ids (string). dedot bool // If set to true, replace dots in labels with `_`. dockerAvailable bool // If Docker exists in env, then it is set to true cgreader processors.CGReader } const selector = "add_docker_metadata" // New constructs a new add_docker_metadata processor. func New(cfg *conf.C) (beat.Processor, error) { return buildDockerMetadataProcessor(logp.NewLogger(selector), cfg, docker.NewWatcher) } func buildDockerMetadataProcessor(log *logp.Logger, cfg *conf.C, watcherConstructor docker.WatcherConstructor) (beat.Processor, error) { config := defaultConfig() if err := cfg.Unpack(&config); err != nil { return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err) } var dockerAvailable bool watcher, err := watcherConstructor(log, config.Host, config.TLS, config.MatchShortID) if err != nil { dockerAvailable = false log.Debugf("%v: docker environment not detected: %+v", processorName, err) } else { dockerAvailable = true log.Debugf("%v: docker environment detected", processorName) if err = watcher.Start(); err != nil { return nil, fmt.Errorf("failed to start watcher: %w", err) } } // Use extract_field processor to get container ID from source file path. var sourceProcessor beat.Processor if config.MatchSource { var procConf, _ = conf.NewConfigFrom(map[string]interface{}{ "field": "log.file.path", "separator": string(os.PathSeparator), "index": config.SourceIndex, "target": dockerContainerIDKey, }) sourceProcessor, err = actions.NewExtractField(procConf) if err != nil { return nil, err } } reader, err := initCgroupPaths(resolve.NewTestResolver(config.HostFS), false) if errors.Is(err, cgroup.ErrCgroupsMissing) { reader = &processors.NilCGReader{} } else if err != nil { return nil, fmt.Errorf("error creating cgroup reader: %w", err) } return &addDockerMetadata{ log: log, watcher: watcher, fields: config.Fields, sourceProcessor: sourceProcessor, pidFields: config.MatchPIDs, dedot: config.DeDot, dockerAvailable: dockerAvailable, cgreader: reader, }, nil } func lazyCgroupCacheInit(d *addDockerMetadata) { if d.cgroups == nil { d.log.Debug("Initializing cgroup cache") evictionListener := func(k common.Key, v common.Value) { d.log.Debugf("Evicted cached cgroups for PID=%v", k) } d.cgroups = common.NewCacheWithRemovalListener(cgroupCacheExpiration, 100, evictionListener) d.cgroups.StartJanitor(5 * time.Second) } } func (d *addDockerMetadata) Run(event *beat.Event) (*beat.Event, error) { if !d.dockerAvailable { return event, nil } var cid string var err error // Extract CID from the filepath contained in the "log.file.path" field. if d.sourceProcessor != nil { lfp, _ := event.Fields.GetValue("log.file.path") if lfp != nil { event, err = d.sourceProcessor.Run(event) if err != nil { d.log.Debugf("Error while extracting container ID from source path: %v", err) return event, nil } if v, err := event.GetValue(dockerContainerIDKey); err == nil { cid, _ = v.(string) } } } // Lookup CID using process cgroup membership data. if cid == "" && len(d.pidFields) > 0 { id, err := d.lookupContainerIDByPID(event) if err != nil { return nil, fmt.Errorf("error reading container ID: %w", err) } if id != "" { cid = id _, _ = event.PutValue(dockerContainerIDKey, cid) } } // Lookup CID from the user defined field names. if cid == "" && len(d.fields) > 0 { for _, field := range d.fields { value, err := event.GetValue(field) if err != nil { continue } if strValue, ok := value.(string); ok { cid = strValue break } } } if cid == "" { return event, nil } container := d.watcher.Container(cid) if container != nil { meta := mapstr.M{} if len(container.Labels) > 0 { labels := mapstr.M{} for k, v := range container.Labels { if d.dedot { label := common.DeDot(k) _, _ = labels.Put(label, v) } else { _ = safemapstr.Put(labels, k, v) } } _, _ = meta.Put("container.labels", labels) } _, _ = meta.Put("container.id", container.ID) _, _ = meta.Put("container.image.name", container.Image) _, _ = meta.Put("container.name", container.Name) event.Fields.DeepUpdate(meta.Clone()) } else { d.log.Debugf("Container not found: cid=%s", cid) } return event, nil } func (d *addDockerMetadata) Close() error { if d.cgroups != nil { d.cgroups.StopJanitor() } // Watcher can be nil if processor failed on creation if d.watcher != nil { d.watcher.Stop() } err := processors.Close(d.sourceProcessor) if err != nil { return fmt.Errorf("closing source processor of add_docker_metadata: %w", err) } return nil } func (d *addDockerMetadata) String() string { return fmt.Sprintf("%v=[match_fields=[%v] match_pids=[%v]]", processorName, strings.Join(d.fields, ", "), strings.Join(d.pidFields, ", ")) } // lookupContainerIDByPID finds the container ID based on PID fields contained // in the event. func (d *addDockerMetadata) lookupContainerIDByPID(event *beat.Event) (string, error) { pids := make([]int, 0, len(d.pidFields)) for _, field := range d.pidFields { v, err := event.GetValue(field) if err != nil { continue } pid, ok := common.TryToInt(v) if !ok { d.log.Debugf("field %v is not a PID (type=%T, value=%v)", field, v, v) continue } if d.cgroups != nil { if cid := d.cgroups.Get(pid); cid != nil { d.log.Debugf("Using cached cgroups for pid=%v", pid) return cid.(string), nil } } pids = append(pids, pid) } for _, pid := range pids { cgroups, err := d.getProcessCgroups(pid) if err != nil && errors.Is(err, fs.ErrNotExist) { continue } if err != nil { d.log.Debugf("failed to get cgroups for pid=%v: %v", pid, err) } // Initialize at time of first use. lazyCgroupCacheInit(d) cid, err := getContainerIDFromCgroups(cgroups) d.cgroups.Put(pid, cid) return cid, err } return "", nil } // getProcessCgroups returns a mapping of cgroup subsystem name to path. It // returns an error if it failed to retrieve the cgroup info. func (d *addDockerMetadata) getProcessCgroups(pid int) (cgroup.PathList, error) { cgroups, err := d.cgreader.ProcessCgroupPaths(pid) if err != nil { return cgroups, fmt.Errorf("failed to read cgroups for pid=%v: %w", pid, err) } if len(cgroups.Flatten()) == 0 { return cgroup.PathList{}, fs.ErrNotExist } return cgroups, nil } var re = regexp.MustCompile(`[\w]{64}`) // getContainerIDFromCgroups checks all of the processes' paths to see if any // of them are associated with Docker. For cgroups V1, Docker uses /docker/<CID> when // naming cgroups and we use this to determine the container ID. For V2, // it's part of a more complex string. func getContainerIDFromCgroups(cgroups cgroup.PathList) (string, error) { for _, path := range cgroups.Flatten() { rs := re.FindStringSubmatch(path.ControllerPath) if rs != nil { return rs[0], nil } } return "", nil }