in pkg/tabulator/tabstate.go [105:302]
func Update(ctx context.Context, client gcs.ConditionalClient, mets *Metrics, opts *UpdateOptions, fixers ...Fixer) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if opts.ReadConcurrency < 1 || opts.WriteConcurrency < 1 {
return fmt.Errorf("concurrency must be positive, got read %d and write %d", opts.ReadConcurrency, opts.WriteConcurrency)
}
log := logrus.WithField("config", opts.ConfigPath)
var q config.TestGroupQueue
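// Observe the config in GCS: cfgChanged receives a fresh snapshot whenever
// the config changes, polled once a minute by the ticker below.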
log.Debug("Observing config...")
cfgTicker := time.NewTicker(time.Minute)
defer cfgTicker.Stop()
cfgChanged, err := snapshot.Observe(ctx, log, client, opts.ConfigPath, cfgTicker.C)
if err != nil {
return fmt.Errorf("error while observing config %q: %w", opts.ConfigPath.String(), err)
}
var cfg *snapshot.Config
var tasksPerGroup map[string][]writeTask
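// fixSnapshot installs a new config snapshot: it recomputes the write tasks
// for each test group and re-seeds the queue, restricted to opts.AllowedGroups when set.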
fixSnapshot := func(newConfig *snapshot.Config) {
cfg = newConfig
tasksPerGroup = mapTasks(cfg)
if len(opts.AllowedGroups) != 0 {
groups := make([]*configpb.TestGroup, 0, len(opts.AllowedGroups))
for _, group := range opts.AllowedGroups {
c, ok := cfg.Groups[group]
if !ok {
log.Errorf("Could not find requested group %q in config", c)
continue
}
groups = append(groups, c)
}
q.Init(log, groups, time.Now())
return
}
groups := make([]*configpb.TestGroup, 0, len(cfg.Groups))
for _, group := range cfg.Groups {
groups = append(groups, group)
}
q.Init(log, groups, time.Now())
}
fixSnapshot(<-cfgChanged)
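// Background loop: run fixers against the queue, report queue depth and delay
// metrics, and rebuild the queue whenever the config changes.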
go func(ctx context.Context) {
fixCtx, fixCancel := context.WithCancel(ctx)
var fixWg sync.WaitGroup
fixAll := func() {
n := len(fixers)
log.WithField("fixers", n).Debug("Starting fixers on current groups...")
fixWg.Add(n)
for i, fix := range fixers {
go func(i int, fix Fixer) {
defer fixWg.Done()
if err := fix(fixCtx, &q); err != nil && !errors.Is(err, context.Canceled) {
log.WithError(err).WithField("fixer", i).Warning("Fixer failed")
}
}(i, fix)
}
log.WithField("fixers", n).Info("Started fixers on current groups.")
}
ticker := time.NewTicker(time.Minute)
fixAll()
defer ticker.Stop()
for {
depth, next, when := q.Status()
log := log.WithField("depth", depth)
if next != nil {
log = log.WithField("next", &next)
}
delay := time.Since(when)
if delay < 0 {
log = log.WithField("sleep", -delay)
delay = 0
}
mets.DelaySeconds.Set(delay, componentName)
log.Debug("Calculated metrics")
select {
case <-ctx.Done():
ticker.Stop()
fixCancel()
fixWg.Wait()
return
case newConfig, ok := <-cfgChanged:
if !ok {
log.Info("Configuration channel closed")
cfgChanged = nil
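// Receiving from a nil channel blocks forever, disabling this case from now on.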
continue
}
log.Info("Configuration changed")
fixCancel()
fixWg.Wait()
fixCtx, fixCancel = context.WithCancel(ctx)
fixSnapshot(newConfig)
fixAll()
case <-ticker.C:
}
}
}(ctx)
// Set up worker pools
groups := make(chan *configpb.TestGroup)
tasks := make(chan writeTask)
var tabLock sync.Mutex
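// read downloads a test group's grid once and fans it out as one write task
// per dashboard tab backed by that group.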
read := func(ctx context.Context, log *logrus.Entry, group *configpb.TestGroup) error {
if group == nil {
return errors.New("nil group to read")
}
fromPath, err := updater.TestGroupPath(opts.ConfigPath, opts.GridPathPrefix, group.Name)
if err != nil {
return fmt.Errorf("can't make tg path %q: %w", group.Name, err)
}
log.WithField("from", fromPath.String()).Info("Reading state")
grid, _, err := gcs.DownloadGrid(ctx, client, *fromPath)
if err != nil {
return fmt.Errorf("downloadGrid(%s): %w", fromPath, err)
}
// Lock out all other readers so that this group's tabs are all handled as soon as possible.
tabLock.Lock()
defer tabLock.Unlock()
for _, task := range tasksPerGroup[group.Name] {
log := log.WithFields(logrus.Fields{
"group": task.group.GetName(),
"dashboard": task.dashboard.GetName(),
"tab": task.tab.GetName(),
})
select {
case <-ctx.Done():
log.Debug("Skipping irrelevant task")
continue
default:
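// Hand each tab its own clone of the grid so write tasks never share mutable state.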
out := task
out.data = proto.Clone(grid).(*statepb.Grid)
log.Debug("Requesting write task")
tasks <- out
}
}
return nil
}
// Run threads continuously
var readWg, writeWg sync.WaitGroup
readWg.Add(opts.ReadConcurrency)
for i := 0; i < opts.ReadConcurrency; i++ {
go func() {
defer readWg.Done()
for group := range groups {
readCtx, cancel := context.WithCancel(ctx)
log = log.WithField("group", group.Name)
err := read(readCtx, log, group)
cancel()
if err != nil {
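// Requeue the group at Freq/10 from now, so failures retry sooner than the normal cadence.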
next := time.Now().Add(opts.Freq / 10)
q.Fix(group.Name, next, false)
log.WithError(err).WithField("retry-at", next).Error("failed to read, retry later")
}
}
}()
}
writeWg.Add(opts.WriteConcurrency)
for i := 0; i < opts.WriteConcurrency; i++ {
go func() {
defer writeWg.Done()
for task := range tasks {
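// Bound each write with writeTimeout so a stuck write cannot stall the pool.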
writeCtx, cancel := context.WithTimeout(ctx, writeTimeout)
finish := mets.UpdateState.Start()
log = log.WithField("dashboard", task.dashboard.Name).WithField("tab", task.tab.Name)
err := createTabState(writeCtx, log, client, task, opts.ConfigPath, opts.TabsPathPrefix, opts.Confirm, opts.CalculateStats, opts.UseTabAlertSettings, opts.ExtendState)
cancel()
if err != nil {
finish.Fail()
log.Errorf("write: %v", err)
continue
}
finish.Success()
}
}()
}
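// Defers run LIFO: close(groups) stops feeding the readers, wait for them to
// drain, then close(tasks) and wait for the writers, so nothing sends on a
// closed channel.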
defer writeWg.Wait()
defer close(tasks)
defer readWg.Wait()
defer close(groups)
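// Send blocks, feeding due test groups to the readers (re-queuing each at
// opts.Freq) until the context is canceled.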
return q.Send(ctx, groups, opts.Freq)
}
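For context, a minimal sketch of how a caller might drive Update. This is an assumption-laden illustration, not code from the repo: newClient and newPath are hypothetical helpers standing in for however the caller builds the gcs.ConditionalClient and the gcs.Path to the config, the Metrics value is elided, and only UpdateOptions fields visible in the excerpt are set.

// Sketch only: newClient and newPath are hypothetical stand-ins, and mets is
// assumed to come from this package's metrics setup (not shown in the excerpt).
opts := &tabulator.UpdateOptions{
	ConfigPath:       newPath("gs://bucket/config"), // gcs.Path to the config proto
	GridPathPrefix:   "grid",
	TabsPathPrefix:   "tabs",
	ReadConcurrency:  4,
	WriteConcurrency: 8,
	Freq:             time.Hour, // cadence at which each group is re-tabulated
	Confirm:          true,      // presumably gates actual writes vs. a dry run
}
if err := tabulator.Update(ctx, newClient(), mets, opts); err != nil {
	logrus.WithError(err).Fatal("Tabulator failed")
}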