libbeat/template/load.go (226 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package template
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/paths"
"github.com/elastic/elastic-agent-libs/version"
)
// Loader interface for loading templates.
type Loader interface {
Load(config TemplateConfig, info beat.Info, fields []byte, migration bool) error
}
// ESLoader implements Loader interface for loading templates to Elasticsearch.
type ESLoader struct {
client ESClient
lifecycleClient lifecycle.ClientHandler
builder *templateBuilder
log *logp.Logger
}
// ESClient is a subset of the Elasticsearch client API capable of
// loading the template.
type ESClient interface {
Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error)
GetVersion() version.V
IsServerless() bool
}
// FileLoader implements Loader interface for loading templates to a File.
type FileLoader struct {
client FileClient
builder *templateBuilder
log *logp.Logger
}
// FileClient defines the minimal interface required for the FileLoader
type FileClient interface {
GetVersion() version.V
Write(component string, name string, body string) error
}
type StatusError struct {
status int
}
type templateBuilder struct {
log *logp.Logger
isServerless bool
}
// NewESLoader creates a new template loader for ES
func NewESLoader(client ESClient, lifecycleClient lifecycle.ClientHandler, logger *logp.Logger) (*ESLoader, error) {
if client == nil {
return nil, errors.New("can not load template without active Elasticsearch client")
}
return &ESLoader{client: client, lifecycleClient: lifecycleClient,
builder: newTemplateBuilder(client.IsServerless(), logger), log: logger.Named("template_loader")}, nil
}
// NewFileLoader creates a new template loader for the given file.
func NewFileLoader(c FileClient, isServerless bool, logger *logp.Logger) *FileLoader {
// other components of the file loader will fail if both ILM and DSL are set,
// so at this point it's fairly safe to just pass cfg.DSL.Enabled
return &FileLoader{client: c, builder: newTemplateBuilder(isServerless, logger), log: logger.Named("file_template_loader")}
}
func newTemplateBuilder(serverlessMode bool, logger *logp.Logger) *templateBuilder {
return &templateBuilder{log: logger.Named("template"), isServerless: serverlessMode}
}
// Load checks if the index mapping template should be loaded.
// In case the template is not already loaded or overwriting is enabled, the
// template is built and written to index.
func (l *ESLoader) Load(config TemplateConfig, info beat.Info, fields []byte, migration bool) error {
// build template from config
tmpl, err := l.builder.template(config, info, l.client.GetVersion(), migration)
if err != nil || tmpl == nil {
return err
}
// Check if template already exist or should be overwritten
templateName := tmpl.GetName()
if config.JSON.Enabled {
templateName = config.JSON.Name
}
exists, err := l.checkExistsTemplate(templateName)
if err != nil {
return fmt.Errorf("failure while checking if template exists: %w", err)
}
if exists && !config.Overwrite {
l.log.Infof("Template %q already exists and will not be overwritten.", templateName)
return nil
}
// loading template to ES
body, err := l.builder.buildBody(tmpl, config, fields)
if err != nil {
return err
}
if err := l.loadTemplate(templateName, body); err != nil {
return fmt.Errorf("failed to load template: %w", err)
}
l.log.Infof("Template with name %q loaded.", templateName)
// if JSON template is loaded and it is not a data stream
// we are done with loading.
if config.JSON.Enabled && !config.JSON.IsDataStream {
return nil
}
// If a data stream already exists, we do not attempt to delete or overwrite
// it because it would delete all backing indices, and the user would lose all
// their documents.
dataStreamExist, err := l.checkExistsDatastream(templateName)
if err != nil {
return fmt.Errorf("failed to check data stream: %w", err)
}
if dataStreamExist {
l.log.Infof("Data stream with name %q already exists.", templateName)
// for serverless, we can update the lifecycle policy safely
// Note that updating the lifecycle will delete older documents
// if the policy requires it; i.e, changing the data_retention from 10d to 7d
// will delete the documents older than 7 days.
if l.client.IsServerless() {
l.log.Infof("overwriting lifecycle policy")
err = l.lifecycleClient.CreatePolicyFromConfig()
if err != nil {
return fmt.Errorf("error updating lifecycle policy: %w", err)
}
}
return nil
}
if err := l.putDataStream(templateName); err != nil {
return fmt.Errorf("failed to put data stream: %w", err)
}
l.log.Infof("Data stream with name %q loaded.", templateName)
return nil
}
// loadTemplate loads a template into Elasticsearch overwriting the existing
// template if it exists. If you wish to not overwrite an existing template
// then use CheckTemplate prior to calling this method.
func (l *ESLoader) loadTemplate(templateName string, template map[string]interface{}) error {
l.log.Infof("Try loading template %s to Elasticsearch", templateName)
path := "/_index_template/" + templateName
status, body, err := l.client.Request("PUT", path, "", nil, template)
if err != nil {
return fmt.Errorf("couldn't load template: %w. Response body: %s", err, body)
}
if status > http.StatusMultipleChoices { //http status 300
return fmt.Errorf("couldn't load json. Status: %v", status)
}
return nil
}
func (l *ESLoader) checkExistsDatastream(name string) (bool, error) {
status, _, err := l.client.Request("GET", "/_data_stream/"+name, "", nil, nil)
if status == http.StatusNotFound {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}
func (l *ESLoader) putDataStream(name string) error {
l.log.Infof("Try loading data stream %s to Elasticsearch", name)
path := "/_data_stream/" + name
_, body, err := l.client.Request("PUT", path, "", nil, nil)
if err != nil {
return fmt.Errorf("could not put data stream: %w. Response body: %s", err, body)
}
return nil
}
// existsTemplate checks if a given template already exist, using the
// `/_index_template/<name>` API.
//
// An error is returned if the loader failed to execute the request, or a
// status code indicating some problems is encountered.
func (l *ESLoader) checkExistsTemplate(name string) (bool, error) {
status, _, err := l.client.Request("HEAD", "/_index_template/"+name, "", nil, nil)
if status == http.StatusNotFound {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}
// Load reads the template from the config, creates the template body and prints it to the configured file.
func (l *FileLoader) Load(config TemplateConfig, info beat.Info, fields []byte, migration bool) error {
//build template from config
tmpl, err := l.builder.template(config, info, l.client.GetVersion(), migration)
if err != nil || tmpl == nil {
return err
}
//create body to print
body, err := l.builder.buildBody(tmpl, config, fields)
if err != nil {
return err
}
str := fmt.Sprintf("%s\n", body.StringToPrint())
if err := l.client.Write("template", tmpl.name, str); err != nil {
return fmt.Errorf("error printing template: %w", err)
}
return nil
}
func (b *templateBuilder) template(config TemplateConfig, info beat.Info, esVersion version.V, migration bool) (*Template, error) {
if !config.Enabled {
b.log.Info("template config not enabled")
return nil, nil
}
tmpl, err := New(b.isServerless, info.Version, info.IndexPrefix, info.ElasticLicensed, esVersion, config, migration)
if err != nil {
return nil, fmt.Errorf("error creating template instance: %w", err)
}
return tmpl, nil
}
func (b *templateBuilder) buildBody(tmpl *Template, config TemplateConfig, fields []byte) (mapstr.M, error) {
if config.Overwrite {
b.log.Info("Existing template will be overwritten, as overwrite is enabled.")
}
if config.JSON.Enabled {
return b.buildBodyFromJSON(config)
}
if config.Fields != "" {
return b.buildBodyFromFile(tmpl, config)
}
if fields == nil {
b.log.Debug("Load minimal template")
return tmpl.LoadMinimal(), nil
}
return b.buildBodyFromFields(tmpl, fields)
}
func (b *templateBuilder) buildBodyFromJSON(config TemplateConfig) (mapstr.M, error) {
jsonPath := paths.Resolve(paths.Config, config.JSON.Path)
if _, err := os.Stat(jsonPath); err != nil {
return nil, fmt.Errorf("error checking json file %s for template: %w", jsonPath, err)
}
b.log.Debugf("Loading json template from file %s", jsonPath)
content, err := os.ReadFile(jsonPath)
if err != nil {
return nil, fmt.Errorf("error reading file %s for template: %w", jsonPath, err)
}
var body map[string]interface{}
err = json.Unmarshal(content, &body)
if err != nil {
return nil, fmt.Errorf("could not unmarshal json template: %w", err)
}
return body, nil
}
func (b *templateBuilder) buildBodyFromFile(tmpl *Template, config TemplateConfig) (mapstr.M, error) {
b.log.Debugf("Load fields.yml from file: %s", config.Fields)
fieldsPath := paths.Resolve(paths.Config, config.Fields)
body, err := tmpl.LoadFile(fieldsPath)
if err != nil {
return nil, fmt.Errorf("error creating template from file %s: %w", fieldsPath, err)
}
return body, nil
}
func (b *templateBuilder) buildBodyFromFields(tmpl *Template, fields []byte) (mapstr.M, error) {
b.log.Debug("Load default fields")
body, err := tmpl.LoadBytes(fields)
if err != nil {
return nil, fmt.Errorf("error creating template: %w", err)
}
return body, nil
}
func (e *StatusError) Error() string {
return fmt.Sprintf("request failed with http status code %v", e.status)
}