in pkg/summarizer/summary.go [478:625]
// updateDashboard refreshes the tab summaries stored in sum for every tab in dash.
//
// For each tab it resolves the backing test group via findGroup, stats the
// group's grid state through client, works out how far the tab's existing
// summary lags behind that grid state, and then asks tabUpdater for a fresh
// summary of every tab that is not already up to date. Groups are processed
// most-delayed first.
//
// If ctx carries a deadline, new update requests are only issued during the
// first half of the remaining time (the "grace" context); requests already
// issued are still run with the full ctx.
//
// sum.TabSummaries is replaced with one entry per tab of dash, in dash order.
// The return value is true when the grace context has ended (expired or
// cancelled) by the time the function returns.
func updateDashboard(ctx context.Context, client gcs.Stater, dash *configpb.Dashboard, sum *summarypb.DashboardSummary, findGroup groupFinder, tabUpdater *tabUpdater) bool {
	log := logrus.WithField("dashboard", dash.Name)

	// graceCtx ends halfway to ctx's deadline; without a deadline it is ctx itself.
	var graceCtx context.Context
	if when, ok := ctx.Deadline(); ok {
		dur := time.Until(when) / 2
		var cancel func()
		graceCtx, cancel = context.WithTimeout(ctx, dur)
		defer cancel()
	} else {
		graceCtx = ctx
	}

	// First collect the previously summarized tabs.
	tabSummaries := make(map[string]*summarypb.DashboardTabSummary, len(sum.TabSummaries))
	for _, tabSum := range sum.TabSummaries {
		tabSummaries[tabSum.DashboardTabName] = tabSum
	}

	// Now create info about which tabs we need to summarize and where the grid state lives.
	type groupInfo struct {
		group  *configpb.TestGroup
		reader gridReader
		tabs   []*configpb.DashboardTab
	}
	groupInfos := make(map[gcs.Path]*groupInfo, len(dash.DashboardTab))
	var paths []gcs.Path
	for _, tab := range dash.DashboardTab {
		groupPath, group, groupReader, err := findGroup(dash.Name, tab)
		if err != nil {
			tabSummaries[tab.Name] = tabStatus(dash.Name, tab.Name, fmt.Sprintf("Error reading group info: %v", err))
			continue
		}
		if group == nil {
			tabSummaries[tab.Name] = tabStatus(dash.Name, tab.Name, fmt.Sprintf("Test group does not exist: %q", tab.TestGroupName))
			continue
		}
		// Several tabs may share one test group; keep a single entry per grid path.
		info := groupInfos[*groupPath]
		if info == nil {
			info = &groupInfo{
				group:  group,
				reader: groupReader, // TODO(fejta): optimize (only read once)
			}
			paths = append(paths, *groupPath)
			groupInfos[*groupPath] = info
		}
		info.tabs = append(info.tabs, tab)
	}

	// Check the attributes of the grid states.
	attrs := gcs.StatExisting(ctx, log, client, paths...)

	// Determine how far behind each summary is.
	//
	// Staleness is tracked per tab, because tabs sharing a group can differ
	// (e.g. one is brand new while another is current); deciding per group
	// would let one tab's state cause a sibling to be wrongly skipped or
	// needlessly refreshed. The per-group value is the largest delay among
	// the group's tabs and is used only to order the groups.
	//
	// Special values: -1 means there is no grid state; a tab without a prior
	// summary is treated as being 24 hours (in seconds) behind.
	pathDelays := make(map[gcs.Path]float64, len(paths))
	tabDelays := make(map[*configpb.DashboardTab]float64, len(dash.DashboardTab))
	for i, path := range paths {
		a := attrs[i]
		for j, tab := range groupInfos[path].tabs {
			// TODO(fejta): optimize (only read once)
			name := tab.Name
			prev := tabSummaries[name]
			var delay float64
			switch {
			case a == nil:
				tabSummaries[name] = tabStatus(dash.Name, name, noRuns)
				delay = -1
			case prev == nil:
				tabSummaries[name] = tabStatus(dash.Name, name, "Newly created tab")
				delay = float64(24 * time.Hour / time.Second)
				log.WithField("tab", name).Debug("Found new tab")
			default:
				delay = float64(a.Updated.Unix()) - prev.LastUpdateTimestamp
			}
			tabDelays[tab] = delay
			// Every path has at least one tab, so j == 0 always seeds the entry.
			if j == 0 || delay > pathDelays[path] {
				pathDelays[path] = delay
			}
		}
	}

	// Sort groups by delay, most delayed first.
	sort.SliceStable(paths, func(i, j int) bool {
		return pathDelays[paths[i]] > pathDelays[paths[j]]
	})

	// Now let's update the tab summaries in parallel, starting with most delayed.
	type future struct {
		log    *logrus.Entry
		name   string
		result func() (*summarypb.DashboardTabSummary, error)
	}

	// Channel to receive updated tabs; closed by the requesting goroutine when it stops.
	ch := make(chan future)

	// Request an update for each stale tab, starting with the most delayed group.
	go func() {
		defer close(ch)
		// The updater's lock is held for the whole time this dashboard issues requests.
		tabUpdater.lock.Lock()
		defer tabUpdater.lock.Unlock()
		for _, path := range paths {
			info := groupInfos[path]
			log := log.WithField("group", path)
			for _, tab := range info.tabs {
				log := log.WithField("tab", tab.Name)
				delay := tabDelays[tab]
				if delay == 0 {
					log.Debug("Already up to date")
					continue
				} else if delay == -1 {
					// NOTE(review): there is no continue here, so an update is still
					// requested for tabs without grid state — confirm this is intended.
					log.Debug("No grid state to process")
				}
				log = log.WithField("delay", delay)
				// Stop issuing new requests once the grace period is over.
				if err := graceCtx.Err(); err != nil {
					log.WithError(err).Info("Interrupted")
					return
				}
				log.Debug("Requesting tab summary update")
				f := tabUpdater.update(ctx, tab, info.group, info.reader)
				select {
				case <-ctx.Done():
					return
				case ch <- future{log, tab.Name, f}:
				}
			}
		}
	}()

	// Update the summary for any tabs that give a response.
	for fut := range ch {
		tabName := fut.name
		log := fut.log
		log.Trace("Waiting for updated tab summary response")
		s, err := fut.result()
		if err != nil {
			// Surface the failure as the tab's status instead of dropping the tab.
			s = tabStatus(dash.Name, tabName, fmt.Sprintf("Error attempting to summarize tab: %v", err))
			log = log.WithError(err)
		} else {
			s.DashboardName = dash.Name
		}
		tabSummaries[tabName] = s
		log.Trace("Updated tab summary")
	}

	// Assemble them back into the dashboard summary, in dashboard tab order.
	sum.TabSummaries = make([]*summarypb.DashboardTabSummary, len(dash.DashboardTab))
	for idx, tab := range dash.DashboardTab {
		sum.TabSummaries[idx] = tabSummaries[tab.Name]
	}
	return graceCtx.Err() != nil
}