packages/datastream.go (355 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package packages
import (
"encoding/json"
"fmt"
"os"
"path"
"strings"
yamlv2 "gopkg.in/yaml.v2"
ucfg "github.com/elastic/go-ucfg"
"github.com/elastic/go-ucfg/yaml"
"github.com/elastic/package-registry/internal/util"
)
const (
	// DirIngestPipeline is the data stream subdirectory (under "elasticsearch")
	// that holds ingest pipeline definitions.
	DirIngestPipeline = "ingest_pipeline"

	// DefaultPipelineName is the pipeline name assumed when none is configured.
	DefaultPipelineName = "default"
	// DefaultPipelineNameJSON is the JSON file name of the default pipeline.
	DefaultPipelineNameJSON = "default.json"
	// DefaultPipelineNameYAML is the YAML file name of the default pipeline.
	DefaultPipelineNameYAML = "default.yml"
)
// validTypes maps each allowed data stream type to its display title.
// DataStream.validType checks membership in this map.
var validTypes = map[string]string{
	"logs":       "Logs",
	"metrics":    "Metrics",
	"synthetics": "Synthetics",
	"traces":     "Traces",
}
// DataStream represents a single data stream as described by the manifest.yml
// in a data stream directory of a package.
type DataStream struct {
	// Name and type of the data stream. This is linked to data_stream.dataset and data_stream.type fields.
	Type            string `config:"type" json:"type" validate:"required"`
	Dataset         string `config:"dataset" json:"dataset,omitempty" yaml:"dataset,omitempty"`
	Hidden          bool   `config:"hidden" json:"hidden,omitempty" yaml:"hidden,omitempty"`
	IlmPolicy       string `config:"ilm_policy" json:"ilm_policy,omitempty" yaml:"ilm_policy,omitempty"`
	DatasetIsPrefix bool   `config:"dataset_is_prefix" json:"dataset_is_prefix,omitempty" yaml:"dataset_is_prefix,omitempty"`
	Title           string `config:"title" json:"title" validate:"required"`
	Release         string `config:"release" json:"release,omitempty"`

	// Deprecated: Replaced by elasticsearch.ingest_pipeline.name
	IngestPipeline string                   `config:"ingest_pipeline,omitempty" json:"ingest_pipeline,omitempty" yaml:"ingest_pipeline,omitempty"`
	Streams        []Stream                 `config:"streams" json:"streams,omitempty" yaml:"streams,omitempty" `
	Package        string                   `json:"package,omitempty" yaml:"package,omitempty"`
	Elasticsearch  *DataStreamElasticsearch `config:"elasticsearch,omitempty" json:"elasticsearch,omitempty" yaml:"elasticsearch,omitempty"`
	Agent          *DataStreamAgent         `config:"agent,omitempty" json:"agent,omitempty" yaml:"agent,omitempty"`

	// Generated fields
	Path string `json:"path,omitempty" yaml:"path,omitempty"`

	// Local path to the data stream directory, relative to the package directory
	BasePath string `json:"-" yaml:"-"`

	// Reference to the package containing this data stream
	packageRef *Package
}
// Input describes one way data can be collected for a policy template,
// including the variables it exposes and the streams it provides.
type Input struct {
	Type         string     `config:"type" json:"type" validate:"required"`
	Vars         []Variable `config:"vars" json:"vars,omitempty" yaml:"vars,omitempty"`
	Title        string     `config:"title" json:"title,omitempty" yaml:"title,omitempty"`
	Description  string     `config:"description" json:"description,omitempty" yaml:"description,omitempty"`
	Streams      []Stream   `config:"streams" json:"streams,omitempty" yaml:"streams,omitempty"`
	TemplatePath string     `config:"template_path" json:"template_path,omitempty" yaml:"template_path,omitempty"`
	InputGroup   string     `config:"input_group" json:"input_group,omitempty" yaml:"input_group,omitempty"`
}
// Stream ties an input type to a data stream and carries the per-stream
// configuration variables and agent configuration template.
type Stream struct {
	Input      string     `config:"input" json:"input" validate:"required"`
	Vars       []Variable `config:"vars" json:"vars,omitempty" yaml:"vars,omitempty"`
	DataStream string     `config:"data_stream" json:"data_stream,omitempty" yaml:"data_stream,omitempty"`
	// TODO: This might cause issues when consuming the json as the key contains . (had been an issue in the past if I remember correctly)
	TemplatePath string `config:"template_path" json:"template_path,omitempty" yaml:"template_path,omitempty"`
	Title        string `config:"title" json:"title,omitempty" yaml:"title,omitempty"`
	Description  string `config:"description" json:"description,omitempty" yaml:"description,omitempty"`
	// Enabled is a pointer so "unset" can be distinguished from "false";
	// NewDataStream defaults unset values to true.
	Enabled *bool `config:"enabled" json:"enabled,omitempty" yaml:"enabled,omitempty"`
}
// Variable is a user-configurable setting exposed by an input or stream.
type Variable struct {
	Name        string `config:"name" json:"name" yaml:"name"`
	Type        string `config:"type" json:"type" yaml:"type"`
	Title       string `config:"title" json:"title,omitempty" yaml:"title,omitempty"`
	Description string `config:"description" json:"description,omitempty" yaml:"description,omitempty"`
	Multi       bool   `config:"multi" json:"multi" yaml:"multi"`
	Required    bool   `config:"required" json:"required" yaml:"required"`
	ShowUser    bool   `config:"show_user" json:"show_user" yaml:"show_user"`
	// Default may be any YAML scalar or collection, hence interface{}.
	Default interface{} `config:"default" json:"default,omitempty" yaml:"default,omitempty"`
}
// DataStreamAgent holds agent-specific settings for a data stream.
type DataStreamAgent struct {
	Privileges *DataStreamAgentPrivileges `config:"privileges,omitempty" json:"privileges,omitempty" yaml:"privileges,omitempty"`
}

// DataStreamAgentPrivileges declares the host privileges the agent needs.
type DataStreamAgentPrivileges struct {
	// Root indicates the agent must run with root privileges.
	Root bool `config:"root,omitempty" json:"root,omitempty" yaml:"root,omitempty"`
}
// DataStreamElasticsearch holds Elasticsearch-specific settings for a data
// stream: index template overrides, the ingest pipeline to install, and the
// index privileges required.
type DataStreamElasticsearch struct {
	IndexTemplateSettings map[string]interface{}             `config:"index_template.settings" json:"index_template.settings,omitempty" yaml:"index_template.settings,omitempty"`
	IndexTemplateMappings map[string]interface{}             `config:"index_template.mappings" json:"index_template.mappings,omitempty" yaml:"index_template.mappings,omitempty"`
	IngestPipelineName    string                             `config:"ingest_pipeline.name,omitempty" json:"ingest_pipeline.name,omitempty" yaml:"ingest_pipeline.name,omitempty"`
	Privileges            *DataStreamElasticsearchPrivileges `config:"privileges,omitempty" json:"privileges,omitempty" yaml:"privileges,omitempty"`
}

// DataStreamElasticsearchPrivileges lists required Elasticsearch index privileges.
type DataStreamElasticsearchPrivileges struct {
	Indices []string `config:"indices,omitempty" json:"indices,omitempty" yaml:"indices,omitempty"`
}
// fieldEntry is the (name, type) pair extracted from a fields definition,
// used by findField and requireField during validation.
type fieldEntry struct {
	name  string
	aType string
}
// NewDataStream builds a DataStream from the manifest.yml found under
// basePath inside the package p's filesystem. It applies defaults (dataset
// name, release, stream enabled flags, template path), detects a default
// ingest pipeline on disk, and returns an error when the manifest is missing,
// invalid, or references an invalid release.
//
// Note: go-ucfg calls DataStream.Validate during Unpack, so the returned
// value has already passed manifest validation.
func NewDataStream(basePath string, p *Package) (*DataStream, error) {
	fsys, err := p.fs()
	if err != nil {
		return nil, err
	}
	defer fsys.Close()

	manifestPath := path.Join(basePath, "manifest.yml")

	// Check if manifest exists. Other stat errors are tolerated here; reading
	// the file below will surface them with their own context.
	_, err = fsys.Stat(manifestPath)
	if err != nil && os.IsNotExist(err) {
		return nil, fmt.Errorf("manifest does not exist for data stream: %s: %w", p.BasePath, err)
	}

	dataStreamPath := path.Base(basePath)

	b, err := ReadAll(fsys, manifestPath)
	if err != nil {
		// Fix: the message previously interpolated the error twice instead of
		// naming the file that failed to read.
		return nil, fmt.Errorf("failed to read manifest: %s: %w", manifestPath, err)
	}

	manifest, err := yaml.NewConfig(b, ucfg.PathSep("."))
	if err != nil {
		return nil, fmt.Errorf("error creating new manifest config: %w", err)
	}

	var d = &DataStream{
		Package:    p.Name,
		packageRef: p,
		// This is the name of the directory of the dataStream
		Path:     dataStreamPath,
		BasePath: basePath,
	}

	// go-ucfg automatically calls the `Validate` method on the DataStream object here
	err = manifest.Unpack(d, ucfg.PathSep("."))
	if err != nil {
		return nil, fmt.Errorf("error building data stream (path: %s) in package: %s: %w", dataStreamPath, p.Name, err)
	}

	// if id is not set, {package}.{dataStreamPath} is the default
	if d.Dataset == "" {
		d.Dataset = p.Name + "." + dataStreamPath
	}

	// The data stream inherits the package release unless it sets its own.
	if d.Release == "" {
		d.Release = p.Release
	}

	// Default for the enabled flags is true.
	trueValue := true
	for i := range d.Streams {
		if d.Streams[i].Enabled == nil {
			d.Streams[i].Enabled = &trueValue
		}

		// TODO: validate that the template path actually exists
		if d.Streams[i].TemplatePath == "" {
			d.Streams[i].TemplatePath = "stream.yml.hbs"
		}
	}

	if !IsValidRelease(d.Release) {
		return nil, fmt.Errorf("invalid release: %q", d.Release)
	}

	pipelineDir := path.Join(d.BasePath, "elasticsearch", DirIngestPipeline)
	paths, err := fsys.Glob(path.Join(pipelineDir, "*"))
	if err != nil {
		return nil, err
	}

	if d.Elasticsearch != nil && d.Elasticsearch.IngestPipelineName == "" {
		// Check that no ingest pipeline exists in the directory except default
		for _, p := range paths {
			if path.Base(p) == DefaultPipelineNameJSON || path.Base(p) == DefaultPipelineNameYAML {
				d.Elasticsearch.IngestPipelineName = DefaultPipelineName
				// TODO: remove because of legacy
				d.IngestPipeline = DefaultPipelineName
				break
			}
		}
		// TODO: Remove, only here for legacy
	} else if d.IngestPipeline == "" {
		// Check that no ingest pipeline exists in the directory except default
		for _, p := range paths {
			if path.Base(p) == DefaultPipelineNameJSON || path.Base(p) == DefaultPipelineNameYAML {
				d.IngestPipeline = DefaultPipelineName
				break
			}
		}
	}

	// Files in the pipeline directory that did not resolve to a pipeline are
	// treated as an error to catch typos in pipeline file names.
	if d.IngestPipeline == "" && len(paths) > 0 {
		return nil, fmt.Errorf("unused pipelines in the package (dataset: %s): %s", d.Dataset, strings.Join(paths, ","))
	}

	return d, nil
}
// Validate checks the data stream for consistency: the dataset name must not
// contain '-', the type must be one of validTypes, a configured ingest
// pipeline must exist on disk as a parseable JSON or YAML file, and the
// fields definitions must contain the required ECS/data-stream fields.
// go-ucfg invokes this automatically during Unpack in NewDataStream.
func (d *DataStream) Validate() error {
	if ValidationDisabled {
		return nil
	}

	if strings.Contains(d.Dataset, "-") {
		return fmt.Errorf("data stream name is not allowed to contain `-`: %s", d.Dataset)
	}

	if !d.validType() {
		return fmt.Errorf("type is not valid: %s", d.Type)
	}

	fsys, err := d.packageRef.fs()
	if err != nil {
		return err
	}
	defer fsys.Close()

	// In case an ingest pipeline is set, check if it is around
	pipelineDir := path.Join(d.BasePath, "elasticsearch", DirIngestPipeline)
	if d.IngestPipeline != "" {
		var validFound bool

		jsonPipelinePath := path.Join(pipelineDir, d.IngestPipeline+".json")
		_, errJSON := fsys.Stat(jsonPipelinePath)
		if errJSON != nil && !os.IsNotExist(errJSON) {
			return fmt.Errorf("stat ingest pipeline JSON file failed (path: %s): %w", jsonPipelinePath, errJSON)
		}
		if !os.IsNotExist(errJSON) {
			err := validateIngestPipelineFile(fsys, jsonPipelinePath)
			if err != nil {
				return fmt.Errorf("validating ingest pipeline JSON file failed (path: %s): %w", jsonPipelinePath, err)
			}
			validFound = true
		}

		yamlPipelinePath := path.Join(pipelineDir, d.IngestPipeline+".yml")
		_, errYAML := fsys.Stat(yamlPipelinePath)
		if errYAML != nil && !os.IsNotExist(errYAML) {
			// Fix: report the YAML path, not the JSON one.
			return fmt.Errorf("stat ingest pipeline YAML file failed (path: %s): %w", yamlPipelinePath, errYAML)
		}
		if !os.IsNotExist(errYAML) {
			err := validateIngestPipelineFile(fsys, yamlPipelinePath)
			if err != nil {
				// Fix: report the YAML path, not the JSON one.
				return fmt.Errorf("validating ingest pipeline YAML file failed (path: %s): %w", yamlPipelinePath, err)
			}
			validFound = true
		}

		if !validFound {
			// Fix: join the directory and pipeline name with a separator;
			// plain concatenation produced a garbled path in the message.
			return fmt.Errorf("defined ingest_pipeline does not exist: %s", path.Join(pipelineDir, d.IngestPipeline))
		}
	}

	err = d.validateRequiredFields(fsys)
	if err != nil {
		return fmt.Errorf("validating required fields failed: %w", err)
	}
	return nil
}
// validType reports whether the data stream type is one of the allowed types.
func (d *DataStream) validType() bool {
	if _, found := validTypes[d.Type]; found {
		return true
	}
	return false
}
// validateIngestPipelineFile reads the pipeline file at pipelinePath and
// verifies that it parses as a JSON or YAML document, depending on its
// extension. Any other extension is rejected.
func validateIngestPipelineFile(fs PackageFileSystem, pipelinePath string) error {
	content, err := ReadAll(fs, pipelinePath)
	if err != nil {
		return fmt.Errorf("reading ingest pipeline file failed (path: %s): %w", pipelinePath, err)
	}

	var parsed map[string]interface{}
	switch ext := path.Ext(pipelinePath); ext {
	case ".json":
		return json.Unmarshal(content, &parsed)
	case ".yml":
		return yamlv2.Unmarshal(content, &parsed)
	default:
		return fmt.Errorf("unsupported pipeline extension (path: %s, ext: %s)", pipelinePath, ext)
	}
}
// validateRequiredFields method loads fields from all files and checks if required fields are present.
// It reads every file under the data stream's "fields" directory, flattens
// the field definitions, and verifies that the data_stream.type/dataset/
// namespace constant_keyword fields and the @timestamp date field exist with
// the expected types.
func (d *DataStream) validateRequiredFields(fs PackageFileSystem) error {
	fieldsDirPath := path.Join(d.BasePath, "fields")

	// Collect fields from all files
	fieldsFiles, err := fs.Glob(path.Join(fieldsDirPath, "*"))
	if err != nil {
		return err
	}

	var allFields []util.MapStr
	// Fix: renamed the loop variable from "path", which shadowed the imported
	// "path" package.
	for _, filePath := range fieldsFiles {
		body, err := ReadAll(fs, filePath)
		if err != nil {
			return fmt.Errorf("reading file failed (path: %s): %w", filePath, err)
		}

		var m []util.MapStr
		err = yamlv2.Unmarshal(body, &m)
		if err != nil {
			return fmt.Errorf("unmarshaling file failed (path: %s): %w", filePath, err)
		}
		allFields = append(allFields, m...)
	}
	// Fix: removed a dead "if err != nil" check here — every error inside the
	// loop returns immediately, so err is always nil at this point.

	// Flatten all fields
	for i, fields := range allFields {
		allFields[i] = fields.Flatten()
	}

	// Verify required keys; requireField short-circuits on the first error.
	err = requireField(allFields, "data_stream.type", "constant_keyword", err)
	err = requireField(allFields, "data_stream.dataset", "constant_keyword", err)
	err = requireField(allFields, "data_stream.namespace", "constant_keyword", err)
	err = requireField(allFields, "@timestamp", "date", err)
	return err
}
// requireField verifies that a field with the given name and type exists in
// allFields. It is chainable: if validationErr is already non-nil it is
// returned unchanged, so a sequence of calls stops at the first failure.
// Dotted names that are not found flattened are retried as nested lookups.
func requireField(allFields []util.MapStr, searchedName, expectedType string, validationErr error) error {
	if validationErr != nil {
		return validationErr
	}

	entry, findErr := findField(allFields, searchedName)
	if findErr != nil {
		// Fall back to walking the nested "fields" arrays.
		var splitErr error
		entry, splitErr = findFieldSplit(allFields, searchedName)
		if splitErr != nil {
			return fmt.Errorf("finding field failed (searchedName: %s): %w", searchedName, splitErr)
		}
	}

	if entry.aType != expectedType {
		return fmt.Errorf("wrong field type for '%s' (expected: %s, got: %s)", searchedName, expectedType, entry.aType)
	}
	return nil
}
// findFieldSplit resolves a dotted field name (e.g. "data_stream.type") by
// descending through the nested "fields" arrays for each intermediate segment
// and then looking up the final segment with findField.
func findFieldSplit(allFields []util.MapStr, searchedName string) (*fieldEntry, error) {
	segments := strings.Split(searchedName, ".")
	last := len(segments) - 1

	scope := allFields
	for _, segment := range segments[:last] {
		var err error
		scope, err = getFieldsArray(scope, segment)
		if err != nil {
			return nil, fmt.Errorf("failed to find fields array: %w", err)
		}
	}
	return findField(scope, segments[last])
}
// createMapStr converts a yaml.v2-decoded map (map[interface{}]interface{})
// into a util.MapStr, stringifying both keys and values. Any other input type
// is rejected with an error.
func createMapStr(in interface{}) (util.MapStr, error) {
	source, ok := in.(map[interface{}]interface{})
	if !ok {
		return nil, fmt.Errorf("unable to convert %v to known type", in)
	}

	result := make(util.MapStr)
	for key, value := range source {
		result[fmt.Sprintf("%v", key)] = fmt.Sprintf("%v", value)
	}
	return result, nil
}
// getFieldsArray finds the entry named searchedName in allFields and returns
// its nested "fields" array converted to []util.MapStr. It errors when the
// entry is missing, has no "fields" key, or the value is not an array.
func getFieldsArray(allFields []util.MapStr, searchedName string) ([]util.MapStr, error) {
	for _, entry := range allFields {
		name, err := entry.GetValue("name")
		if err != nil {
			return nil, fmt.Errorf("cannot get value (key: name): %w", err)
		}
		if name != searchedName {
			continue
		}

		raw, err := entry.GetValue("fields")
		if err != nil {
			return nil, fmt.Errorf("cannot get fields: %w", err)
		}

		children, ok := raw.([]interface{})
		if !ok {
			return nil, fmt.Errorf("fields was not []MapStr")
		}

		converted := make([]util.MapStr, 0, len(children))
		for _, child := range children {
			mapStr, err := createMapStr(child)
			if err != nil {
				return nil, fmt.Errorf("cannot create MapStr: %w", err)
			}
			converted = append(converted, mapStr)
		}
		return converted, nil
	}
	return nil, fmt.Errorf("field '%s' not found", searchedName)
}
// findField looks up the entry named searchedName in allFields and returns
// its name and type as a fieldEntry. It errors when the entry is absent or
// its type is empty or undefined.
func findField(allFields []util.MapStr, searchedName string) (*fieldEntry, error) {
	for _, candidate := range allFields {
		name, err := candidate.GetValue("name")
		if err != nil {
			return nil, fmt.Errorf("cannot get value (key: name): %w", err)
		}
		if name != searchedName {
			continue
		}

		aType, err := candidate.GetValue("type")
		if err != nil {
			return nil, fmt.Errorf("cannot get value (key: type): %w", err)
		}
		if aType == "" {
			return nil, fmt.Errorf("field '%s' found, but type is undefined", searchedName)
		}

		return &fieldEntry{
			name:  name.(string),
			aType: aType.(string),
		}, nil
	}
	return nil, fmt.Errorf("field '%s' not found", searchedName)
}