alerter/rules/localStore.go (97 lines of code) (raw):
package rules
import (
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"strings"
alertrulev1 "github.com/Azure/adx-mon/api/v1"
"github.com/Azure/adx-mon/pkg/logger"
"k8s.io/apimachinery/pkg/api/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/util/yaml"
)
type fileStore struct {
rules []*Rule
}
func FromPath(path, region string) (*fileStore, error) {
s := &fileStore{}
// walk files in directory
err := filepath.WalkDir(path, func(path string, info os.DirEntry, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("failed to open file '%s': %w", path, err)
}
defer f.Close()
logger.Infof("reading file %q", path)
err = s.fromStream(f, region)
if err != nil {
return fmt.Errorf("failed to read file '%s': %w", path, err)
}
return nil
})
knownRules := map[string]bool{}
for _, rule := range s.Rules() {
key := rule.Namespace + "/" + rule.Name
if knownRules[key] {
return nil, fmt.Errorf("duplicate rule %s", key)
}
knownRules[key] = true
}
return s, err
}
func (s *fileStore) fromStream(file io.Reader, region string) error {
contents, err := io.ReadAll(file)
if err != nil {
return fmt.Errorf("fromStream failed to read file: %w", err)
}
chunks := bytes.Split(contents, []byte("---"))
for _, chunk := range chunks {
whitespaceTrimmed := strings.TrimSpace(string(chunk))
if len(whitespaceTrimmed) == 0 {
continue
}
genericStructure := make(map[string]interface{})
err = yaml.Unmarshal(chunk, &genericStructure)
if err != nil {
return fmt.Errorf("fromStream failed to unmarshal yaml: %w", err)
}
// check if this is a rule
kind, ok := genericStructure["kind"]
if !ok || kind != "AlertRule" {
logger.Warn("found non-rule yaml, skipping")
continue
}
// create new spec here
rule := alertrulev1.AlertRule{}
// pass a reference to spec reference
err = yaml.UnmarshalStrict(chunk, &rule)
if err != nil {
return fmt.Errorf("fromStream failed to unmarshal yaml: %w", err)
}
// validate the rule passes generic k8s metadata checks
if err := validateMetadata(&rule.ObjectMeta); err != nil {
return fmt.Errorf("fromStream failed to validate metadata: %w", err)
}
logger.Infof("found rule %q", rule.ObjectMeta.Name)
r, err := toRule(rule, region)
if err != nil {
return fmt.Errorf("fromStream failed to convert rule: %w", err)
}
s.rules = append(s.rules, r)
}
return nil
}
func (s *fileStore) Rules() []*Rule {
return s.rules
}
// validateMetadata validates the metadata of a Kubernetes object
func validateMetadata(meta *metav1.ObjectMeta) error {
allErrs := validation.ValidateObjectMeta(meta, true, validation.NameIsDNSLabel, field.NewPath("metadata"))
if len(allErrs) > 0 {
return allErrs.ToAggregate()
}
return nil
}