in pkg/merger/merger.go [113:213]
func MergeAndUpdate(ctx context.Context, client mergeClient, mets *Metrics, list MergeList, skipValidate, confirm bool) (*configpb.Configuration, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var finish *metrics.CycleReporter
if mets != nil {
finish = mets.Update.Start()
}
// TODO: Cache the version for each source. Only read if they've changed.
shards := map[string]*configpb.Configuration{}
var shardsLock sync.Mutex
var fatal error
var wg sync.WaitGroup
for _, source := range list.Sources {
if source.Path == nil {
finish.Skip()
return nil, fmt.Errorf("path at %q is nil", source.Name)
}
wg.Add(1)
source := source
go func() {
defer wg.Done()
cfg, attrs, err := config.ReadGCS(ctx, client, *source.Path)
recordLastModified(attrs, mets, source.Name)
if err != nil {
// Log each fatal error, but it's okay to return any fatal error
logrus.WithError(err).WithFields(logrus.Fields{
"component": "config-merger",
"config-path": source.Location,
"contact": source.Contact,
}).Errorf("can't read config %q", source.Name)
fatal = fmt.Errorf("can't read config %q at %s: %w", source.Name, source.Path, err)
return
}
if !skipValidate {
if err := config.Validate(cfg); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"component": "config-merger",
"config-path": source.Location,
"contact": source.Contact,
}).Errorf("config %q is invalid; skipping config", source.Name)
return
}
}
shardsLock.Lock()
defer shardsLock.Unlock()
shards[source.Name] = cfg
}()
}
wg.Wait()
if fatal != nil {
finish.Fail()
return nil, fatal
}
if len(shards) == 0 {
finish.Skip()
return nil, errors.New("no configs to merge")
}
// Merge and output the result
result, err := config.Converge(shards)
if err != nil {
finish.Fail()
return result, fmt.Errorf("can't merge configurations: %w", err)
}
if !confirm {
fmt.Println(result)
finish.Success()
return result, nil
}
// Log each field as a metric
if mets != nil {
f := config.Fields(result)
for name, qty := range f {
mets.Fields.Set(qty, componentName, name)
}
}
buf, err := proto.Marshal(result)
if err != nil {
finish.Fail()
return result, fmt.Errorf("can't marshal merged proto: %w", err)
}
if _, err := client.Upload(ctx, *list.Path, buf, gcs.DefaultACL, gcs.NoCache); err != nil {
finish.Fail()
return result, fmt.Errorf("can't upload merged proto to %s: %w", list.Path, err)
}
finish.Success()
return result, nil
}