func Update()

in pkg/tabulator/tabstate.go [105:302]


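// Update observes the config at opts.ConfigPath, queues every test group (or
// only opts.AllowedGroups), and runs pools of read and write workers that
// download each group's grid and write per-tab state. Fixers are restarted
// whenever the observed config changes, and the call blocks while feeding
// groups to the workers until ctx is cancelled.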
func Update(ctx context.Context, client gcs.ConditionalClient, mets *Metrics, opts *UpdateOptions, fixers ...Fixer) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	if opts.ReadConcurrency < 1 || opts.WriteConcurrency < 1 {
		return fmt.Errorf("concurrency must be positive, got read %d and write %d", opts.ReadConcurrency, opts.WriteConcurrency)
	}
	log := logrus.WithField("config", opts.ConfigPath)

	var q config.TestGroupQueue

	log.Debug("Observing config...")
	cfgChanged, err := snapshot.Observe(ctx, log, client, opts.ConfigPath, time.NewTicker(time.Minute).C)
	if err != nil {
		return fmt.Errorf("error while observing config %q: %w", opts.ConfigPath.String(), err)
	}

	var cfg *snapshot.Config
	var tasksPerGroup map[string][]writeTask
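	// fixSnapshot caches the latest config snapshot, rebuilds the per-group
	// write tasks, and re-initializes the queue (restricted to AllowedGroups
	// when that list is non-empty).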
	fixSnapshot := func(newConfig *snapshot.Config) {
		cfg = newConfig
		tasksPerGroup = mapTasks(cfg)

		if len(opts.AllowedGroups) != 0 {
			groups := make([]*configpb.TestGroup, 0, len(opts.AllowedGroups))
			for _, group := range opts.AllowedGroups {
				c, ok := cfg.Groups[group]
				if !ok {
					log.Errorf("Could not find requested group %q in config", group)
					continue
				}
				groups = append(groups, c)
			}

			q.Init(log, groups, time.Now())
			return
		}

		groups := make([]*configpb.TestGroup, 0, len(cfg.Groups))
		for _, group := range cfg.Groups {
			groups = append(groups, group)
		}

		q.Init(log, groups, time.Now())
	}

	fixSnapshot(<-cfgChanged)

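	// In the background, run fixers against the queue, report queue metrics
	// every minute, and restart the fixers with a fresh snapshot whenever the
	// config changes.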
	go func(ctx context.Context) {
		fixCtx, fixCancel := context.WithCancel(ctx)
		var fixWg sync.WaitGroup
		fixAll := func() {
			n := len(fixers)
			log.WithField("fixers", n).Debug("Starting fixers on current groups...")
			fixWg.Add(n)
			for i, fix := range fixers {
				go func(i int, fix Fixer) {
					defer fixWg.Done()
					if err := fix(fixCtx, &q); err != nil && !errors.Is(err, context.Canceled) {
						log.WithError(err).WithField("fixer", i).Warning("Fixer failed")
					}
				}(i, fix)
			}
			log.WithField("fixers", n).Info("Started fixers on current groups.")
		}

		ticker := time.NewTicker(time.Minute)
		fixAll()
		defer ticker.Stop()
		for {
			depth, next, when := q.Status()
			log := log.WithField("depth", depth)
			if next != nil {
				log = log.WithField("next", &next)
			}
			delay := time.Since(when)
			if delay < 0 {
				log = log.WithField("sleep", -delay)
				delay = 0
			}
			mets.DelaySeconds.Set(delay, componentName)
			log.Debug("Calculated metrics")

			select {
			case <-ctx.Done():
				ticker.Stop()
				fixCancel()
				fixWg.Wait()
				return
			case newConfig, ok := <-cfgChanged:
				if !ok {
					log.Info("Configuration channel closed")
					cfgChanged = nil
					continue
				}
				log.Info("Configuration changed")
				fixCancel()
				fixWg.Wait()
				fixCtx, fixCancel = context.WithCancel(ctx)
				fixSnapshot(newConfig)
				fixAll()
			case <-ticker.C:
			}
		}
	}(ctx)

	// Set up worker pools
	groups := make(chan *configpb.TestGroup)
	tasks := make(chan writeTask)
	var tabLock sync.Mutex

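	// read downloads a group's grid once, then enqueues one write task per
	// dashboard tab backed by that group, giving each task its own copy of the
	// grid.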
	read := func(ctx context.Context, log *logrus.Entry, group *configpb.TestGroup) error {
		if group == nil {
			return errors.New("nil group to read")
		}

		fromPath, err := updater.TestGroupPath(opts.ConfigPath, opts.GridPathPrefix, group.Name)
		if err != nil {
			return fmt.Errorf("can't make tg path %q: %w", group.Name, err)
		}

		log.WithField("from", fromPath.String()).Info("Reading state")

		grid, _, err := gcs.DownloadGrid(ctx, client, *fromPath)
		if err != nil {
			return fmt.Errorf("downloadGrid(%s): %w", fromPath, err)
		}

		tabLock.Lock()
		defer tabLock.Unlock()
		// lock out all other readers so that all these tabs get handled as soon as possible
		for _, task := range tasksPerGroup[group.Name] {
			log := log.WithFields(logrus.Fields{
				"group":     task.group.GetName(),
				"dashboard": task.dashboard.GetName(),
				"tab":       task.tab.GetName(),
			})
			select {
			case <-ctx.Done():
				log.Debug("Skipping irrelevant task")
				continue
			default:
				out := task
				out.data = proto.Clone(grid).(*statepb.Grid)
				log.Debug("Requesting write task")
				tasks <- out
			}
		}
		return nil
	}

	// Run threads continuously
	var readWg, writeWg sync.WaitGroup
	readWg.Add(opts.ReadConcurrency)
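	// Read workers pull groups off the queue; a failed read is rescheduled at a
	// tenth of the normal period.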
	for i := 0; i < opts.ReadConcurrency; i++ {
		go func() {
			defer readWg.Done()
			for group := range groups {
				readCtx, cancel := context.WithCancel(ctx)
				log = log.WithField("group", group.Name)
				err := read(readCtx, log, group)
				cancel()
				if err != nil {
					next := time.Now().Add(opts.Freq / 10)
					q.Fix(group.Name, next, false)
					log.WithError(err).WithField("retry-at", next).Error("failed to read, retry later")
				}
			}
		}()
	}
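	// Write workers persist each task as tab state, recording success or
	// failure in the update metric.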
	writeWg.Add(opts.WriteConcurrency)
	for i := 0; i < opts.WriteConcurrency; i++ {
		go func() {
			defer writeWg.Done()
			for task := range tasks {
				writeCtx, cancel := context.WithTimeout(ctx, writeTimeout)
				finish := mets.UpdateState.Start()
				log = log.WithField("dashboard", task.dashboard.Name).WithField("tab", task.tab.Name)
				err := createTabState(writeCtx, log, client, task, opts.ConfigPath, opts.TabsPathPrefix, opts.Confirm, opts.CalculateStats, opts.UseTabAlertSettings, opts.ExtendState)
				cancel()
				if err != nil {
					finish.Fail()
					log.Errorf("write: %v", err)
					continue
				}
				finish.Success()
			}
		}()
	}

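	// On return the defers run in reverse order: close the group channel, wait
	// for readers, close the task channel, then wait for writers.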
	defer writeWg.Wait()
	defer close(tasks)
	defer readWg.Wait()
	defer close(groups)

	return q.Send(ctx, groups, opts.Freq)
}
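
A minimal sketch of how Update might be wired up from inside the same package, assuming the caller already holds a gcs.ConditionalClient, a *Metrics, and a parsed gcs.Path; the runTabulator helper and its option values are illustrative only and rely on the package's existing imports.

func runTabulator(ctx context.Context, client gcs.ConditionalClient, mets *Metrics, configPath gcs.Path) error {
	opts := &UpdateOptions{
		ConfigPath:       configPath, // gs:// path to the serialized config
		ReadConcurrency:  4,          // parallel grid downloads
		WriteConcurrency: 8,          // parallel tab-state writes
		Freq:             time.Hour,  // how often each group is re-tabulated
		Confirm:          true,       // assumed to gate actual writes to GCS
	}
	// No Fixer callbacks are passed; Update accepts zero or more.
	return Update(ctx, client, mets, opts)
}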