func Update()

in pkg/summarizer/summary.go [118:376]


// Update keeps the dashboard summaries for the config at opts.ConfigPath up to date.
//
// It observes the config for changes, seeds a dashboard queue from the last
// update time of each allowed dashboard's summary, starts the optional fixers
// against that queue, and runs opts.Concurrency workers that read, update and
// (only when opts.Confirm is set) write each dashboard summary. Whenever the
// config changes, the fixers are stopped, the queue is rebuilt and the fixers
// are restarted.
//
// Update returns an error if opts.Concurrency is not positive, if the config
// cannot be observed, or if no initial config snapshot arrives before ctx ends.
// Otherwise it returns whatever q.Send returns once it stops sending.
func Update(ctx context.Context, client gcs.ConditionalClient, mets *Metrics, opts *UpdateOptions, fixers ...Fixer) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	if opts.Concurrency < 1 {
		return fmt.Errorf("concurrency must be positive, got: %d", opts.Concurrency)
	}
	log := logrus.WithField("config", opts.ConfigPath)

	var q config.DashboardQueue

	// cfg is replaced by the config-watching goroutine while the workers read
	// it, so every access after bootstrap goes through cfgLock.
	var (
		cfgLock sync.RWMutex
		cfg     *snapshot.Config
	)
	currentConfig := func() *snapshot.Config {
		cfgLock.RLock()
		defer cfgLock.RUnlock()
		return cfg
	}

	allowed := stringset.New(opts.AllowedDashboards...)
	// fixSnapshot installs newConfig (filtered to the allowed dashboards),
	// creates missing initial summaries, and re-initializes q with a target
	// time per dashboard: its summary's last update plus opts.Freq, or now if
	// the summary could not be stat'd.
	fixSnapshot := func(newConfig *snapshot.Config) error {
		baseLog := log
		log := log.WithField("fixSnapshot()", true)
		newConfig.Dashboards = filterDashboards(newConfig.Dashboards, allowed)
		cfgLock.Lock()
		cfg = newConfig
		cfgLock.Unlock()

		dashCap := len(newConfig.Dashboards)
		paths := make([]gcs.Path, 0, dashCap)
		dashboards := make([]*configpb.Dashboard, 0, dashCap)
		for _, d := range newConfig.Dashboards {
			path, err := SummaryPath(opts.ConfigPath, opts.SummaryPathPrefix, d.Name)
			if err != nil {
				// The path is unusable (possibly nil): skip this dashboard so we
				// never dereference it, and so paths and dashboards stay index-aligned.
				log.WithError(err).WithField("dashboard", d.Name).Error("Bad dashboard path")
				continue
			}
			paths = append(paths, *path)
			dashboards = append(dashboards, d)
		}

		stats := gcs.Stat(ctx, client, 10, paths...)
		whens := make(map[string]time.Time, len(stats))
		var wg sync.WaitGroup
		for i, stat := range stats {
			name := dashboards[i].Name
			path := paths[i]
			log := log.WithField("path", path)
			switch {
			case stat.Attrs != nil:
				whens[name] = stat.Attrs.Updated.Add(opts.Freq)
			default:
				if errors.Is(stat.Err, storage.ErrObjectNotExist) {
					wg.Add(1)
					go func() {
						defer wg.Done()
						_, err := lockDashboard(ctx, client, path, 0)
						switch {
						case gcs.IsPreconditionFailed(err):
							log.WithError(err).Debug("Lost race to create initial summary")
						case err != nil:
							log.WithError(err).Error("Failed to lock initial summary")
						default:
							log.Info("Created initial summary")
						}
					}()
				} else {
					log.WithError(stat.Err).Info("Failed to stat")
				}
				whens[name] = time.Now()
			}
		}

		wg.Wait()

		q.Init(baseLog, dashboards, time.Now().Add(opts.Freq))
		if err := q.FixAll(whens, false); err != nil {
			log.WithError(err).Error("Failed to fix all dashboards based on last update time")
		}
		return nil
	}

	log.Debug("Observing config...")
	observeTicker := time.NewTicker(time.Minute)
	defer observeTicker.Stop()
	cfgChanged, err := snapshot.Observe(ctx, log, client, opts.ConfigPath, observeTicker.C)
	if err != nil {
		return fmt.Errorf("observe config: %w", err)
	}
	// Bootstrap queue before use; don't block forever if ctx ends first.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case newConfig, ok := <-cfgChanged:
		if !ok {
			return errors.New("observe config: closed before initial snapshot")
		}
		fixSnapshot(newConfig)
	}

	var active stringset.Set
	var waiting stringset.Set
	var lock sync.Mutex

	go func() {
		updates := cfgChanged
		fixCtx, fixCancel := context.WithCancel(ctx)
		var fixWg sync.WaitGroup
		fixAll := func() {
			n := len(fixers)
			log.WithField("fixers", n).Trace("Starting fixers on current dashboards...")
			fixWg.Add(n)
			for i, fix := range fixers {
				go func(i int, fix Fixer) {
					defer fixWg.Done()
					if err := fix(fixCtx, &q); err != nil && !errors.Is(err, context.Canceled) {
						log.WithError(err).WithField("fixer", i).Warning("Fixer failed")
					}
				}(i, fix)
			}
			log.Debug("Started fixers on current dashboards")
		}

		ticker := time.NewTicker(time.Minute) // TODO(fejta): subscribe to notifications
		fixAll()
		for {
			lock.Lock()
			activeDashboards := active.Elements()
			lock.Unlock()

			depth, next, when := q.Status()
			log := log.WithFields(logrus.Fields{
				"depth":  depth,
				"active": activeDashboards,
			})
			if next != nil {
				log = log.WithField("next", *next)
			}
			delay := time.Since(when)
			if delay < 0 {
				// Record the remaining wait before clamping delay, otherwise sleep is always 0.
				log = log.WithField("sleep", -delay)
				delay = 0
			}
			log = log.WithField("delay", delay.Round(time.Second))
			log.Info("Updating dashboards")
			select {
			case <-ctx.Done():
				ticker.Stop()
				fixCancel()
				fixWg.Wait()
				return
			case newConfig, ok := <-updates:
				if !ok {
					// A closed channel is always ready and yields nil; stop selecting on it.
					updates = nil
					break
				}
				log.Info("Configuration changed")
				fixCancel()
				fixWg.Wait()
				fixCtx, fixCancel = context.WithCancel(ctx)
				fixSnapshot(newConfig)
				fixAll()
			case <-ticker.C:
			}

		}
	}()

	dashboardNames := make(chan string)

	// TODO(fejta): cache downloaded group?
	findGroup := func(dash string, tab *configpb.DashboardTab) (*gcs.Path, *configpb.TestGroup, gridReader, error) {
		name := tab.TestGroupName
		group := currentConfig().Groups[name]
		if group == nil {
			return nil, nil, nil, nil
		}
		groupPath, err := tabulator.TabStatePath(opts.ConfigPath, opts.TabPathPrefix, dash, tab.Name)
		if err != nil {
			return nil, group, nil, err
		}
		reader := func(ctx context.Context) (io.ReadCloser, time.Time, int64, error) {
			return pathReader(ctx, client, *groupPath)
		}
		return groupPath, group, reader, nil
	}

	tabUpdater := tabUpdatePool(ctx, log, opts.Concurrency, opts.Features)

	// updateName reads, updates and (if opts.Confirm) writes one dashboard's
	// summary. It reports whether updateDashboard indicated more work remains.
	updateName := func(log *logrus.Entry, dashName string) (logrus.FieldLogger, bool, error) {
		ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
		defer cancel()
		dash := currentConfig().Dashboards[dashName]
		if dash == nil {
			return log, false, errors.New("dashboard not found")
		}
		log.Debug("Summarizing dashboard")
		summaryPath, err := SummaryPath(opts.ConfigPath, opts.SummaryPathPrefix, dashName)
		if err != nil {
			return log, false, fmt.Errorf("summary path: %v", err)
		}
		sum, _, _, err := ReadSummary(ctx, client, *summaryPath)
		if err != nil {
			return log, false, fmt.Errorf("read %q: %v", *summaryPath, err)
		}

		if sum == nil {
			sum = &summarypb.DashboardSummary{}
		}

		// TODO(fejta): refactor to note whether there is more work
		more := updateDashboard(ctx, client, dash, sum, findGroup, tabUpdater)

		var healthyTests int
		var failures int
		for _, tab := range sum.TabSummaries {
			failures += len(tab.FailingTestSummaries)
			if h := tab.Healthiness; h != nil {
				healthyTests += len(h.Tests)
			}
		}

		log = log.WithFields(logrus.Fields{
			"path":          summaryPath,
			"tabs":          len(sum.TabSummaries),
			"failures":      failures,
			"healthy-tests": healthyTests,
		})
		if !opts.Confirm {
			return log, more, nil
		}
		size, err := writeSummary(ctx, client, *summaryPath, sum)
		log = log.WithField("bytes", size)
		if err != nil {
			return log, more, fmt.Errorf("write: %w", err)
		}
		return log, more, nil
	}

	var wg sync.WaitGroup
	wg.Add(opts.Concurrency)
	for i := 0; i < opts.Concurrency; i++ {
		go func() {
			defer wg.Done()
			for dashName := range dashboardNames {
				// Only one worker summarizes a dashboard at a time; a request that
				// arrives while it is active is remembered and re-queued afterwards.
				lock.Lock()
				start := active.Add(dashName)
				if !start {
					waiting.Add(dashName)
				}
				lock.Unlock()
				if !start {
					continue
				}

				log := log.WithField("dashboard", dashName)
				finish := mets.Summarize.Start()
				if log, more, err := updateName(log, dashName); err != nil {
					finish.Fail()
					q.Fix(dashName, time.Now().Add(opts.Freq/2), false)
					log.WithError(err).Error("Failed to summarize dashboard")
				} else {
					finish.Success()
					if more {
						q.Fix(dashName, time.Now(), false)
						log = log.WithField("more", more)
					}
					log.Info("Summarized dashboard")
				}

				lock.Lock()
				active.Discard(dashName)
				restart := waiting.Discard(dashName)
				lock.Unlock()
				if restart {
					q.Fix(dashName, time.Now(), false)
				}

			}
		}()
	}
	// Deferred calls run LIFO: close the channel first so workers exit, then wait for them.
	defer wg.Wait()
	defer close(dashboardNames)

	return q.Send(ctx, dashboardNames, opts.Freq)
}