libbeat/template/template.go (268 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package template
import (
"fmt"
"sync"
"time"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/version"
"github.com/elastic/go-ucfg/yaml"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/mapping"
)
var (
// Defaults used in the template
// defaultDateDetection is the "date_detection" value that buildMappings
// writes into every generated mapping.
defaultDateDetection = false
// defaultTotalFieldsLimit is the default "mapping.total_fields.limit" index
// setting set by buildIdxSettings.
defaultTotalFieldsLimit = 12500
// defaultMaxDocvalueFieldsSearch is the default "max_docvalue_fields_search"
// index setting; buildIdxSettings only sets it when not running against
// serverless.
defaultMaxDocvalueFieldsSearch = 200
// defaultFields is package-level state: load resets it to nil and
// buildIdxSettings copies it into "query.default_field".
// NOTE(review): it is presumably populated while fields are processed
// elsewhere in this package — confirm.
defaultFields []string
)
// Template holds information for the ES template.
//
// The embedded mutex is held by load for the duration of a template build.
type Template struct {
sync.Mutex
// name is the template name; New stores the result of expanding the
// configured (or default) name as an event format string.
name string
// pattern is the index pattern emitted under "index_patterns"; New expands
// it the same way as name.
pattern string
// elasticLicensed is forwarded to the field Processor in load.
elasticLicensed bool
// beatVersion is the parsed beat version, written to "_meta.version".
beatVersion version.V
// beatName is the beat name, written to "_meta.beat".
beatName string
// esVersion is the target Elasticsearch version; New falls back to the
// beat version when the supplied value is not valid.
esVersion version.V
// config is the template configuration this instance was created from.
config TemplateConfig
// migration is forwarded to the field Processor in load.
migration bool
// priority is copied from config.Priority and emitted as "priority".
priority int
// isServerless makes the generated settings omit number_of_shards and the
// max_docvalue_fields_search default.
isServerless bool
}
// New creates a new template instance.
//
// The template name is taken from config.JSON.Name when config.JSON.Enabled
// is set and from config.Name otherwise; an empty name falls back to
// "<beatName>-<beatVersion>". An empty config.Pattern falls back to the (not
// yet expanded) name followed by "*". Name and pattern are then expanded as
// event format strings against a synthetic event that carries the beat name
// and version under "beat", "agent" and "observer". An esVersion that is not
// valid is replaced by the beat version.
//
// An error is returned when beatVersion cannot be parsed or when either
// format string fails to compile or to run.
func New(
	isServerless bool,
	beatVersion string,
	beatName string,
	elasticLicensed bool,
	esVersion version.V,
	config TemplateConfig,
	migration bool,
) (*Template, error) {
	parsedVersion, err := version.New(beatVersion)
	if err != nil {
		return nil, err
	}
	versionStr := parsedVersion.String()

	rawName := config.Name
	if config.JSON.Enabled {
		rawName = config.JSON.Name
	}
	if rawName == "" {
		rawName = fmt.Sprintf("%s-%s", beatName, versionStr)
	}

	rawPattern := config.Pattern
	if rawPattern == "" {
		rawPattern = rawName + "*"
	}

	// identity returns a fresh map on every call so the event keys below do
	// not share one underlying map.
	identity := func() mapstr.M {
		return mapstr.M{
			"name":    beatName,
			"version": versionStr,
		}
	}
	event := &beat.Event{
		Timestamp: time.Now(),
		Fields: mapstr.M{
			// The beat object is kept for backward compatibility with older configs.
			"beat":  identity(),
			"agent": identity(),
			// For the Beats that have an observer role.
			"observer": identity(),
		},
	}

	// expand compiles one format string and evaluates it against event.
	expand := func(format string) (string, error) {
		formatter, compileErr := fmtstr.CompileEvent(format)
		if compileErr != nil {
			return "", compileErr
		}
		return formatter.Run(event)
	}

	name, err := expand(rawName)
	if err != nil {
		return nil, err
	}
	pattern, err := expand(rawPattern)
	if err != nil {
		return nil, err
	}

	// Without a usable esVersion, assume Elasticsearch matches the beat version.
	if !esVersion.IsValid() {
		esVersion = *parsedVersion
	}

	tmpl := &Template{
		name:            name,
		pattern:         pattern,
		beatName:        beatName,
		beatVersion:     *parsedVersion,
		esVersion:       esVersion,
		elasticLicensed: elasticLicensed,
		config:          config,
		migration:       migration,
		priority:        config.Priority,
		isServerless:    isServerless,
	}
	return tmpl, nil
}
// load turns the given field definitions into a complete template body.
//
// Fields from t.config.AppendFields, when present, are concatenated onto
// fields first. The template's mutex is held for the whole call and the
// package-level defaultFields slice is reset before processing starts.
//
// NOTE(review): the mutex belongs to a single Template while defaultFields is
// package-level — confirm that distinct Template instances are never loaded
// concurrently.
func (t *Template) load(fields mapping.Fields) (mapstr.M, error) {
	// Hold the lock so dynamicTemplates and defaultFields are not accessed in parallel.
	t.Lock()
	defer t.Unlock()

	defaultFields = nil

	if extra := t.config.AppendFields; len(extra) > 0 {
		merged, err := mapping.ConcatFields(fields, extra)
		if err != nil {
			return nil, err
		}
		fields = merged
	}

	// Start processing at the root.
	var (
		properties = mapstr.M{}
		analyzers  = mapstr.M{}
		processor  = Processor{
			EsVersion:       t.esVersion,
			ElasticLicensed: t.elasticLicensed,
			Migration:       t.migration,
		}
	)
	err := processor.Process(fields, nil, properties, analyzers)
	if err != nil {
		return nil, err
	}

	return t.Generate(properties, analyzers, processor.dynamicTemplates), nil
}
// LoadFile loads the template from the given file path: the file is read with
// mapping.LoadFieldsYaml and the resulting fields are handed to load.
func (t *Template) LoadFile(file string) (mapstr.M, error) {
	parsed, err := mapping.LoadFieldsYaml(file)
	if err != nil {
		return nil, err
	}

	return t.load(parsed)
}
// LoadBytes loads the template from YAML held in memory: data is parsed with
// loadYamlByte and the resulting fields are handed to load.
func (t *Template) LoadBytes(data []byte) (mapstr.M, error) {
	parsed, err := loadYamlByte(data)
	if err != nil {
		return nil, err
	}

	return t.load(parsed)
}
// LoadMinimal loads the template only with the given configuration, without
// processing any field definitions.
//
// Mappings are included only when a _source setting is configured. On
// serverless the "number_of_shards" index setting is left out of the result.
// The configured settings themselves are never modified.
func (t *Template) LoadMinimal() mapstr.M {
	templ := mapstr.M{}
	if t.config.Settings.Source != nil {
		templ["mappings"] = buildMappings(
			t.beatVersion, t.beatName,
			nil, nil,
			mapstr.M(t.config.Settings.Source))
	}

	// Leave out settings that are not available on serverless. The key is
	// filtered into a copy rather than deleted from t.config: this method does
	// not hold the template's lock, so writing to the shared configuration map
	// here would be a hidden side effect and a potential data race.
	index := t.config.Settings.Index
	if t.isServerless {
		if _, found := index["number_of_shards"]; found {
			filtered := make(mapstr.M, len(index))
			for key, value := range index {
				if key != "number_of_shards" {
					filtered[key] = value
				}
			}
			index = filtered
		}
	}
	templ["settings"] = mapstr.M{
		"index": index,
	}

	return mapstr.M{
		"template":       templ,
		"data_stream":    struct{}{},
		"priority":       t.priority,
		"index_patterns": []string{t.GetPattern()},
	}
}
// GetName returns the name of the template, i.e. the value stored in the
// template's name field.
func (t *Template) GetName() string {
return t.name
}
// GetPattern returns the pattern of the template, i.e. the value stored in the
// template's pattern field. It is the single entry emitted under
// "index_patterns".
func (t *Template) GetPattern() string {
return t.pattern
}
// Generate generates the full template.
//
// It takes the component built by generateComponent from the given
// properties, analyzers and dynamic templates, and adds the top-level
// "index_patterns", "priority" and "data_stream" entries to it.
func (t *Template) Generate(properties, analyzers mapstr.M, dynamicTemplates []mapstr.M) mapstr.M {
	out := t.generateComponent(properties, analyzers, dynamicTemplates)

	out["index_patterns"] = []string{t.pattern}
	out["priority"] = t.priority
	out["data_stream"] = struct{}{}

	return out
}
// generateComponent assembles the map holding the "template" section
// (mappings and index settings) that Generate builds upon.
//
// The dynamic template from buildDynTmpl is appended after dynamicTemplates.
// A "lifecycle" entry is added only when lifecycle settings are configured,
// and "settings.analysis.analyzer" only when analyzers is non-empty.
func (t *Template) generateComponent(properties, analyzers mapstr.M, dynamicTemplates []mapstr.M) mapstr.M {
	// Clip the capacity before appending so that append always allocates and
	// can never write into spare capacity of the caller's backing array.
	dynTmpls := append(
		dynamicTemplates[:len(dynamicTemplates):len(dynamicTemplates)],
		buildDynTmpl(t.esVersion),
	)

	mappings := buildMappings(
		t.beatVersion, t.beatName,
		properties,
		dynTmpls,
		mapstr.M(t.config.Settings.Source))

	settings := mapstr.M{
		"index": buildIdxSettings(
			t.esVersion,
			t.config.Settings.Index,
			t.isServerless,
		),
	}
	// The nested maps are built directly: every intermediate level is created
	// right here, so there is no failure mode to report or ignore.
	if len(analyzers) != 0 {
		settings["analysis"] = mapstr.M{"analyzer": analyzers}
	}

	body := mapstr.M{
		"mappings": mappings,
		"settings": settings,
	}
	if len(t.config.Settings.Lifecycle) > 0 {
		body["lifecycle"] = t.config.Settings.Lifecycle
	}

	return mapstr.M{"template": body}
}
// buildMappings assembles the "mappings" section of a template.
//
// The result always carries "_meta" (beat version and beat name),
// "date_detection", "dynamic_templates" and "properties". "_source" is added
// only when source is non-empty.
func buildMappings(
	beatVersion version.V,
	beatName string,
	properties mapstr.M,
	dynTmpls []mapstr.M,
	source mapstr.M,
) mapstr.M {
	meta := mapstr.M{
		"beat":    beatName,
		"version": beatVersion.String(),
	}
	result := mapstr.M{
		"_meta":             meta,
		"properties":        properties,
		"dynamic_templates": dynTmpls,
		"date_detection":    defaultDateDetection,
	}

	if len(source) == 0 {
		return result
	}
	result["_source"] = source
	return result
}
// buildDynTmpl returns the "strings_as_keyword" dynamic template, which maps
// values of mapping type "string" to keyword fields with ignore_above 1024.
// The ver argument is not used.
func buildDynTmpl(ver version.V) mapstr.M {
	keyword := mapstr.M{
		"type":         "keyword",
		"ignore_above": 1024,
	}
	rule := mapstr.M{
		"match_mapping_type": "string",
		"mapping":            keyword,
	}
	return mapstr.M{"strings_as_keyword": rule}
}
// buildIdxSettings returns the index settings of a template: built-in
// defaults overlaid with userSettings.
//
// The defaults are refresh_interval, mapping.total_fields.limit and
// query.default_field (the current defaultFields followed by "fields.*").
// When isServerless is false, max_docvalue_fields_search is defaulted as
// well. When it is true, that default is skipped and number_of_shards is
// removed from the result. userSettings itself is never modified. The ver
// argument is not used.
func buildIdxSettings(ver version.V, userSettings mapstr.M, isServerless bool) mapstr.M {
	// Copy defaultFields, as it is a shared package-level slice that must not
	// be aliased by the returned settings.
	fields := make([]string, 0, len(defaultFields)+1)
	fields = append(fields, defaultFields...)
	fields = append(fields, "fields.*")

	indexSettings := mapstr.M{
		"refresh_interval": "5s",
		"mapping": mapstr.M{
			"total_fields": mapstr.M{
				"limit": defaultTotalFieldsLimit,
			},
		},
		"query": mapstr.M{
			"default_field": fields,
		},
	}
	if !isServerless {
		indexSettings["max_docvalue_fields_search"] = defaultMaxDocvalueFieldsSearch
	}

	indexSettings.DeepUpdate(userSettings)

	// deal with settings that aren't available on serverless
	if isServerless {
		logp.L().Infof("remote instance is serverless, number_of_shards and max_docvalue_fields_search will be skipped in index template")
		// Remove the key from the merged result rather than from userSettings,
		// so the caller's map (the template configuration) is left untouched.
		// None of the defaults above set this key, so the outcome is the same.
		delete(indexSettings, "number_of_shards")
	}

	return indexSettings
}
// loadYamlByte parses YAML data into a flat list of fields.
//
// The document is unpacked as a list of mapping.Field entries and the Fields
// of each entry are concatenated, in order, into a single slice. The returned
// slice is non-nil even when it is empty.
func loadYamlByte(data []byte) (mapping.Fields, error) {
	cfg, err := yaml.NewConfig(data)
	if err != nil {
		return nil, err
	}

	var groups []mapping.Field
	if unpackErr := cfg.Unpack(&groups); unpackErr != nil {
		return nil, unpackErr
	}

	flat := mapping.Fields{}
	for i := range groups {
		flat = append(flat, groups[i].Fields...)
	}
	return flat, nil
}