dev/import-beats/elasticsearch.go (144 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"regexp"
"strings"
"github.com/pkg/errors"
yaml "gopkg.in/yaml.v3"
)
type (
	// elasticsearchContent groups the Elasticsearch assets loaded for a data
	// stream; at the moment these are only ingest pipelines.
	elasticsearchContent struct {
		ingestPipelines []ingestPipelineContent
	}

	// ingestPipelineContent holds the body of one ingest pipeline together
	// with the target file name computed for it.
	ingestPipelineContent struct {
		targetFileName string
		body           []byte
	}
)
// reUnsupportedIfInPipeline matches an opening "{< if ... >}" directive
// ("{<", at most one space, "if", then one or more characters other than
// '(', ')', '>' and '}', then ">}").
var reUnsupportedIfInPipeline = regexp.MustCompile("{<[ ]{0,1}if[^(>})]+>}")

// reUnsupportedIngestPipelineInPipeline matches a "{< IngestPipeline ... >}"
// placeholder together with the single or double quotes surrounding it.
var reUnsupportedIngestPipelineInPipeline = regexp.MustCompile("('|\"){< (IngestPipeline).+>}('|\")")

// reUnsupportedPlaceholderInPipeline matches any "{< ... >}" span. The match
// is greedy, so on a single line it runs from the first "{<" to the last ">}".
var reUnsupportedPlaceholderInPipeline = regexp.MustCompile("{<.+>}")
// loadElasticsearchContent loads the ingest pipelines referenced by the
// manifest.yml found directly under dataStreamPath.
//
// A missing manifest is not an error: an empty elasticsearchContent is
// returned. Otherwise every referenced pipeline file is read, adjusted with
// adjustUnsupportedStructuresInPipeline and checked with
// validateIngestPipeline; the first failure aborts the load and is returned
// wrapped with the offending path.
func loadElasticsearchContent(dataStreamPath string) (elasticsearchContent, error) {
	manifestPath := filepath.Join(dataStreamPath, "manifest.yml")
	manifest, err := ioutil.ReadFile(manifestPath)
	switch {
	case os.IsNotExist(err):
		// No manifest.yml file found, hence nothing to import.
		return elasticsearchContent{}, nil
	case err != nil:
		return elasticsearchContent{}, errors.Wrapf(err, "reading dataStream manifest file failed (path: %s)", manifestPath)
	}

	// Try the single-pipeline manifest layout first and fall back to the
	// multiple-pipelines layout only when the first one cannot be decoded.
	var declared []string
	var single dataStreamManifestSinglePipeline
	err = yaml.Unmarshal(manifest, &single)
	if err != nil {
		var multiple dataStreamManifestMultiplePipelines
		if err = yaml.Unmarshal(manifest, &multiple); err != nil {
			return elasticsearchContent{}, errors.Wrapf(err, "unmarshalling dataStream manifest file failed (path: %s)", manifestPath)
		}
		if len(multiple.IngestPipeline) > 0 {
			declared = append(declared, multiple.IngestPipeline...)
		}
	} else if len(single.IngestPipeline) > 0 {
		declared = append(declared, single.IngestPipeline)
	}

	var loaded elasticsearchContent
	onlyOne := len(declared) == 1
	for _, pipeline := range declared {
		pipeline = ensurePipelineFormat(pipeline)
		log.Printf("\tingest-pipeline found: %s", pipeline)

		// A lone pipeline uses the single-pipeline naming helper; otherwise
		// the name is derived from the pipeline's own file name.
		var targetName string
		if onlyOne {
			targetName, err = buildSingleIngestPipelineTargetName(pipeline)
			if err != nil {
				return elasticsearchContent{}, errors.Wrapf(err, "can't build single ingest pipeline target name (path: %s)", pipeline)
			}
		} else {
			targetName, err = determineIngestPipelineTargetName(pipeline)
			if err != nil {
				return elasticsearchContent{}, errors.Wrapf(err, "can't determine ingest pipeline target name (path: %s)", pipeline)
			}
		}

		pipelinePath := filepath.Join(dataStreamPath, pipeline)
		raw, readErr := ioutil.ReadFile(pipelinePath)
		if readErr != nil {
			return elasticsearchContent{}, errors.Wrapf(readErr, "reading pipeline body failed (path: %s)", pipelinePath)
		}

		// Fix missing "---" at the beginning of the YAML pipeline.
		if strings.HasSuffix(targetName, ".yml") && !bytes.HasPrefix(raw, []byte("---")) {
			raw = append([]byte("---\n"), raw...)
		}

		ipc := ingestPipelineContent{
			targetFileName: targetName,
			body:           adjustUnsupportedStructuresInPipeline(raw),
		}
		// Make sure the adjusted body can still be parsed.
		if err = validateIngestPipeline(ipc); err != nil {
			return elasticsearchContent{},
				errors.Wrapf(err, "validation of modified ingest pipeline failed (original path: %s)", pipelinePath)
		}
		loaded.ingestPipelines = append(loaded.ingestPipelines, ipc)
	}
	return loaded, nil
}
// buildSingleIngestPipelineTargetName returns the target file name used when
// a data stream has exactly one ingest pipeline: "default." followed by the
// extension that splitFilenameExt reports for path. An error from
// splitFilenameExt is returned wrapped with the path.
func buildSingleIngestPipelineTargetName(path string) (string, error) {
	const defaultPrefix = "default."

	_, extension, err := splitFilenameExt(path)
	if err != nil {
		return "", errors.Wrapf(err, "processing filename failed (path: %s)", path)
	}
	return defaultPrefix + extension, nil
}
// ensurePipelineFormat replaces every "{{.format}}" template placeholder in
// the pipeline path with "json". Paths without the placeholder are returned
// unchanged.
func ensurePipelineFormat(ingestPipeline string) string {
	const formatPlaceholder = "{{.format}}"
	return strings.ReplaceAll(ingestPipeline, formatPlaceholder, "json")
}
// determineIngestPipelineTargetName derives the target file name for one of
// several ingest pipelines. Pipelines whose base name (as reported by
// splitFilenameExt) is "pipeline" or "pipeline-entry" are renamed to
// "default"; any other base name is kept. The extension is always preserved.
func determineIngestPipelineTargetName(path string) (string, error) {
	base, extension, err := splitFilenameExt(path)
	if err != nil {
		return "", errors.Wrapf(err, "processing filename failed (path: %s)", path)
	}

	switch base {
	case "pipeline", "pipeline-entry":
		base = "default"
	}
	return fmt.Sprintf("%s.%s", base, extension), nil
}
// adjustUnsupportedStructuresInPipeline rewrites the "{< ... >}" constructs
// found in a pipeline body, in this order:
//
//  1. "{< if ... >}" openers and literal "{< end >}" markers are removed;
//  2. quoted "{< IngestPipeline ... >}" references are converted to the
//     "{{ ... }}" form, with inner double quotes turned into single quotes
//     when the reference itself is double-quoted;
//  3. every remaining "{< ... >}" span is replaced with "FIX_ME".
//
// The adjusted body is returned.
func adjustUnsupportedStructuresInPipeline(data []byte) []byte {
	toDoubleBraces := func(match []byte) []byte {
		// bytes.ReplaceAll returns a copy, so the in-place edits below never
		// touch the buffer owned by the regexp machinery.
		out := bytes.ReplaceAll(match, []byte("{<"), []byte("{{"))
		out = bytes.ReplaceAll(out, []byte(">}"), []byte("}}"))
		if out[0] != '"' {
			return out
		}
		// Double-quoted reference: inner double quotes become single quotes,
		// then the first and last bytes are set back to double quotes.
		out = bytes.ReplaceAll(out, []byte(`"`), []byte(`'`))
		out[0], out[len(out)-1] = '"', '"'
		return out
	}

	withoutIfs := reUnsupportedIfInPipeline.ReplaceAll(data, []byte{})
	withoutEnds := bytes.ReplaceAll(withoutIfs, []byte("{< end >}"), []byte{})
	converted := reUnsupportedIngestPipelineInPipeline.ReplaceAllFunc(withoutEnds, toDoubleBraces)
	return reUnsupportedPlaceholderInPipeline.ReplaceAll(converted, []byte("FIX_ME"))
}
// validateIngestPipeline checks that the pipeline body can be decoded in the
// format selected by the extension of its target file name: "json" bodies are
// decoded with encoding/json and "yml" bodies with the yaml package. Any
// other extension is rejected. The decoded value itself is discarded; only
// the decoding error, if any, is returned.
func validateIngestPipeline(content ingestPipelineContent) error {
	_, extension, err := splitFilenameExt(content.targetFileName)
	if err != nil {
		return errors.Wrapf(err, "processing filename failed (path: %s)", content.targetFileName)
	}

	var decoded mapStr
	switch extension {
	case "json":
		return json.Unmarshal(content.body, &decoded)
	case "yml":
		return yaml.Unmarshal(content.body, &decoded)
	}
	return fmt.Errorf("unsupported pipeline extension (path: %s)", content.targetFileName)
}