pkg/merger/merger.go (167 lines of code) (raw):
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package merger
import (
"context"
"errors"
"fmt"
"sync"
"time"
"cloud.google.com/go/storage"
"github.com/golang/protobuf/proto"
"github.com/sirupsen/logrus"
yaml "sigs.k8s.io/yaml/goyaml.v2"
"github.com/GoogleCloudPlatform/testgrid/config"
configpb "github.com/GoogleCloudPlatform/testgrid/pb/config"
"github.com/GoogleCloudPlatform/testgrid/util/gcs"
"github.com/GoogleCloudPlatform/testgrid/util/metrics"
)
// componentName identifies the config merger. It names the cyclic update
// metric in CreateMetrics and is the "component" value set on the Fields
// metric in MergeAndUpdate.
const componentName = "config-merger"
// MergeList is a list of config sources to merge together.
// ParseAndCheck will construct this from a YAML document.
//
// NOTE(review): ParseAndCheck decodes with sigs.k8s.io/yaml/goyaml.v2; that
// decoder presumably reads `yaml` struct tags rather than the `json` tags
// below — confirm which document keys are actually accepted.
type MergeList struct {
// Target is the location string of the merged output; ParseAndCheck parses it into Path.
Target string `json:"Target"`
// Path is the parsed form of Target, set by ParseAndCheck. MergeAndUpdate uploads the merged result to it.
Path *gcs.Path `json:"-"`
// Sources are the configurations to merge; ParseAndCheck requires at least one.
Sources []Source `json:"Sources"`
}
// Source represents a configuration source in cloud storage.
type Source struct {
// Name identifies the source. ParseAndCheck rejects duplicate names, and
// MergeAndUpdate keys the shard map and the last-modified metric by it.
Name string `json:"Name"`
// Location is the location string of the source config; ParseAndCheck parses it into Path.
Location string `json:"Location"`
// Path is the parsed form of Location, set by ParseAndCheck.
// MergeAndUpdate returns an error if it is nil.
Path *gcs.Path `json:"-"`
// Contact is attached to the error logs MergeAndUpdate emits for this source.
Contact string `json:"Contact,omitempty"`
}
// ParseAndCheck decodes a YAML merge list and checks it for common errors.
//
// It returns an error when:
//   - data does not strictly decode into a MergeList,
//   - Target, or the Location of any source, is rejected by gcs.NewPath,
//   - the list has no sources, or
//   - two sources share the same Name.
//
// On success list.Path and every list.Sources[i].Path hold the parsed form of
// Target and Location respectively. On failure the returned list may be only
// partially populated and should not be used.
func ParseAndCheck(data []byte) (list MergeList, err error) {
	if err = yaml.UnmarshalStrict(data, &list); err != nil {
		return list, err
	}
	if list.Path, err = gcs.NewPath(list.Target); err != nil {
		return list, err
	}
	if len(list.Sources) == 0 {
		return list, errors.New("no shards to converge")
	}

	seen := make(map[string]struct{}, len(list.Sources))
	for i := range list.Sources {
		// Address the slice element directly: a range value is a copy, so
		// storing the parsed path on it would be lost.
		source := &list.Sources[i]
		if _, dup := seen[source.Name]; dup {
			return list, fmt.Errorf("duplicated name %s", source.Name)
		}
		path, pathErr := gcs.NewPath(source.Location)
		if pathErr != nil {
			return list, pathErr
		}
		source.Path = path
		seen[source.Name] = struct{}{}
	}
	return list, nil
}
// Metrics holds metrics relevant to the config merger.
type Metrics struct {
// Update reports the outcome of each MergeAndUpdate cycle (success, fail, or skip).
Update metrics.Cyclic
// Fields records, per field name, the quantity config.Fields reports for the merged result.
Fields metrics.Int64
// LastModified records, per source, the whole seconds elapsed since the source object was last modified.
LastModified metrics.Int64
}
// CreateMetrics builds the Config Merger's metric set using the given factory.
// The cyclic update metric is named after componentName; the two integer
// metrics are registered as "config_fields" and "last_modified".
func CreateMetrics(factory metrics.Factory) *Metrics {
	var mets Metrics
	mets.Update = factory.NewCyclic(componentName)
	mets.Fields = factory.NewInt64("config_fields", "Config field usage by name", "component", "field")
	mets.LastModified = factory.NewInt64("last_modified", "Seconds since shard last modified ", "shard")
	return &mets
}
// mergeClient is the storage client MergeAndUpdate requires: it is passed to
// config.ReadGCS to read each source, and its Upload method writes the merged
// result.
type mergeClient interface {
gcs.Opener
gcs.Uploader
}
// MergeAndUpdate gathers configurations from each source path and merges them.
//
// Every source in list is read concurrently. A source that cannot be read is
// fatal: each such failure is logged and one of them is returned. A source
// that is read but fails validation is logged and skipped, unless
// skipValidate is true, in which case validation is not run at all.
//
// If confirm is true the merged configuration is marshaled and uploaded to
// list.Path, and per-field metrics are recorded when mets is non-nil. If
// confirm is false the merged configuration is printed to standard output
// instead and nothing is uploaded.
//
// mets may be nil, in which case no metrics are recorded.
//
// The merged configuration is returned on success. It is also returned,
// together with the error, when merging, marshaling or uploading fails; for
// all earlier failures the configuration is nil.
func MergeAndUpdate(ctx context.Context, client mergeClient, mets *Metrics, list MergeList, skipValidate, confirm bool) (*configpb.Configuration, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// finish stays nil when metrics are disabled.
	// NOTE(review): the finish.* calls below assume CycleReporter methods
	// tolerate a nil receiver — confirm against util/metrics.
	var finish *metrics.CycleReporter
	if mets != nil {
		finish = mets.Update.Start()
	}

	// Reject unusable paths before starting any goroutine, so an early return
	// never leaves readers running behind the caller's back.
	for _, source := range list.Sources {
		if source.Path == nil {
			finish.Skip()
			return nil, fmt.Errorf("path at %q is nil", source.Name)
		}
	}
	if confirm && list.Path == nil {
		// The upload below dereferences list.Path; fail cleanly instead of panicking.
		finish.Skip()
		return nil, fmt.Errorf("target path at %q is nil", list.Target)
	}

	// TODO: Cache the version for each source. Only read if they've changed.
	var (
		mu    sync.Mutex // guards shards and fatal
		fatal error
		wg    sync.WaitGroup
	)
	shards := make(map[string]*configpb.Configuration, len(list.Sources))

	for _, source := range list.Sources {
		source := source // per-iteration copy for the goroutine below
		wg.Add(1)
		go func() {
			defer wg.Done()

			logger := logrus.WithFields(logrus.Fields{
				"component":   componentName,
				"config-path": source.Location,
				"contact":     source.Contact,
			})

			cfg, attrs, err := config.ReadGCS(ctx, client, *source.Path)
			recordLastModified(attrs, mets, source.Name)
			if err != nil {
				// Log each fatal error, but it's okay to return any fatal error.
				logger.WithError(err).Errorf("can't read config %q", source.Name)
				readErr := fmt.Errorf("can't read config %q at %s: %w", source.Name, source.Path, err)
				// Several readers may fail at once; the write must be synchronized.
				mu.Lock()
				fatal = readErr
				mu.Unlock()
				return
			}

			if !skipValidate {
				if err := config.Validate(cfg); err != nil {
					logger.WithError(err).Errorf("config %q is invalid; skipping config", source.Name)
					return
				}
			}

			mu.Lock()
			shards[source.Name] = cfg
			mu.Unlock()
		}()
	}
	wg.Wait()

	if fatal != nil {
		finish.Fail()
		return nil, fatal
	}
	if len(shards) == 0 {
		finish.Skip()
		return nil, errors.New("no configs to merge")
	}

	// Merge and output the result
	result, err := config.Converge(shards)
	if err != nil {
		finish.Fail()
		return result, fmt.Errorf("can't merge configurations: %w", err)
	}

	if !confirm {
		// Dry run: show the merged configuration instead of uploading it.
		fmt.Println(result)
		finish.Success()
		return result, nil
	}

	// Log each field as a metric
	if mets != nil {
		for name, qty := range config.Fields(result) {
			mets.Fields.Set(qty, componentName, name)
		}
	}

	buf, err := proto.Marshal(result)
	if err != nil {
		finish.Fail()
		return result, fmt.Errorf("can't marshal merged proto: %w", err)
	}

	if _, err := client.Upload(ctx, *list.Path, buf, gcs.DefaultACL, gcs.NoCache); err != nil {
		finish.Fail()
		return result, fmt.Errorf("can't upload merged proto to %s: %w", list.Path, err)
	}

	finish.Success()
	return result, nil
}
// recordLastModified reports how long ago a source object was last modified.
// It does nothing when attrs is nil. Otherwise it logs the elapsed time and,
// when mets is non-nil, sets the LastModified metric for source to the elapsed
// time in whole seconds.
func recordLastModified(attrs *storage.ReaderObjectAttrs, mets *Metrics, source string) {
	if attrs == nil {
		return
	}

	age := time.Since(attrs.LastModified)
	if mets != nil {
		mets.LastModified.Set(int64(age.Seconds()), source)
	}

	fields := logrus.Fields{
		"diff":  age,
		"shard": source,
	}
	logrus.WithFields(fields).Info("Time since last updated.")
}