in pkg/summarizer/summary.go [118:376]
// Update summarizes the dashboards of the configuration at opts.ConfigPath.
//
// It observes the config for changes and keeps a DashboardQueue of the allowed
// dashboards, scheduled by when each summary was last written. It runs
// opts.Concurrency workers that summarize dashboards sent by the queue,
// writing each summary only when opts.Confirm is set. It also runs every fixer
// against the queue, restarting the fixers whenever the config changes.
//
// Update returns whatever the queue's Send returns (presumably once ctx is
// done), after all workers and fixers have stopped. It returns early with an
// error for a non-positive concurrency or when the config cannot be observed.
func Update(ctx context.Context, client gcs.ConditionalClient, mets *Metrics, opts *UpdateOptions, fixers ...Fixer) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	if opts.Concurrency < 1 {
		return fmt.Errorf("concurrency must be positive, got: %d", opts.Concurrency)
	}
	log := logrus.WithField("config", opts.ConfigPath)
	var q config.DashboardQueue
	allowed := stringset.New(opts.AllowedDashboards...)

	// cfg is replaced by the fixer goroutine whenever the config changes while
	// the summarize workers read it, so every access goes through cfgLock.
	var (
		cfg     *snapshot.Config
		cfgLock sync.RWMutex
	)
	currentConfig := func() *snapshot.Config {
		cfgLock.RLock()
		defer cfgLock.RUnlock()
		return cfg
	}

	// fixSnapshot installs newConfig (restricted to the allowed dashboards)
	// and reinitializes the queue. A dashboard whose summary exists is
	// scheduled for opts.Freq after that summary's last update; any other
	// dashboard is scheduled now. A missing summary is created via
	// lockDashboard, and losing that creation race is only logged.
	fixSnapshot := func(newConfig *snapshot.Config) error {
		if newConfig == nil {
			return errors.New("nil config")
		}
		baseLog := log
		log := log.WithField("fixSnapshot()", true)
		newConfig.Dashboards = filterDashboards(newConfig.Dashboards, allowed)
		cfgLock.Lock()
		cfg = newConfig
		cfgLock.Unlock()

		dashCap := len(newConfig.Dashboards)
		paths := make([]gcs.Path, 0, dashCap)
		dashboards := make([]*configpb.Dashboard, 0, dashCap)
		for _, d := range newConfig.Dashboards {
			path, err := SummaryPath(opts.ConfigPath, opts.SummaryPathPrefix, d.Name)
			if err != nil {
				// Skip this dashboard rather than dereference a path that
				// SummaryPath failed to produce.
				log.WithError(err).WithField("dashboard", d.Name).Error("Bad dashboard path")
				continue
			}
			// paths and dashboards stay index-aligned for the stat loop below.
			paths = append(paths, *path)
			dashboards = append(dashboards, d)
		}
		stats := gcs.Stat(ctx, client, 10, paths...)
		whens := make(map[string]time.Time, len(stats))
		var wg sync.WaitGroup
		for i, stat := range stats {
			name := dashboards[i].Name
			path := paths[i]
			log := log.WithField("path", path)
			switch {
			case stat.Attrs != nil:
				whens[name] = stat.Attrs.Updated.Add(opts.Freq)
			default:
				if errors.Is(stat.Err, storage.ErrObjectNotExist) {
					wg.Add(1)
					go func() {
						defer wg.Done()
						_, err := lockDashboard(ctx, client, path, 0)
						switch {
						case gcs.IsPreconditionFailed(err):
							log.WithError(err).Debug("Lost race to create initial summary")
						case err != nil:
							log.WithError(err).Error("Failed to lock initial summary")
						default:
							log.Info("Created initial summary")
						}
					}()
				} else {
					log.WithError(stat.Err).Info("Failed to stat")
				}
				whens[name] = time.Now()
			}
		}
		wg.Wait()
		q.Init(baseLog, dashboards, time.Now().Add(opts.Freq))
		if err := q.FixAll(whens, false); err != nil {
			log.WithError(err).Error("Failed to fix all dashboards based on last update time")
		}
		return nil
	}

	log.Debug("Observing config...")
	// Keep a handle on the ticker so it stops firing once Update returns.
	observeTicker := time.NewTicker(time.Minute)
	defer observeTicker.Stop()
	cfgChanged, err := snapshot.Observe(ctx, log, client, opts.ConfigPath, observeTicker.C)
	if err != nil {
		return fmt.Errorf("observe config: %w", err)
	}
	// Bootstrap the queue before any worker or fixer uses it.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case initial, ok := <-cfgChanged:
		if !ok {
			return errors.New("observe config: closed before sending a config")
		}
		if err := fixSnapshot(initial); err != nil {
			return fmt.Errorf("initial config: %w", err)
		}
	}

	// active holds dashboards currently being summarized; waiting holds those
	// requested again while active, so they are re-queued when the run ends.
	var (
		lock    sync.Mutex // guards active and waiting
		active  stringset.Set
		waiting stringset.Set
	)

	// The fixer goroutine logs queue status every minute, runs the fixers,
	// and re-applies the config (restarting the fixers) when it changes.
	fixersDone := make(chan struct{})
	go func() {
		defer close(fixersDone)
		configs := cfgChanged
		fixCtx, fixCancel := context.WithCancel(ctx)
		var fixWg sync.WaitGroup
		// stopFixers cancels the running fixers and waits for them to return.
		stopFixers := func() {
			fixCancel()
			fixWg.Wait()
		}
		fixAll := func() {
			n := len(fixers)
			log.WithField("fixers", n).Trace("Starting fixers on current dashboards...")
			fixWg.Add(n)
			for i, fix := range fixers {
				go func(i int, fix Fixer) {
					defer fixWg.Done()
					if err := fix(fixCtx, &q); err != nil && !errors.Is(err, context.Canceled) {
						log.WithError(err).WithField("fixer", i).Warning("Fixer failed")
					}
				}(i, fix)
			}
			log.Debug("Started fixers on current dashboards")
		}
		ticker := time.NewTicker(time.Minute) // TODO(fejta): subscribe to notifications
		defer ticker.Stop()
		fixAll()
		for {
			lock.Lock()
			activeDashboards := active.Elements()
			lock.Unlock()
			depth, next, when := q.Status()
			log := log.WithFields(logrus.Fields{
				"depth":  depth,
				"active": activeDashboards,
			})
			if next != nil {
				log = log.WithField("next", *next)
			}
			delay := time.Since(when)
			if delay < 0 {
				// Record how long until the next dashboard is due before
				// clamping, otherwise "sleep" would always be zero.
				log = log.WithField("sleep", -delay)
				delay = 0
			}
			log = log.WithField("delay", delay.Round(time.Second))
			log.Info("Updating dashboards")
			select {
			case <-ctx.Done():
				stopFixers()
				return
			case newConfig, ok := <-configs:
				if !ok {
					// Receiving from a nil channel blocks forever, which
					// disables this case instead of spinning on the closed one.
					log.Warning("Config observer closed; no longer watching for changes")
					configs = nil
					continue
				}
				log.Info("Configuration changed")
				stopFixers()
				fixCtx, fixCancel = context.WithCancel(ctx)
				if err := fixSnapshot(newConfig); err != nil {
					log.WithError(err).Error("Failed to apply new configuration")
				}
				fixAll()
			case <-ticker.C:
			}
		}
	}()
	// Runs after the workers have drained (defers run LIFO): stop the fixer
	// goroutine and wait for it so no fixer outlives Update.
	defer func() {
		cancel()
		<-fixersDone
	}()

	dashboardNames := make(chan string)
	// TODO(fejta): cache downloaded group?
	findGroup := func(dash string, tab *configpb.DashboardTab) (*gcs.Path, *configpb.TestGroup, gridReader, error) {
		name := tab.TestGroupName
		group := currentConfig().Groups[name]
		if group == nil {
			return nil, nil, nil, nil
		}
		groupPath, err := tabulator.TabStatePath(opts.ConfigPath, opts.TabPathPrefix, dash, tab.Name)
		if err != nil {
			return nil, group, nil, err
		}
		reader := func(ctx context.Context) (io.ReadCloser, time.Time, int64, error) {
			return pathReader(ctx, client, *groupPath)
		}
		return groupPath, group, reader, nil
	}
	tabUpdater := tabUpdatePool(ctx, log, opts.Concurrency, opts.Features)

	// updateName reads the current summary of dashName, updates it and (when
	// opts.Confirm is set) writes it back. It returns a logger annotated with
	// summary stats and whether updateDashboard reported more work to do.
	updateName := func(log *logrus.Entry, dashName string) (logrus.FieldLogger, bool, error) {
		ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
		defer cancel()
		dash := currentConfig().Dashboards[dashName]
		if dash == nil {
			return log, false, errors.New("dashboard not found")
		}
		log.Debug("Summarizing dashboard")
		summaryPath, err := SummaryPath(opts.ConfigPath, opts.SummaryPathPrefix, dashName)
		if err != nil {
			return log, false, fmt.Errorf("summary path: %w", err)
		}
		sum, _, _, err := ReadSummary(ctx, client, *summaryPath)
		if err != nil {
			return log, false, fmt.Errorf("read %q: %w", *summaryPath, err)
		}
		if sum == nil {
			sum = &summarypb.DashboardSummary{}
		}
		// TODO(fejta): refactor to note whether there is more work
		more := updateDashboard(ctx, client, dash, sum, findGroup, tabUpdater)
		var healthyTests int
		var failures int
		for _, tab := range sum.TabSummaries {
			failures += len(tab.FailingTestSummaries)
			if h := tab.Healthiness; h != nil {
				healthyTests += len(h.Tests)
			}
		}
		log = log.WithFields(logrus.Fields{
			"path":          summaryPath,
			"tabs":          len(sum.TabSummaries),
			"failures":      failures,
			"healthy-tests": healthyTests,
		})
		if !opts.Confirm {
			return log, more, nil
		}
		size, err := writeSummary(ctx, client, *summaryPath, sum)
		log = log.WithField("bytes", size)
		if err != nil {
			return log, more, fmt.Errorf("write: %w", err)
		}
		return log, more, nil
	}

	var wg sync.WaitGroup
	wg.Add(opts.Concurrency)
	for i := 0; i < opts.Concurrency; i++ {
		go func() {
			defer wg.Done()
			for dashName := range dashboardNames {
				// Never summarize the same dashboard twice at once: if it is
				// already active, note the request and let the active run
				// re-queue it when it finishes.
				lock.Lock()
				start := active.Add(dashName)
				if !start {
					waiting.Add(dashName)
				}
				lock.Unlock()
				if !start {
					continue
				}
				log := log.WithField("dashboard", dashName)
				finish := mets.Summarize.Start()
				// The returned logger intentionally shadows log: it carries
				// the summary fields added by updateName.
				if log, more, err := updateName(log, dashName); err != nil {
					finish.Fail()
					q.Fix(dashName, time.Now().Add(opts.Freq/2), false)
					log.WithError(err).Error("Failed to summarize dashboard")
				} else {
					finish.Success()
					if more {
						q.Fix(dashName, time.Now(), false)
						log = log.WithField("more", more)
					}
					log.Info("Summarized dashboard")
				}
				lock.Lock()
				active.Discard(dashName)
				restart := waiting.Discard(dashName)
				lock.Unlock()
				if restart {
					q.Fix(dashName, time.Now(), false)
				}
			}
		}()
	}
	// Deferred in this order so the channel closes first, then workers drain.
	defer wg.Wait()
	defer close(dashboardNames)
	return q.Send(ctx, dashboardNames, opts.Freq)
}