func Update()

in pkg/updater/updater.go [256:438]


// Update keeps the test groups selected from the config at opts.ConfigPath
// up to date, returning whatever q.Send returns once it stops sending.
//
// It observes the config for changes, seeds a queue with the selected test
// groups, runs every fixer (plus an internal last-updated fixer) against that
// queue, and starts opts.GroupConcurrency workers that call updateGroup on
// each group the queue sends. A group whose update fails is rescheduled after
// a randomized delay of between a quarter and half of opts.Freq; a group that
// reports unprocessed work is rescheduled immediately.
//
// Setup failures (observing the config, filtering groups, fetching initial
// start times) are returned wrapped; later config-update and fixer failures
// are only logged.
func Update(parent context.Context, client gcs.ConditionalClient, mets *Metrics, updateGroup GroupUpdater, opts *UpdateOptions, fixers ...Fixer) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()
	log := logrus.WithField("config", opts.ConfigPath)

	var q config.TestGroupQueue

	log.Debug("Observing config...")
	// Keep a handle on the ticker so it is released when Update returns.
	configTicker := time.NewTicker(time.Minute)
	defer configTicker.Stop()
	newConfig, err := snapshot.Observe(ctx, log, client, opts.ConfigPath, configTicker.C)
	if err != nil {
		return fmt.Errorf("observe config: %w", err)
	}
	cfg := <-newConfig
	groups, err := testGroups(cfg, opts.GroupNames...)
	if err != nil {
		return fmt.Errorf("filter test groups: %w", err)
	}

	q.Init(log, groups, time.Now().Add(opts.Freq))

	log.Debug("Fetching initial start times...")
	fixLastUpdated := lastUpdated{
		client:     client,
		gridPrefix: opts.GridPrefix,
		configPath: opts.ConfigPath,
		freq:       opts.Freq,
	}
	if err := fixLastUpdated.fixOnce(ctx, log, &q, groups); err != nil {
		return fmt.Errorf("get generations: %w", err)
	}
	log.Info("Fetched initial start times")

	fixers = append(fixers, fixLastUpdated.Fix)

	// Report queue depth and delay once a minute until ctx is done.
	go func() {
		ticker := time.NewTicker(time.Minute)
		defer ticker.Stop()
		for {
			depth, next, when := q.Status()
			entry := log.WithField("depth", depth)
			if next != nil {
				entry = entry.WithField("next", next.Name)
			}
			delay := time.Since(when)
			if delay < 0 {
				// The next group is not due yet: record the remaining wait
				// before clamping the delay to zero.
				entry = entry.WithField("sleep", -delay)
				delay = 0
			}
			if limit := opts.Freq * 2; limit > 0 && delay > limit {
				delay = limit
			}
			entry = entry.WithField("delay", delay.Round(time.Second))
			mets.delay(delay)
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				entry.Info("Queue Status")
			}
		}
	}()

	// Run the fixers, restarting them whenever a new config arrives.
	go func() {
		fixCtx, fixCancel := context.WithCancel(ctx)
		var fixWg sync.WaitGroup
		fixAll := func() {
			n := len(fixers)
			log.WithField("fixers", n).Trace("Starting fixers on current test groups...")
			fixWg.Add(n)
			// Snapshot the context and groups so every fixer works with the
			// values current at launch, even if they are replaced later.
			runCtx, current := fixCtx, groups
			for i, fix := range fixers {
				go func(i int, fix Fixer) {
					defer fixWg.Done()
					if err := fix(runCtx, log, &q, current); err != nil && !errors.Is(err, context.Canceled) {
						log.WithError(err).WithField("fixer", i).Warning("Fixer failed")
					}
				}(i, fix)
			}
			log.Debug("Started fixers on current test groups")
		}
		fixAll()
		for {
			select {
			case <-ctx.Done():
				fixCancel()
				return
			case cfg, ok := <-newConfig:
				if !ok {
					fixCancel()
					return
				}
				log.Info("Updating config")
				updated, err := testGroups(cfg, opts.GroupNames...)
				if err != nil {
					// Keep the queue and fixers running on the previous
					// groups rather than resetting them with a failed result.
					log.Errorf("Error during config update: %v", err)
					continue
				}
				groups = updated
				log.Debug("Cancelling fixers on old test groups...")
				fixCancel()
				fixWg.Wait()
				q.Init(log, groups, time.Now().Add(opts.Freq))
				log.Debug("Canceled fixers on old test groups")
				fixCtx, fixCancel = context.WithCancel(ctx)
				fixAll()
			}
		}
	}()

	// active holds the names of groups currently being updated; guarded by lock.
	active := map[string]bool{}
	var lock sync.RWMutex
	var wg sync.WaitGroup
	wg.Add(opts.GroupConcurrency)
	// Deferred calls run last-in first-out: the channel is closed first so the
	// workers drain and exit, then wg.Wait returns.
	defer wg.Wait()
	channel := make(chan *configpb.TestGroup)
	defer close(channel)

	// updateTestGroup updates a single group unless another worker is already
	// doing so, records the outcome and reschedules the group when needed.
	updateTestGroup := func(tg *configpb.TestGroup) {
		name := tg.Name
		log := log.WithField("group", name)
		// Cheap read-locked check before doing any work.
		lock.RLock()
		on := active[name]
		lock.RUnlock()
		if on {
			log.Debug("Already updating...")
			return
		}
		fin := mets.start()
		tgp, err := TestGroupPath(opts.ConfigPath, opts.GridPrefix, name)
		if err != nil {
			fin.Fail()
			log.WithError(err).Error("Bad path")
			return
		}
		// Re-check under the write lock: another worker may have claimed the
		// group since the read-locked check above.
		lock.Lock()
		if active[name] {
			lock.Unlock()
			// NOTE(review): fin is not finished on this path — confirm that
			// abandoning it is harmless for the metrics implementation.
			log.Debug("Another routine started updating...")
			return
		}
		active[name] = true
		lock.Unlock()
		defer func() {
			lock.Lock()
			// Deleting reads the same as storing false and keeps the map
			// from accumulating entries for groups that no longer exist.
			delete(active, name)
			lock.Unlock()
		}()
		start := time.Now()
		unprocessed, err := updateGroup(ctx, log, client, tg, *tgp)
		log.WithField("duration", time.Since(start)).Info("Finished processing group.")
		if err != nil {
			entry := log.WithError(err)
			if gcs.IsPreconditionFailed(err) {
				fin.Skip()
				entry.Info("Group was modified while updating")
			} else {
				fin.Fail()
				entry.Error("Failed to update group")
			}
			if opts.Freq > 0 {
				// Retry between a quarter and half of the frequency from now.
				delay := opts.Freq / 4
				// Int63n panics when its argument is <= 0, which happens for
				// frequencies below 4ns, so only add jitter when there is room.
				if jitter := int64(delay); jitter > 0 {
					delay += time.Duration(rand.Int63n(jitter))
				}
				q.Fix(name, time.Now().Add(delay), true)
			}
			return
		}
		fin.Success()
		if unprocessed { // process another chunk ASAP
			q.Fix(name, time.Now(), false)
		}
	}

	for i := 0; i < opts.GroupConcurrency; i++ {
		go func() {
			defer wg.Done()
			for tg := range channel {
				updateTestGroup(tg)
			}
		}()
	}

	log.Info("Starting to process test groups...")
	return q.Send(ctx, channel, opts.Freq)
}