func MergeAndUpdate()

in pkg/merger/merger.go [113:213]


// MergeAndUpdate reads every config shard listed in list.Sources, merges the
// shards into one configuration and, when confirm is true, uploads the
// serialized result to list.Path.
//
// Shards are read concurrently with config.ReadGCS. A read failure is fatal:
// every failure is logged and one of them (whichever was recorded last) is
// returned. Unless skipValidate is set, a shard that fails config.Validate is
// logged and left out of the merge without failing the whole run.
//
// When confirm is false the merged configuration is printed to stdout and
// nothing is uploaded or recorded in the field metrics.
//
// The returned configuration is nil when the inputs are rejected, a shard
// cannot be read, or no shard survives validation. Once merging has been
// attempted, the result of config.Converge is returned alongside any error.
//
// NOTE(review): when mets is nil, finish stays a nil *metrics.CycleReporter and
// its methods are still invoked; this assumes they tolerate a nil receiver —
// confirm against the metrics package.
func MergeAndUpdate(ctx context.Context, client mergeClient, mets *Metrics, list MergeList, skipValidate, confirm bool) (*configpb.Configuration, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var finish *metrics.CycleReporter
	if mets != nil {
		finish = mets.Update.Start()
	}

	// Validate every source before starting any read, so that a bad entry
	// cannot cause a return while earlier reader goroutines are still running.
	for _, source := range list.Sources {
		if source.Path == nil {
			finish.Skip()
			return nil, fmt.Errorf("path at %q is nil", source.Name)
		}
	}

	// TODO: Cache the version for each source. Only read if they've changed.
	shards := map[string]*configpb.Configuration{}
	var (
		mu    sync.Mutex // guards shards and fatal
		fatal error
		wg    sync.WaitGroup
	)

	for _, source := range list.Sources {
		wg.Add(1)
		source := source // per-iteration copy for the closure (needed before Go 1.22)
		go func() {
			defer wg.Done()
			cfg, attrs, err := config.ReadGCS(ctx, client, *source.Path)
			recordLastModified(attrs, mets, source.Name)
			if err != nil {
				// Log each fatal error, but it's okay to return any fatal error
				logrus.WithError(err).WithFields(logrus.Fields{
					"component":   "config-merger",
					"config-path": source.Location,
					"contact":     source.Contact,
				}).Errorf("can't read config %q", source.Name)
				// Several readers may fail at once; serialize the write.
				mu.Lock()
				fatal = fmt.Errorf("can't read config %q at %s: %w", source.Name, source.Path, err)
				mu.Unlock()
				return
			}
			if !skipValidate {
				if err := config.Validate(cfg); err != nil {
					logrus.WithError(err).WithFields(logrus.Fields{
						"component":   "config-merger",
						"config-path": source.Location,
						"contact":     source.Contact,
					}).Errorf("config %q is invalid; skipping config", source.Name)
					return
				}
			}

			mu.Lock()
			defer mu.Unlock()
			shards[source.Name] = cfg
		}()
	}

	wg.Wait()

	// All writers have finished, so fatal and shards can be read without the lock.
	if fatal != nil {
		finish.Fail()
		return nil, fatal
	}
	if len(shards) == 0 {
		finish.Skip()
		return nil, errors.New("no configs to merge")
	}

	// Merge and output the result
	result, err := config.Converge(shards)
	if err != nil {
		finish.Fail()
		return result, fmt.Errorf("can't merge configurations: %w", err)
	}

	if !confirm {
		fmt.Println(result)
		finish.Success()
		return result, nil
	}

	// Log each field as a metric
	if mets != nil {
		f := config.Fields(result)
		for name, qty := range f {
			mets.Fields.Set(qty, componentName, name)
		}
	}

	buf, err := proto.Marshal(result)
	if err != nil {
		finish.Fail()
		return result, fmt.Errorf("can't marshal merged proto: %w", err)
	}

	// Report a missing destination as an error instead of panicking on the
	// dereference below, mirroring the nil check applied to each source path.
	if list.Path == nil {
		finish.Fail()
		return result, errors.New("can't upload merged proto: destination path is nil")
	}

	if _, err := client.Upload(ctx, *list.Path, buf, gcs.DefaultACL, gcs.NoCache); err != nil {
		finish.Fail()
		return result, fmt.Errorf("can't upload merged proto to %s: %w", list.Path, err)
	}

	finish.Success()
	return result, nil
}