func updateDashboard()

in pkg/summarizer/summary.go [478:625]


// updateDashboard refreshes the tab summaries of dash inside sum.
//
// Tabs whose test group cannot be resolved by findGroup get an error status.
// The remaining tabs are grouped by the path of their grid state; those paths
// are stat'ed through client and updates are requested via tabUpdater,
// starting with the most delayed group. Finally sum.TabSummaries is rebuilt in
// dash.DashboardTab order.
//
// When ctx has a deadline, no new update is requested once half of the
// remaining time has elapsed; updates already requested still run under ctx.
//
// It returns true when that grace period had expired (or ctx was done) by the
// time the function finished, in which case some tabs may not have been
// refreshed.
func updateDashboard(ctx context.Context, client gcs.Stater, dash *configpb.Dashboard, sum *summarypb.DashboardSummary, findGroup groupFinder, tabUpdater *tabUpdater) bool {
	log := logrus.WithField("dashboard", dash.Name)

	// graceCtx bounds how long we keep *requesting* new tab updates: half of
	// the time left until ctx's deadline, or ctx itself when it has none.
	graceCtx := ctx
	if when, ok := ctx.Deadline(); ok {
		var cancel context.CancelFunc
		graceCtx, cancel = context.WithTimeout(ctx, time.Until(when)/2)
		defer cancel()
	}

	// First collect the previously summarized tabs.
	tabSummaries := make(map[string]*summarypb.DashboardTabSummary, len(sum.TabSummaries))
	for _, tabSum := range sum.TabSummaries {
		tabSummaries[tabSum.DashboardTabName] = tabSum
	}

	// Now create info about which tabs we need to summarize and where the grid state lives.
	type groupInfo struct {
		group  *configpb.TestGroup
		reader gridReader
		tabs   []*configpb.DashboardTab
	}
	groupInfos := make(map[gcs.Path]*groupInfo, len(dash.DashboardTab))

	// paths lists each distinct grid state path once, in first-seen order.
	var paths []gcs.Path
	for _, tab := range dash.DashboardTab {
		groupPath, group, groupReader, err := findGroup(dash.Name, tab)
		if err != nil {
			tabSummaries[tab.Name] = tabStatus(dash.Name, tab.Name, fmt.Sprintf("Error reading group info: %v", err))
			continue
		}
		if group == nil {
			tabSummaries[tab.Name] = tabStatus(dash.Name, tab.Name, fmt.Sprintf("Test group does not exist: %q", tab.TestGroupName))
			continue
		}
		info := groupInfos[*groupPath]
		if info == nil {
			info = &groupInfo{
				group:  group,
				reader: groupReader, // TODO(fejta): optimize (only read once)
			}
			paths = append(paths, *groupPath)
			groupInfos[*groupPath] = info
		}
		info.tabs = append(info.tabs, tab)
	}

	// Check the attributes of the grid states.
	attrs := gcs.StatExisting(ctx, log, client, paths...)

	// tabDelays records, per tab, how far (in seconds) its summary lags behind
	// the grid state. It must be tracked per tab rather than per path: several
	// tabs can share one test group while having different summary ages, and a
	// single per-path value would let one up-to-date tab hide a stale or newly
	// created sibling.
	tabDelays := make(map[string]float64, len(dash.DashboardTab))
	// delays holds the largest tab delay of each path and is only used to
	// decide the order in which groups are processed.
	delays := make(map[gcs.Path]float64, len(paths))

	// determine how much behind each summary is
	for i, path := range paths {
		attr := attrs[i]
		for j, tab := range groupInfos[path].tabs {
			// TODO(fejta): optimize (only read once)
			name := tab.Name
			prev := tabSummaries[name]
			var delay float64
			switch {
			case attr == nil:
				// No grid state object was found for this path.
				tabSummaries[name] = tabStatus(dash.Name, name, noRuns)
				delay = -1
			case prev == nil:
				// Never summarized before: treat it as a full day behind.
				tabSummaries[name] = tabStatus(dash.Name, name, "Newly created tab")
				delay = float64(24 * time.Hour / time.Second)
				log.WithField("tab", name).Debug("Found new tab")
			default:
				delay = float64(attr.Updated.Unix()) - prev.LastUpdateTimestamp
			}
			tabDelays[name] = delay
			if j == 0 || delay > delays[path] {
				delays[path] = delay
			}
		}
	}

	// sort by delay, most delayed group first
	sort.SliceStable(paths, func(i, j int) bool {
		return delays[paths[i]] > delays[paths[j]]
	})

	// Now let's update the tab summaries in parallel, starting with most delayed

	type future struct {
		log    *logrus.Entry
		name   string
		result func() (*summarypb.DashboardTabSummary, error)
	}

	// channel to receive updated tabs
	ch := make(chan future)

	// request an update for each tab, starting with the least recently modified one.
	go func() {
		// Closing ch is what terminates the receive loop below.
		defer close(ch)
		tabUpdater.lock.Lock()
		defer tabUpdater.lock.Unlock()
		for _, path := range paths {
			info := groupInfos[path]
			log := log.WithField("group", path)
			for _, tab := range info.tabs {
				log := log.WithField("tab", tab.Name)
				delay := tabDelays[tab.Name]
				if delay == 0 {
					log.Debug("Already up to date")
					continue
				} else if delay == -1 {
					// Logged only; the update is still requested below.
					log.Debug("No grid state to process")
				}
				log = log.WithField("delay", delay)
				// Stop issuing new requests once the grace period is over.
				if err := graceCtx.Err(); err != nil {
					log.WithError(err).Info("Interrupted")
					return
				}
				log.Debug("Requesting tab summary update")
				// The update itself runs under ctx, not graceCtx.
				f := tabUpdater.update(ctx, tab, info.group, info.reader)
				select {
				case <-ctx.Done():
					return
				case ch <- future{log, tab.Name, f}:
				}
			}
		}
	}()

	// Update the summary for any tabs that give a response
	for fut := range ch {
		tabName := fut.name
		log := fut.log
		log.Trace("Waiting for updated tab summary response")
		s, err := fut.result()
		if err != nil {
			s = tabStatus(dash.Name, tabName, fmt.Sprintf("Error attempting to summarize tab: %v", err))
			log = log.WithError(err)
		} else {
			s.DashboardName = dash.Name
		}
		tabSummaries[tabName] = s
		log.Trace("Updated tab summary")
	}

	// assemble them back into the dashboard summary.
	sum.TabSummaries = make([]*summarypb.DashboardTabSummary, len(dash.DashboardTab))
	for idx, tab := range dash.DashboardTab {
		sum.TabSummaries[idx] = tabSummaries[tab.Name]
	}

	return graceCtx.Err() != nil
}