libbeat/template/processor.go (444 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package template import ( "errors" "fmt" "reflect" "strings" "github.com/elastic/beats/v7/libbeat/mapping" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/version" ) // DefaultField controls the default value for the default_field flag. const DefaultField = false var ( minVersionFieldMeta = version.MustNew("7.6.0") minVersionHistogram = version.MustNew("7.6.0") minVersionWildcard = version.MustNew("7.9.0") minVersionExplicitDynamicTemplate = version.MustNew("7.13.0") minVersionMatchOnlyText = version.MustNew("7.14.0") ) // Processor struct to process fields to template type Processor struct { EsVersion version.V Migration bool ElasticLicensed bool // dynamicTemplatesMap records which dynamic templates have been added, to prevent duplicates. dynamicTemplatesMap map[dynamicTemplateKey]mapstr.M // dynamicTemplates records the dynamic templates in the order they were added. dynamicTemplates []mapstr.M } var ( defaultScalingFactor = 1000 defaultIgnoreAbove = 1024 ) const scalingFactorKey = "scalingFactor" type fieldState struct { DefaultField bool Path string } // Process recursively processes the given fields and writes the template in the given output func (p *Processor) Process(fields mapping.Fields, state *fieldState, output, analyzers mapstr.M) error { if state == nil { // Set the defaults. state = &fieldState{DefaultField: DefaultField} } for _, field := range fields { if field.Name == "" { continue } field.Path = state.Path if field.DefaultField == nil { field.DefaultField = &state.DefaultField } var ( indexMapping mapstr.M analyzer, searchAnalyzer mapping.Analyzer ) switch field.Type { case "ip": indexMapping = p.ip(&field) case "scaled_float": indexMapping = p.scaledFloat(&field) case "half_float": indexMapping = p.halfFloat(&field) case "integer": indexMapping = p.integer(&field) case "text": indexMapping, analyzer, searchAnalyzer = p.text(&field, analyzers) case "match_only_text": noMatchOnlyText := p.EsVersion.LessThan(minVersionMatchOnlyText) if !p.ElasticLicensed || noMatchOnlyText { indexMapping, analyzer, searchAnalyzer = p.text(&field, analyzers) } else { indexMapping, analyzer, searchAnalyzer = p.matchOnlyText(&field, analyzers) } case "wildcard": noWildcards := p.EsVersion.LessThan(minVersionWildcard) if !p.ElasticLicensed || noWildcards { indexMapping = p.keyword(&field, analyzers) } else { indexMapping = p.wildcard(&field, analyzers) } case "", "keyword": indexMapping = p.keyword(&field, analyzers) case "object": indexMapping = p.object(&field) case "array": indexMapping = p.array(&field) case "alias": indexMapping = p.alias(&field) case "histogram": indexMapping = p.histogram(&field) case "nested": mapping, err := p.nested(&field, output, analyzers) if err != nil { return err } indexMapping = mapping case "group": mapping, err := p.group(&field, output, analyzers) if err != nil { return err } indexMapping = mapping default: indexMapping = p.other(&field) } if *field.DefaultField { switch field.Type { case "", "keyword", "text", "match_only_text", "wildcard": addToDefaultFields(&field) } } if len(indexMapping) > 0 { if field.DynamicTemplate { // Explicit dynamic templates were introduced in // Elasticsearch 7.13, ignore if unsupported if !p.EsVersion.LessThan(minVersionExplicitDynamicTemplate) { p.addDynamicTemplate(field.Name, "", "", indexMapping) } } else { output.Put(mapping.GenerateKey(field.Name), indexMapping) } } for _, a := range []mapping.Analyzer{ analyzer, searchAnalyzer, } { if a.Definition != nil { prev, err := analyzers.Put(a.Name, a.Definition) if err != nil { // Should never happen. return err } if prev != nil { if !reflect.DeepEqual(prev, a.Definition) { return fmt.Errorf("inconsistent definitions for analyzers with the name %q", a.Name) } } } } } return nil } func addToDefaultFields(f *mapping.Field) { fullName := f.Name if f.Path != "" { fullName = f.Path + "." + f.Name } if f.Index == nil || (f.Index != nil && *f.Index) { defaultFields = append(defaultFields, fullName) } } func (p *Processor) other(f *mapping.Field) mapstr.M { property := p.getDefaultProperties(f) if f.Type != "" { property["type"] = f.Type } return property } func (p *Processor) integer(f *mapping.Field) mapstr.M { property := p.getDefaultProperties(f) property["type"] = "long" return property } func (p *Processor) scaledFloat(f *mapping.Field, params ...mapstr.M) mapstr.M { property := p.getDefaultProperties(f) property["type"] = "scaled_float" // Set scaling factor scalingFactor := defaultScalingFactor if f.ScalingFactor != 0 && len(f.ObjectTypeParams) == 0 { scalingFactor = f.ScalingFactor } if len(params) > 0 { if s, ok := params[0][scalingFactorKey].(int); ok && s != 0 { scalingFactor = s } } property["scaling_factor"] = scalingFactor return property } func (p *Processor) nested(f *mapping.Field, output, analyzers mapstr.M) (mapstr.M, error) { mapping, err := p.group(f, output, analyzers) if err != nil { return nil, err } mapping["type"] = "nested" return mapping, nil } func (p *Processor) group(f *mapping.Field, output, analyzers mapstr.M) (mapstr.M, error) { indexMapping := mapstr.M{} if f.Dynamic.Value != nil { indexMapping["dynamic"] = f.Dynamic.Value } // Combine properties with previous field definitions (if any) properties := mapstr.M{} key := mapping.GenerateKey(f.Name) + ".properties" currentProperties, err := output.GetValue(key) if err == nil { var ok bool properties, ok = currentProperties.(mapstr.M) if !ok { // This should never happen return nil, errors.New(key + " is expected to be a MapStr") } } groupState := &fieldState{Path: f.Name, DefaultField: *f.DefaultField} if f.Path != "" { groupState.Path = f.Path + "." + f.Name } if err := p.Process(f.Fields, groupState, properties, analyzers); err != nil { return nil, err } if len(properties) != 0 { indexMapping["properties"] = properties } return indexMapping, nil } func (p *Processor) halfFloat(f *mapping.Field) mapstr.M { property := p.getDefaultProperties(f) property["type"] = "half_float" return property } func (p *Processor) ip(f *mapping.Field) mapstr.M { property := p.getDefaultProperties(f) property["type"] = "ip" return property } func stateFromField(f *mapping.Field) *fieldState { if f == nil { return nil } st := &fieldState{ DefaultField: DefaultField, Path: f.Name, } if f.DefaultField != nil { st.DefaultField = *f.DefaultField } if f.Path != "" { st.Path = f.Path + "." + f.Name } return st } func (p *Processor) keyword(f *mapping.Field, analyzers mapstr.M) mapstr.M { property := p.getDefaultProperties(f) property["type"] = "keyword" switch f.IgnoreAbove { case 0: // Use libbeat default property["ignore_above"] = defaultIgnoreAbove case -1: // Use ES default default: // Use user value property["ignore_above"] = f.IgnoreAbove } if len(f.MultiFields) > 0 { fields := mapstr.M{} p.Process(f.MultiFields, stateFromField(f), fields, analyzers) property["fields"] = fields } return property } func (p *Processor) wildcard(f *mapping.Field, analyzers mapstr.M) mapstr.M { property := p.getDefaultProperties(f) property["type"] = "wildcard" /* For wildcard fields, unlike keywords, don't force a default ignore_above limit. The default in ES will be used unless an explicit limit is set. This is to take advantage of wildcard type benefits when indexing large strings. */ if f.IgnoreAbove > 0 { property["ignore_above"] = f.IgnoreAbove } if len(f.MultiFields) > 0 { fields := mapstr.M{} p.Process(f.MultiFields, stateFromField(f), fields, analyzers) property["fields"] = fields } return property } func (p *Processor) text(f *mapping.Field, analyzers mapstr.M) (properties mapstr.M, analyzer, searchAnalyzer mapping.Analyzer) { properties = p.getDefaultProperties(f) properties["type"] = "text" if !f.Norms { properties["norms"] = false } if f.Analyzer.Name != "" { properties["analyzer"] = f.Analyzer.Name analyzer = f.Analyzer } if f.SearchAnalyzer.Name != "" { properties["search_analyzer"] = f.SearchAnalyzer.Name searchAnalyzer = f.SearchAnalyzer } if len(f.MultiFields) > 0 { fields := mapstr.M{} p.Process(f.MultiFields, stateFromField(f), fields, analyzers) properties["fields"] = fields } return properties, analyzer, searchAnalyzer } func (p *Processor) matchOnlyText(f *mapping.Field, analyzers mapstr.M) (properties mapstr.M, analyzer, searchAnalyzer mapping.Analyzer) { properties = p.getDefaultProperties(f) properties["type"] = "match_only_text" if f.Analyzer.Name != "" { properties["analyzer"] = f.Analyzer analyzer = f.Analyzer } if f.SearchAnalyzer.Name != "" { properties["search_analyzer"] = f.SearchAnalyzer searchAnalyzer = f.SearchAnalyzer } if len(f.MultiFields) > 0 { fields := mapstr.M{} p.Process(f.MultiFields, nil, fields, analyzers) properties["fields"] = fields } return properties, analyzer, searchAnalyzer } func (p *Processor) array(f *mapping.Field) mapstr.M { properties := p.getDefaultProperties(f) if f.ObjectType != "" { properties["type"] = f.ObjectType } return properties } func (p *Processor) alias(f *mapping.Field) mapstr.M { // In case migration is disabled and it's a migration alias, field is not created if !p.Migration && f.MigrationAlias { return nil } properties := p.getDefaultProperties(f) properties["type"] = "alias" properties["path"] = f.AliasPath return properties } func (p *Processor) histogram(f *mapping.Field) mapstr.M { // Histograms were introduced in Elasticsearch 7.6, ignore if unsupported if p.EsVersion.LessThan(minVersionHistogram) { return nil } properties := p.getDefaultProperties(f) properties["type"] = "histogram" return properties } func (p *Processor) object(f *mapping.Field) mapstr.M { matchType := func(onlyType string, mt string) string { if mt != "" { return mt } return onlyType } var otParams []mapping.ObjectTypeCfg if len(f.ObjectTypeParams) != 0 { otParams = f.ObjectTypeParams } else { otParams = []mapping.ObjectTypeCfg{{ ObjectType: f.ObjectType, ObjectTypeMappingType: f.ObjectTypeMappingType, ScalingFactor: f.ScalingFactor, }} } for _, otp := range otParams { dynProperties := p.getDefaultProperties(f) var matchingType string switch otp.ObjectType { case "scaled_float": dynProperties = p.scaledFloat(f, mapstr.M{scalingFactorKey: otp.ScalingFactor}) matchingType = matchType("*", otp.ObjectTypeMappingType) case "text": dynProperties["type"] = "text" matchingType = matchType("string", otp.ObjectTypeMappingType) case "keyword": dynProperties["type"] = otp.ObjectType matchingType = matchType("string", otp.ObjectTypeMappingType) case "byte", "double", "float", "long", "short", "boolean": dynProperties["type"] = otp.ObjectType matchingType = matchType(otp.ObjectType, otp.ObjectTypeMappingType) case "histogram": dynProperties["type"] = otp.ObjectType matchingType = matchType("*", otp.ObjectTypeMappingType) default: continue } path := f.Path if len(path) > 0 { path += "." } path += f.Name pathMatch := path // ensure the `path_match` string ends with a `*` if !strings.ContainsRune(path, '*') { pathMatch += ".*" } // When multiple object type parameters are detected for a field, // add a unique part to the name of the dynamic template. // Duplicated dynamic template names can lead to errors when template // inheritance is applied, and will not be supported in future versions if len(otParams) > 1 { path = fmt.Sprintf("%s_%s", path, matchingType) } p.addDynamicTemplate(path, pathMatch, matchingType, dynProperties) } properties := p.getDefaultProperties(f) properties["type"] = "object" if f.Enabled != nil { properties["enabled"] = *f.Enabled } if f.Dynamic.Value != nil { properties["dynamic"] = f.Dynamic.Value } return properties } type dynamicTemplateKey struct { name string pathMatch string matchType string } func (p *Processor) addDynamicTemplate(name, pathMatch, matchType string, properties mapstr.M) { key := dynamicTemplateKey{ name: name, pathMatch: pathMatch, matchType: matchType, } if p.dynamicTemplatesMap == nil { p.dynamicTemplatesMap = make(map[dynamicTemplateKey]mapstr.M) } else { if _, ok := p.dynamicTemplatesMap[key]; ok { // Dynamic template already added. return } } dynamicTemplateProperties := mapstr.M{ "mapping": properties, } if matchType != "" { dynamicTemplateProperties["match_mapping_type"] = matchType } if pathMatch != "" { dynamicTemplateProperties["path_match"] = pathMatch } dynamicTemplate := mapstr.M{ name: dynamicTemplateProperties, } p.dynamicTemplatesMap[key] = dynamicTemplate p.dynamicTemplates = append(p.dynamicTemplates, dynamicTemplate) } func (p *Processor) getDefaultProperties(f *mapping.Field) mapstr.M { // Currently no defaults exist properties := mapstr.M{} if f.Index != nil { properties["index"] = *f.Index } if f.DocValues != nil { properties["doc_values"] = *f.DocValues } if f.CopyTo != "" { properties["copy_to"] = f.CopyTo } if !p.EsVersion.LessThan(minVersionFieldMeta) { if f.MetricType != "" || f.Unit != "" { meta := mapstr.M{} if f.MetricType != "" { meta["metric_type"] = f.MetricType } if f.Unit != "" { meta["unit"] = f.Unit } properties["meta"] = meta } } return properties }