pkg/controller/logstash/pipelines/config.go (95 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package pipelines import ( "fmt" "reflect" "github.com/elastic/go-ucfg" uyaml "github.com/elastic/go-ucfg/yaml" "gopkg.in/yaml.v3" ) // Config contains configuration for Logstash pipeline ("pipelines.yml"), // `.` in between the key, pipeline.id, is treated as string // pipelines.yml is expected an array of pipeline definition. type Config ucfg.Config // Options are config options for the YAML file. var Options = []ucfg.Option{ucfg.AppendValues} // EmptyConfig creates a new empty config. func EmptyConfig() *Config { return fromConfig(ucfg.New()) } // FromSpec creates a new pipeline from spec. func FromSpec(cfg interface{}) (*Config, error) { config, err := ucfg.NewFrom(cfg, Options...) if err != nil { return nil, err } return fromConfig(config), nil } // MustFromSpec creates a new pipeline and panics on errors. // Use for testing only. func MustFromSpec(cfg interface{}) *Config { config, err := FromSpec(cfg) if err != nil { panic(err) } return config } // Parse parses the given pipeline content into a PipelinesConfig. // Expects content to be in YAML format. func Parse(yml []byte) (*Config, error) { config, err := uyaml.NewConfig(yml, Options...) if err != nil { return nil, err } return fromConfig(config), nil } // MustParse parses the given pipeline content into a Pipelines. // Expects content to be in YAML format. Panics on error. // Use for testing only. func MustParse(yml []byte) *Config { config, err := uyaml.NewConfig(yml, Options...) if err != nil { panic(err) } return fromConfig(config) } // Render returns the content of the configuration file, // with fields sorted alphabetically. func (c *Config) Render() ([]byte, error) { if c == nil { return []byte{}, nil } var out []interface{} if err := c.asUCfg().Unpack(&out); err != nil { return []byte{}, err } return yaml.Marshal(out) } func (c *Config) asUCfg() *ucfg.Config { return (*ucfg.Config)(c) } func fromConfig(in *ucfg.Config) *Config { return (*Config)(in) } // Diff returns true if the key/value or the sequence of two PipelinesConfig are different. // Use for testing only. func (c *Config) Diff(c2 *Config) (bool, error) { if c == c2 { return false, nil } if c == nil && c2 != nil { return true, fmt.Errorf("empty lhs config %s", c2.asUCfg().FlattenedKeys(Options...)) } if c != nil && c2 == nil { return true, fmt.Errorf("empty rhs config %s", c.asUCfg().FlattenedKeys(Options...)) } var s []map[string]interface{} var s2 []map[string]interface{} err := c.asUCfg().Unpack(&s, Options...) if err != nil { return true, err } err = c2.asUCfg().Unpack(&s2, Options...) if err != nil { return true, err } return diffSlice(s, s2) } // diffSlice returns true if the key/value or the sequence of two PipelinesConfig are different. func diffSlice(s1, s2 []map[string]interface{}) (bool, error) { if len(s1) != len(s2) { return true, fmt.Errorf("array size doesn't match %d, %d", len(s1), len(s2)) } var diff []string for i, m := range s1 { m2 := s2[i] if eq := reflect.DeepEqual(m, m2); !eq { diff = append(diff, fmt.Sprintf("%s vs %s, ", m, m2)) } } if len(diff) > 0 { return true, fmt.Errorf("there are %d differences. %s", len(diff), diff) } return false, nil }