in pkg/updater/updater.go [256:438]
// Update observes the config at opts.ConfigPath and sends its test groups to updateGroup.
//
// It filters the observed config down to opts.GroupNames, initializes a
// TestGroupQueue with those groups, and runs fixLastUpdated.fixOnce before
// starting any background work. It then starts:
//   - a goroutine that logs the queue status and reports the queue delay to
//     mets once a minute,
//   - a goroutine that runs every fixer (the supplied fixers plus
//     fixLastUpdated.Fix) and restarts them whenever a new config arrives,
//   - opts.GroupConcurrency workers that call updateGroup for each group
//     received from the queue.
//
// A group that fails to update is requeued after a randomized delay derived
// from opts.Freq (when opts.Freq > 0); a group reporting unprocessed work is
// requeued immediately.
//
// Update returns an error if the initial config cannot be observed or
// filtered, or if the initial fixOnce call fails; otherwise it returns the
// result of q.Send. The background goroutines are stopped by cancelling the
// derived context when Update returns, and the workers are waited for.
func Update(parent context.Context, client gcs.ConditionalClient, mets *Metrics, updateGroup GroupUpdater, opts *UpdateOptions, fixers ...Fixer) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()
	log := logrus.WithField("config", opts.ConfigPath)

	var q config.TestGroupQueue

	log.Debug("Observing config...")
	// Keep a handle on the ticker so it is released when Update returns.
	configTicker := time.NewTicker(time.Minute)
	defer configTicker.Stop()
	newConfig, err := snapshot.Observe(ctx, log, client, opts.ConfigPath, configTicker.C)
	if err != nil {
		return fmt.Errorf("observe config: %w", err)
	}
	cfg, ok := <-newConfig
	if !ok {
		// A closed channel would otherwise hand a zero-value config to testGroups.
		return errors.New("observe config: channel closed before the first config arrived")
	}
	groups, err := testGroups(cfg, opts.GroupNames...)
	if err != nil {
		return fmt.Errorf("filter test groups: %w", err)
	}

	q.Init(log, groups, time.Now().Add(opts.Freq))

	log.Debug("Fetching initial start times...")
	fixLastUpdated := lastUpdated{
		client:     client,
		gridPrefix: opts.GridPrefix,
		configPath: opts.ConfigPath,
		freq:       opts.Freq,
	}
	if err := fixLastUpdated.fixOnce(ctx, log, &q, groups); err != nil {
		return fmt.Errorf("get generations: %w", err)
	}
	log.Info("Fetched initial start times")

	fixers = append(fixers, fixLastUpdated.Fix)

	// Periodically report the queue status and how far behind it is.
	go func() {
		ticker := time.NewTicker(time.Minute)
		defer ticker.Stop()
		for {
			depth, next, when := q.Status()
			log := log.WithField("depth", depth)
			if next != nil {
				log = log.WithField("next", next.Name)
			}
			delay := time.Since(when)
			if delay < 0 {
				// Record how long until the next item is due before clamping;
				// clamping first would always log a zero sleep.
				log = log.WithField("sleep", -delay)
				delay = 0
			}
			if limit := opts.Freq * 2; limit > 0 && delay > limit {
				delay = limit
			}
			log = log.WithField("delay", delay.Round(time.Second))
			mets.delay(delay)
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				log.Info("Queue Status")
			}
		}
	}()

	// Run the fixers, restarting them on each config change.
	go func() {
		fixCtx, fixCancel := context.WithCancel(ctx)
		// fixCancel is reassigned per generation; cancel whichever is current on exit.
		defer func() { fixCancel() }()
		var fixWg sync.WaitGroup
		fixAll := func() {
			// Snapshot this generation's context and groups so fixer goroutines
			// never read variables that the loop below reassigns.
			genCtx, genGroups := fixCtx, groups
			n := len(fixers)
			log.WithField("fixers", n).Trace("Starting fixers on current test groups...")
			fixWg.Add(n)
			for i, fix := range fixers {
				go func(i int, fix Fixer) {
					defer fixWg.Done()
					if err := fix(genCtx, log, &q, genGroups); err != nil && !errors.Is(err, context.Canceled) {
						log.WithError(err).WithField("fixer", i).Warning("Fixer failed")
					}
				}(i, fix)
			}
			log.Debug("Started fixers on current test groups")
		}

		fixAll()
		for {
			select {
			case <-ctx.Done():
				return
			case cfg, ok := <-newConfig:
				if !ok {
					return
				}
				log.Info("Updating config")
				newGroups, err := testGroups(cfg, opts.GroupNames...)
				if err != nil {
					log.Errorf("Error during config update: %v", err)
					// Keep the previous groups and fixers running rather than
					// re-initializing the queue with an unusable result.
					continue
				}
				log.Debug("Cancelling fixers on old test groups...")
				fixCancel()
				fixWg.Wait()
				// All old fixers have exited, so nothing else reads groups now.
				groups = newGroups
				q.Init(log, groups, time.Now().Add(opts.Freq))
				log.Debug("Canceled fixers on old test groups")
				fixCtx, fixCancel = context.WithCancel(ctx)
				fixAll()
			}
		}
	}()

	active := map[string]bool{}
	var lock sync.RWMutex
	var wg sync.WaitGroup
	wg.Add(opts.GroupConcurrency)
	// Deferred calls run in reverse order: the channel is closed first so the
	// workers drain and exit, then wg.Wait blocks until they have finished.
	defer wg.Wait()
	channel := make(chan *configpb.TestGroup)
	defer close(channel)

	updateTestGroup := func(tg *configpb.TestGroup) {
		name := tg.Name
		log := log.WithField("group", name)

		// Cheap read-locked check before contending for the write lock.
		lock.RLock()
		on := active[name]
		lock.RUnlock()
		if on {
			log.Debug("Already updating...")
			return
		}

		// Re-check under the write lock: another worker may have claimed the
		// group between the two lock acquisitions.
		lock.Lock()
		if active[name] {
			lock.Unlock()
			log.Debug("Another routine started updating...")
			return
		}
		active[name] = true
		lock.Unlock()
		defer func() {
			lock.Lock()
			active[name] = false
			lock.Unlock()
		}()

		// Start the metric only after the group is claimed, so every handle
		// that is started is finished by Fail, Skip or Success below.
		fin := mets.start()
		tgp, err := TestGroupPath(opts.ConfigPath, opts.GridPrefix, name)
		if err != nil {
			fin.Fail()
			log.WithError(err).Error("Bad path")
			return
		}

		start := time.Now()
		unprocessed, err := updateGroup(ctx, log, client, tg, *tgp)
		log.WithField("duration", time.Since(start)).Info("Finished processing group.")
		if err != nil {
			log := log.WithError(err)

			// Work out the retry delay first so it appears in the log line below.
			retry := opts.Freq > 0
			var delay time.Duration
			if retry {
				quarter := opts.Freq / 4
				delay = quarter
				if quarter > 0 { // Int63n() panics if its argument is <= 0
					delay += time.Duration(rand.Int63n(int64(quarter)))
				}
				log = log.WithField("delay", delay.Seconds())
			}

			if gcs.IsPreconditionFailed(err) {
				fin.Skip()
				log.Info("Group was modified while updating")
			} else {
				fin.Fail()
				log.Error("Failed to update group")
			}

			if retry {
				q.Fix(tg.Name, time.Now().Add(delay), true)
			}
			return
		}
		fin.Success()
		if unprocessed { // process another chunk ASAP
			q.Fix(name, time.Now(), false)
		}
	}

	for i := 0; i < opts.GroupConcurrency; i++ {
		go func() {
			defer wg.Done()
			for tg := range channel {
				updateTestGroup(tg)
			}
		}()
	}

	log.Info("Starting to process test groups...")
	return q.Send(ctx, channel, opts.Freq)
}