filebeat/autodiscover/builder/hints/logs.go (238 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package hints import ( "fmt" "regexp" "github.com/elastic/go-ucfg" "github.com/elastic/elastic-agent-autodiscover/bus" "github.com/elastic/elastic-agent-autodiscover/utils" "github.com/elastic/beats/v7/filebeat/fileset" "github.com/elastic/beats/v7/filebeat/harvester" "github.com/elastic/beats/v7/libbeat/autodiscover" "github.com/elastic/beats/v7/libbeat/autodiscover/template" "github.com/elastic/beats/v7/libbeat/beat" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) const ( multiline = "multiline" includeLines = "include_lines" excludeLines = "exclude_lines" processors = "processors" json = "json" pipeline = "pipeline" ndjson = "ndjson" parsers = "parsers" ) // validModuleNames to sanitize user input var validModuleNames = regexp.MustCompile(`[^a-zA-Z0-9\\_\\-]+`) type logHints struct { config *config registry *fileset.ModuleRegistry log *logp.Logger } // InitializeModule initializes this module. func InitializeModule() { err := autodiscover.Registry.AddBuilder("hints", NewLogHints) if err != nil { logp.Error(fmt.Errorf("could not add `hints` builder")) } } // NewLogHints builds a log hints builder func NewLogHints(cfg *conf.C, logger *logp.Logger) (autodiscover.Builder, error) { config := defaultConfig() if err := cfg.Unpack(&config); err != nil { return nil, fmt.Errorf("unable to unpack hints config due to error: %w", err) } moduleRegistry, err := fileset.NewModuleRegistry(nil, beat.Info{Logger: logger}, false, fileset.FilesetOverrides{}) if err != nil { return nil, err } return &logHints{&config, moduleRegistry, logger.Named("hints.builder")}, nil } // Create config based on input hints in the bus event func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*conf.C { var hints mapstr.M if hintsIfc, found := event["hints"]; found { hints, _ = hintsIfc.(mapstr.M) } // Hint must be explicitly enabled when default_config sets enabled=false. if !l.config.DefaultConfig.Enabled() && !utils.IsEnabled(hints, l.config.Key) || utils.IsDisabled(hints, l.config.Key) { l.log.Debugw("Hints config is not enabled.", "autodiscover.event", event) return nil } if inputConfig := l.getInputsConfigs(hints); inputConfig != nil { var configs []*conf.C for _, cfg := range inputConfig { if config, err := conf.NewConfigFrom(cfg); err == nil { configs = append(configs, config) } else { l.log.Warnw("Failed to create config from input.", "error", err) } } l.log.Debugf("Generated %d input configs from hint.", len(configs)) // Apply information in event to the template to generate the final config return template.ApplyConfigTemplate(event, configs) } var configs []*conf.C //nolint:prealloc //breaks tests inputs := l.getInputs(hints) for _, h := range inputs { // Clone original config, enable it if disabled config, _ := conf.NewConfigFrom(l.config.DefaultConfig) _, err := config.Remove("enabled", -1) if err != nil { continue } inputType, _ := config.String("type", -1) tempCfg := mapstr.M{} if mline := l.getMultiline(h); len(mline) != 0 { if inputType == harvester.FilestreamType { // multiline options should be under multiline parser in filestream input parsersTempCfg := []mapstr.M{} mlineTempCfg := mapstr.M{} shouldPut(mlineTempCfg, multiline, mline, l.log) parsersTempCfg = append(parsersTempCfg, mlineTempCfg) shouldPut(tempCfg, parsers, parsersTempCfg, l.log) } else { shouldPut(tempCfg, multiline, mline, l.log) } } if ilines := l.getIncludeLines(h); len(ilines) != 0 { shouldPut(tempCfg, includeLines, ilines, l.log) } if elines := l.getExcludeLines(h); len(elines) != 0 { shouldPut(tempCfg, excludeLines, elines, l.log) } if procs := l.getProcessors(h); len(procs) != 0 { shouldPut(tempCfg, processors, procs, l.log) } if pip := l.getPipeline(h); len(pip) != 0 { shouldPut(tempCfg, pipeline, pip, l.log) } if jsonOpts := l.getJSONOptions(h); len(jsonOpts) != 0 { if inputType == harvester.FilestreamType { // json options should be under ndjson parser in filestream input parsersTempCfg := []mapstr.M{} ndjsonTempCfg := mapstr.M{} shouldPut(ndjsonTempCfg, ndjson, jsonOpts, l.log) parsersTempCfg = append(parsersTempCfg, ndjsonTempCfg) shouldPut(tempCfg, parsers, parsersTempCfg, l.log) } else { shouldPut(tempCfg, json, jsonOpts, l.log) } } // Merge config template with the configs from the annotations // AppendValues option is used to append arrays from annotations to existing arrays while merging if err := config.MergeWithOpts(tempCfg, ucfg.AppendValues); err != nil { l.log.Debugf("hints.builder", "config merge failed with error: %v", err) continue } module := l.getModule(hints) if module != "" { moduleConf := map[string]interface{}{ "module": module, } filesets := l.getFilesets(hints, module) for fileset, cfg := range filesets { filesetConf, _ := conf.NewConfigFrom(config) switch inputType { case harvester.ContainerType: _ = filesetConf.SetString("stream", -1, cfg.Stream) case harvester.FilestreamType: filestreamContainerParser := map[string]interface{}{ "container": map[string]interface{}{ "stream": cfg.Stream, "format": "auto", }, } parserCfg, _ := conf.NewConfigFrom(filestreamContainerParser) _ = filesetConf.SetChild("parsers", 0, parserCfg) default: _ = filesetConf.SetString("containers.stream", -1, cfg.Stream) } moduleConf[fileset+".enabled"] = cfg.Enabled moduleConf[fileset+".input"] = filesetConf l.log.Debugf("hints.builder", "generated config %+v", moduleConf) } config, _ = conf.NewConfigFrom(moduleConf) } l.log.Debugf("hints.builder", "generated config %+v of logHints %+v", config, l) configs = append(configs, config) } // Apply information in event to the template to generate the final config return template.ApplyConfigTemplate(event, configs) } func (l *logHints) getMultiline(hints mapstr.M) mapstr.M { return utils.GetHintMapStr(hints, l.config.Key, multiline) } func (l *logHints) getIncludeLines(hints mapstr.M) []string { return utils.GetHintAsList(hints, l.config.Key, includeLines) } func (l *logHints) getExcludeLines(hints mapstr.M) []string { return utils.GetHintAsList(hints, l.config.Key, excludeLines) } func (l *logHints) getModule(hints mapstr.M) string { module := utils.GetHintString(hints, l.config.Key, "module") // for security, strip module name return validModuleNames.ReplaceAllString(module, "") } func (l *logHints) getInputsConfigs(hints mapstr.M) []mapstr.M { return utils.GetHintAsConfigs(hints, l.config.Key) } func (l *logHints) getProcessors(hints mapstr.M) []mapstr.M { return utils.GetProcessors(hints, l.config.Key) } func (l *logHints) getPipeline(hints mapstr.M) string { return utils.GetHintString(hints, l.config.Key, "pipeline") } func (l *logHints) getJSONOptions(hints mapstr.M) mapstr.M { return utils.GetHintMapStr(hints, l.config.Key, json) } type filesetConfig struct { Enabled bool Stream string } // Return a map containing filesets -> enabled & stream (stdout, stderr, all) func (l *logHints) getFilesets(hints mapstr.M, module string) map[string]*filesetConfig { var configured bool filesets := make(map[string]*filesetConfig) moduleFilesets, err := l.registry.ModuleAvailableFilesets(module) if err != nil { l.log.Errorf("Error retrieving module filesets: %+v", err) return nil } for _, fileset := range moduleFilesets { filesets[fileset] = &filesetConfig{Enabled: false, Stream: "all"} } // If a single fileset is given, pass all streams to it fileset := utils.GetHintString(hints, l.config.Key, "fileset") if fileset != "" { if conf, ok := filesets[fileset]; ok { conf.Enabled = true configured = true } } // If fileset is defined per stream, return all of them for _, stream := range []string{"all", "stdout", "stderr"} { fileset := utils.GetHintString(hints, l.config.Key, "fileset."+stream) if fileset != "" { if conf, ok := filesets[fileset]; ok { conf.Enabled = true conf.Stream = stream configured = true } } } // No fileset defined, return defaults for the module, all streams to all filesets if !configured { for _, conf := range filesets { conf.Enabled = true } } return filesets } func (l *logHints) getInputs(hints mapstr.M) []mapstr.M { modules := utils.GetHintsAsList(hints, l.config.Key) var output []mapstr.M //nolint:prealloc //breaks tests for _, mod := range modules { output = append(output, mapstr.M{ l.config.Key: mod, }) } // Generate this so that no hints with completely valid templates work if len(output) == 0 { output = append(output, mapstr.M{ l.config.Key: mapstr.M{}, }) } return output } func shouldPut(event mapstr.M, field string, value interface{}, logger *logp.Logger) { _, err := event.Put(field, value) if err != nil { logger.Debugf("Failed to put field '%s' with value '%s': %s", field, value, err) } }