pkg/summarizer/summary.go
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package summarizer provides a method to read state protos defined in a config and output summary protos.
package summarizer
import (
"compress/zlib"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"net/url"
"path"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
"bitbucket.org/creachadair/stringset"
"cloud.google.com/go/storage"
"github.com/GoogleCloudPlatform/testgrid/config"
"github.com/GoogleCloudPlatform/testgrid/config/snapshot"
"github.com/GoogleCloudPlatform/testgrid/internal/result"
configpb "github.com/GoogleCloudPlatform/testgrid/pb/config"
statepb "github.com/GoogleCloudPlatform/testgrid/pb/state"
summarypb "github.com/GoogleCloudPlatform/testgrid/pb/summary"
statuspb "github.com/GoogleCloudPlatform/testgrid/pb/test_status"
"github.com/GoogleCloudPlatform/testgrid/pkg/tabulator"
"github.com/GoogleCloudPlatform/testgrid/util"
"github.com/GoogleCloudPlatform/testgrid/util/gcs"
"github.com/GoogleCloudPlatform/testgrid/util/metrics"
"github.com/golang/protobuf/proto"
"github.com/sirupsen/logrus"
)
// Metrics holds metrics relevant to the Summarizer.
type Metrics struct {
Summarize metrics.Cyclic
}
// CreateMetrics creates all the metrics that the Summarizer will use
// This should be called once
func CreateMetrics(factory metrics.Factory) *Metrics {
return &Metrics{
Summarize: factory.NewCyclic("summarizer"),
}
}
// FeatureFlags aggregates the knobs to enable/disable certain features.
type FeatureFlags struct {
// controls the acceptable flakiness calculation logic for dashboard tab
AllowFuzzyFlakiness bool
// allows ignoring columns with specific test statuses during summarization
AllowIgnoredColumns bool
// allows enforcing minimum number of runs for a dashboard tab
AllowMinNumberOfRuns bool
}
// gridReader returns the grid content and metadata (last updated time, generation id)
type gridReader func(ctx context.Context) (io.ReadCloser, time.Time, int64, error)
// groupFinder returns the named group as well as reader for the grid state
type groupFinder func(dashboardName string, tab *configpb.DashboardTab) (*gcs.Path, *configpb.TestGroup, gridReader, error)
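// lockDashboard touches the summary object at path, using generation as a write precondition.
// When generation is 0 (no summary exists yet) it uploads an empty DashboardSummary instead.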
func lockDashboard(ctx context.Context, client gcs.ConditionalClient, path gcs.Path, generation int64) (*storage.ObjectAttrs, error) {
var buf []byte
if generation == 0 {
var sum summarypb.DashboardSummary
var err error
buf, err = proto.Marshal(&sum)
if err != nil {
return nil, fmt.Errorf("marshal: %w", err)
}
}
return gcs.Touch(ctx, client, path, generation, buf)
}
// Fixer should adjust the dashboard queue until the context expires.
type Fixer func(context.Context, *config.DashboardQueue) error
// UpdateOptions aggregates the Update function parameter into a single structure.
type UpdateOptions struct {
ConfigPath gcs.Path
Concurrency int
TabPathPrefix string
SummaryPathPrefix string
AllowedDashboards []string
Confirm bool
Features FeatureFlags
Freq time.Duration
}
// Update reads the state protos defined in the config and writes summary protos.
//
// Uses opts.Concurrency goroutines to update dashboards in parallel.
// Setting opts.AllowedDashboards limits updates to those dashboards.
// Summary protos are only written when opts.Confirm is set.
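//
// A minimal wiring sketch (hypothetical names; assumes the caller has already
// built a gcs.ConditionalClient, a metrics.Factory, and the config gcs.Path):
//
//	mets := summarizer.CreateMetrics(factory)
//	opts := &summarizer.UpdateOptions{
//		ConfigPath:  configPath,
//		Concurrency: 4,
//		Confirm:     true,
//		Freq:        5 * time.Minute,
//	}
//	if err := summarizer.Update(ctx, client, mets, opts); err != nil {
//		logrus.WithError(err).Fatal("Could not summarize")
//	}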
func Update(ctx context.Context, client gcs.ConditionalClient, mets *Metrics, opts *UpdateOptions, fixers ...Fixer) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if opts.Concurrency < 1 {
return fmt.Errorf("concurrency must be positive, got: %d", opts.Concurrency)
}
log := logrus.WithField("config", opts.ConfigPath)
var q config.DashboardQueue
var cfg *snapshot.Config
allowed := stringset.New(opts.AllowedDashboards...)
fixSnapshot := func(newConfig *snapshot.Config) error {
baseLog := log
log := log.WithField("fixSnapshot()", true)
newConfig.Dashboards = filterDashboards(newConfig.Dashboards, allowed)
cfg = newConfig
dashCap := len(cfg.Dashboards)
paths := make([]gcs.Path, 0, dashCap)
dashboards := make([]*configpb.Dashboard, 0, dashCap)
for _, d := range cfg.Dashboards {
path, err := SummaryPath(opts.ConfigPath, opts.SummaryPathPrefix, d.Name)
if err != nil {
log.WithError(err).WithField("dashboard", d.Name).Error("Bad dashboard path")
continue // Skip this dashboard rather than dereference a nil path.
}
paths = append(paths, *path)
dashboards = append(dashboards, d)
}
stats := gcs.Stat(ctx, client, 10, paths...)
whens := make(map[string]time.Time, len(stats))
var wg sync.WaitGroup
for i, stat := range stats {
name := dashboards[i].Name
path := paths[i]
log := log.WithField("path", path)
switch {
case stat.Attrs != nil:
whens[name] = stat.Attrs.Updated.Add(opts.Freq)
default:
if errors.Is(stat.Err, storage.ErrObjectNotExist) {
wg.Add(1)
go func() {
defer wg.Done()
_, err := lockDashboard(ctx, client, path, 0)
switch {
case gcs.IsPreconditionFailed(err):
log.WithError(err).Debug("Lost race to create initial summary")
case err != nil:
log.WithError(err).Error("Failed to lock initial summary")
default:
log.Info("Created initial summary")
}
}()
} else {
log.WithError(stat.Err).Info("Failed to stat")
}
whens[name] = time.Now()
}
}
wg.Wait()
q.Init(baseLog, dashboards, time.Now().Add(opts.Freq))
if err := q.FixAll(whens, false); err != nil {
log.WithError(err).Error("Failed to fix all dashboards based on last update time")
}
return nil
}
log.Debug("Observing config...")
cfgChanged, err := snapshot.Observe(ctx, log, client, opts.ConfigPath, time.NewTicker(time.Minute).C)
if err != nil {
return fmt.Errorf("observe config: %w", err)
}
fixSnapshot(<-cfgChanged) // Bootstrap queue before use
var active stringset.Set
var waiting stringset.Set
var lock sync.Mutex
go func() {
fixCtx, fixCancel := context.WithCancel(ctx)
var fixWg sync.WaitGroup
fixAll := func() {
n := len(fixers)
log.WithField("fixers", n).Trace("Starting fixers on current dashboards...")
fixWg.Add(n)
for i, fix := range fixers {
go func(i int, fix Fixer) {
defer fixWg.Done()
if err := fix(fixCtx, &q); err != nil && !errors.Is(err, context.Canceled) {
log.WithError(err).WithField("fixer", i).Warning("Fixer failed")
}
}(i, fix)
}
log.Debug("Started fixers on current dashboards")
}
ticker := time.NewTicker(time.Minute) // TODO(fejta): subscribe to notifications
fixAll()
for {
lock.Lock()
activeDashboards := active.Elements()
lock.Unlock()
depth, next, when := q.Status()
log := log.WithFields(logrus.Fields{
"depth": depth,
"active": activeDashboards,
})
if next != nil {
log = log.WithField("next", *next)
}
delay := time.Since(when)
if delay < 0 {
log = log.WithField("sleep", -delay)
delay = 0
}
log = log.WithField("delay", delay.Round(time.Second))
log.Info("Updating dashboards")
select {
case <-ctx.Done():
ticker.Stop()
fixCancel()
fixWg.Wait()
return
case newConfig := <-cfgChanged:
log.Info("Configuration changed")
fixCancel()
fixWg.Wait()
fixCtx, fixCancel = context.WithCancel(ctx)
fixSnapshot(newConfig)
fixAll()
case <-ticker.C:
}
}
}()
dashboardNames := make(chan string)
// TODO(fejta): cache downloaded group?
findGroup := func(dash string, tab *configpb.DashboardTab) (*gcs.Path, *configpb.TestGroup, gridReader, error) {
name := tab.TestGroupName
group := cfg.Groups[name]
if group == nil {
return nil, nil, nil, nil
}
groupPath, err := tabulator.TabStatePath(opts.ConfigPath, opts.TabPathPrefix, dash, tab.Name)
if err != nil {
return nil, group, nil, err
}
reader := func(ctx context.Context) (io.ReadCloser, time.Time, int64, error) {
return pathReader(ctx, client, *groupPath)
}
return groupPath, group, reader, nil
}
tabUpdater := tabUpdatePool(ctx, log, opts.Concurrency, opts.Features)
updateName := func(log *logrus.Entry, dashName string) (logrus.FieldLogger, bool, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
dash := cfg.Dashboards[dashName]
if dash == nil {
return log, false, errors.New("dashboard not found")
}
log.Debug("Summarizing dashboard")
summaryPath, err := SummaryPath(opts.ConfigPath, opts.SummaryPathPrefix, dashName)
if err != nil {
return log, false, fmt.Errorf("summary path: %v", err)
}
sum, _, _, err := ReadSummary(ctx, client, *summaryPath)
if err != nil {
return log, false, fmt.Errorf("read %q: %v", *summaryPath, err)
}
if sum == nil {
sum = &summarypb.DashboardSummary{}
}
// TODO(fejta): refactor to note whether there is more work
more := updateDashboard(ctx, client, dash, sum, findGroup, tabUpdater)
var healthyTests int
var failures int
for _, tab := range sum.TabSummaries {
failures += len(tab.FailingTestSummaries)
if h := tab.Healthiness; h != nil {
healthyTests += len(h.Tests)
}
}
log = log.WithFields(logrus.Fields{
"path": summaryPath,
"tabs": len(sum.TabSummaries),
"failures": failures,
"healthy-tests": healthyTests,
})
if !opts.Confirm {
return log, more, nil
}
size, err := writeSummary(ctx, client, *summaryPath, sum)
log = log.WithField("bytes", size)
if err != nil {
return log, more, fmt.Errorf("write: %w", err)
}
return log, more, nil
}
var wg sync.WaitGroup
wg.Add(opts.Concurrency)
for i := 0; i < opts.Concurrency; i++ {
go func() {
defer wg.Done()
for dashName := range dashboardNames {
lock.Lock()
start := active.Add(dashName)
if !start {
waiting.Add(dashName)
}
lock.Unlock()
if !start {
continue
}
log := log.WithField("dashboard", dashName)
finish := mets.Summarize.Start()
if log, more, err := updateName(log, dashName); err != nil {
finish.Fail()
q.Fix(dashName, time.Now().Add(opts.Freq/2), false)
log.WithError(err).Error("Failed to summarize dashboard")
} else {
finish.Success()
if more {
q.Fix(dashName, time.Now(), false)
log = log.WithField("more", more)
}
log.Info("Summarized dashboard")
}
lock.Lock()
active.Discard(dashName)
restart := waiting.Discard(dashName)
lock.Unlock()
if restart {
q.Fix(dashName, time.Now(), false)
}
}
}()
}
defer wg.Wait()
defer close(dashboardNames)
return q.Send(ctx, dashboardNames, opts.Freq)
}
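// filterDashboards drops every dashboard whose name is not in allowed.
// An empty allow list keeps all dashboards.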
func filterDashboards(dashboards map[string]*configpb.Dashboard, allowed stringset.Set) map[string]*configpb.Dashboard {
if allowed.Len() == 0 {
return dashboards
}
for key, d := range dashboards {
if allowed.Contains(d.Name) {
continue
}
delete(dashboards, key)
}
return dashboards
}
var (
normalizer = regexp.MustCompile(`[^a-z0-9]+`)
)
// SummaryPath generates a summary GCS path for a given dashboard
func SummaryPath(g gcs.Path, prefix, dashboard string) (*gcs.Path, error) {
// Keep only lowercased alphanumerics, i.e. ''.join(c for c in dashboard.lower() if c.isalnum())
name := "summary-" + normalizer.ReplaceAllString(strings.ToLower(dashboard), "")
fullName := path.Join(prefix, name)
u, err := url.Parse(fullName)
if err != nil {
return nil, fmt.Errorf("parse url: %w", err)
}
np, err := g.ResolveReference(u)
if err != nil {
return nil, fmt.Errorf("resolve reference: %w", err)
}
if np.Bucket() != g.Bucket() {
return nil, fmt.Errorf("dashboard %s should not change bucket", fullName)
}
return np, nil
}
// ReadSummary provides the dashboard summary as defined in summary.proto.
// IMPORTANT: Returns nil if the object doesn't exist.
// Returns an error iff the summary couldn't be read or deserialized properly.
func ReadSummary(ctx context.Context, client gcs.Client, path gcs.Path) (*summarypb.DashboardSummary, time.Time, int64, error) {
r, modified, gen, err := pathReader(ctx, client, path)
if errors.Is(err, storage.ErrObjectNotExist) {
return nil, time.Time{}, 0, nil
} else if err != nil {
return nil, time.Time{}, 0, fmt.Errorf("open: %w", err)
}
buf, err := ioutil.ReadAll(r)
if err != nil {
return nil, time.Time{}, 0, fmt.Errorf("read: %w", err)
}
var sum summarypb.DashboardSummary
if err := proto.Unmarshal(buf, &sum); err != nil {
return nil, time.Time{}, 0, fmt.Errorf("unmarhsal: %v", err)
}
return &sum, modified, gen, nil
}
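// writeSummary marshals and uploads the summary to path, returning the number of bytes written.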
func writeSummary(ctx context.Context, client gcs.Client, path gcs.Path, sum *summarypb.DashboardSummary) (int, error) {
buf, err := proto.Marshal(sum)
if err != nil {
return 0, fmt.Errorf("marshal: %v", err)
}
_, err = client.Upload(ctx, path, buf, gcs.DefaultACL, gcs.NoCache)
return len(buf), err
}
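// statPaths is a thin wrapper around gcs.StatExisting that stats the given paths.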
func statPaths(ctx context.Context, log logrus.FieldLogger, client gcs.Stater, paths ...gcs.Path) []*storage.ObjectAttrs {
return gcs.StatExisting(ctx, log, client, paths...)
}
// pathReader returns a reader for the specified path and last modified, generation metadata.
func pathReader(ctx context.Context, client gcs.Client, path gcs.Path) (io.ReadCloser, time.Time, int64, error) {
r, attrs, err := client.Open(ctx, path)
if err != nil {
return nil, time.Time{}, 0, fmt.Errorf("client.Open(): %w", err)
}
if attrs == nil {
return r, time.Time{}, 0, nil
}
return r, attrs.LastModified, attrs.Generation, nil
}
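// tabStatus builds a minimal UNKNOWN tab summary whose alert and status both carry msg.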
func tabStatus(dashName, tabName, msg string) *summarypb.DashboardTabSummary {
return &summarypb.DashboardTabSummary{
DashboardName: dashName,
DashboardTabName: tabName,
OverallStatus: summarypb.DashboardTabSummary_UNKNOWN,
Alert: msg,
Status: msg,
}
}
// updateDashboard will summarize all the tabs.
//
// Errors summarizing tabs are displayed on the summary for the dashboard.
//
// Returns true when there is more work to do.
func updateDashboard(ctx context.Context, client gcs.Stater, dash *configpb.Dashboard, sum *summarypb.DashboardSummary, findGroup groupFinder, tabUpdater *tabUpdater) bool {
log := logrus.WithField("dashboard", dash.Name)
var graceCtx context.Context
if when, ok := ctx.Deadline(); ok {
dur := time.Until(when) / 2
var cancel func()
graceCtx, cancel = context.WithTimeout(ctx, dur)
defer cancel()
} else {
graceCtx = ctx
}
// First collect the previously summarized tabs.
tabSummaries := make(map[string]*summarypb.DashboardTabSummary, len(sum.TabSummaries))
for _, tabSum := range sum.TabSummaries {
tabSummaries[tabSum.DashboardTabName] = tabSum
}
// Now create info about which tabs we need to summarize and where the grid state lives.
type groupInfo struct {
group *configpb.TestGroup
reader gridReader
tabs []*configpb.DashboardTab
}
groupInfos := make(map[gcs.Path]*groupInfo, len(dash.DashboardTab))
var paths []gcs.Path
for _, tab := range dash.DashboardTab {
groupPath, group, groupReader, err := findGroup(dash.Name, tab)
if err != nil {
tabSummaries[tab.Name] = tabStatus(dash.Name, tab.Name, fmt.Sprintf("Error reading group info: %v", err))
continue
}
if group == nil {
tabSummaries[tab.Name] = tabStatus(dash.Name, tab.Name, fmt.Sprintf("Test group does not exist: %q", tab.TestGroupName))
continue
}
info := groupInfos[*groupPath]
if info == nil {
info = &groupInfo{
group: group,
reader: groupReader, // TODO(fejta): optimize (only read once)
}
paths = append(paths, *groupPath)
groupInfos[*groupPath] = info
}
info.tabs = append(info.tabs, tab)
}
// Check the attributes of the grid states.
attrs := gcs.StatExisting(ctx, log, client, paths...)
delays := make(map[gcs.Path]float64, len(paths))
// determine how much behind each summary is
for i, path := range paths {
a := attrs[i]
for _, tab := range groupInfos[path].tabs {
// TODO(fejta): optimize (only read once)
name := tab.Name
sum := tabSummaries[name]
if a == nil {
tabSummaries[name] = tabStatus(dash.Name, name, noRuns)
delays[path] = -1
} else if sum == nil {
tabSummaries[name] = tabStatus(dash.Name, name, "Newly created tab")
delays[path] = float64(24 * time.Hour / time.Second)
log.WithField("tab", name).Debug("Found new tab")
} else {
delays[path] = float64(attrs[i].Updated.Unix()) - tabSummaries[name].LastUpdateTimestamp
}
}
}
// sort by delay
sort.SliceStable(paths, func(i, j int) bool {
return delays[paths[i]] > delays[paths[j]]
})
// Now let's update the tab summaries in parallel, starting with most delayed
type future struct {
log *logrus.Entry
name string
result func() (*summarypb.DashboardTabSummary, error)
}
// channel to receive updated tabs
ch := make(chan future)
// request an update for each tab, starting with the least recently modified one.
go func() {
defer close(ch)
tabUpdater.lock.Lock()
defer tabUpdater.lock.Unlock()
for _, path := range paths {
info := groupInfos[path]
log := log.WithField("group", path)
for _, tab := range info.tabs {
log := log.WithField("tab", tab.Name)
delay := delays[path]
if delay == 0 {
log.Debug("Already up to date")
continue
} else if delay == -1 {
log.Debug("No grid state to process")
}
log = log.WithField("delay", delay)
if err := graceCtx.Err(); err != nil {
log.WithError(err).Info("Interrupted")
return
}
log.Debug("Requesting tab summary update")
f := tabUpdater.update(ctx, tab, info.group, info.reader)
select {
case <-ctx.Done():
return
case ch <- future{log, tab.Name, f}:
}
}
}
}()
// Update the summary for any tabs that give a response
for fut := range ch {
tabName := fut.name
log := fut.log
log.Trace("Waiting for updated tab summary response")
s, err := fut.result()
if err != nil {
s = tabStatus(dash.Name, tabName, fmt.Sprintf("Error attempting to summarize tab: %v", err))
log = log.WithError(err)
} else {
s.DashboardName = dash.Name
}
tabSummaries[tabName] = s
log.Trace("Updated tab summary")
}
// assemble them back into the dashboard summary.
sum.TabSummaries = make([]*summarypb.DashboardTabSummary, len(dash.DashboardTab))
for idx, tab := range dash.DashboardTab {
sum.TabSummaries[idx] = tabSummaries[tab.Name]
}
return graceCtx.Err() != nil
}
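// tabUpdater enqueues tab summary updates onto a shared worker pool.
// update returns a future that blocks until a worker has summarized the tab.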
type tabUpdater struct {
lock sync.Mutex
update func(context.Context, *configpb.DashboardTab, *configpb.TestGroup, gridReader) func() (*summarypb.DashboardTabSummary, error)
}
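// tabUpdatePool starts concurrency workers that service queued tab updates until
// poolCtx is cancelled, and returns a tabUpdater for enqueuing requests.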
func tabUpdatePool(poolCtx context.Context, log *logrus.Entry, concurrency int, features FeatureFlags) *tabUpdater {
type request struct {
ctx context.Context
tab *configpb.DashboardTab
group *configpb.TestGroup
read gridReader
sum *summarypb.DashboardTabSummary
err error
wg sync.WaitGroup
}
ch := make(chan *request, concurrency)
var wg sync.WaitGroup
wg.Add(concurrency)
log = log.WithField("concurrency", concurrency)
log.Info("Starting up worker pool")
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
for req := range ch {
req.sum, req.err = updateTab(req.ctx, req.tab, req.group, req.read, features)
req.wg.Done()
}
}()
}
go func() {
<-poolCtx.Done()
log.Info("Shutting down worker pool")
close(ch)
wg.Wait()
log.Info("Worker pool stopped")
}()
updateTabViaPool := func(ctx context.Context, tab *configpb.DashboardTab, group *configpb.TestGroup, groupReader gridReader) func() (*summarypb.DashboardTabSummary, error) {
req := &request{
ctx: ctx,
tab: tab,
group: group,
read: groupReader,
}
req.wg.Add(1)
select {
case <-ctx.Done():
return func() (*summarypb.DashboardTabSummary, error) { return nil, ctx.Err() }
case ch <- req:
return func() (*summarypb.DashboardTabSummary, error) {
req.wg.Wait()
return req.sum, req.err
}
}
}
return &tabUpdater{
update: updateTabViaPool,
}
}
// staleHours returns the configured number of stale hours for the tab.
func staleHours(tab *configpb.DashboardTab) time.Duration {
if tab.AlertOptions == nil {
return 0
}
return time.Duration(tab.AlertOptions.AlertStaleResultsHours) * time.Hour
}
// updateTab reads the latest grid state for the tab and summarizes it.
func updateTab(ctx context.Context, tab *configpb.DashboardTab, group *configpb.TestGroup, groupReader gridReader, features FeatureFlags) (*summarypb.DashboardTabSummary, error) {
groupName := tab.TestGroupName
grid, mod, _, err := readGrid(ctx, groupReader) // TODO(fejta): track gen
if err != nil {
return nil, fmt.Errorf("load %s: %v", groupName, err)
}
var healthiness *summarypb.HealthinessInfo
if shouldRunHealthiness(tab) {
// TODO (itsazhuhere@): Change to rely on YAML defaults rather than consts
interval := int(tab.HealthAnalysisOptions.DaysOfAnalysis)
if interval <= 0 {
interval = DefaultInterval
}
healthiness = getHealthinessForInterval(grid, tab.Name, time.Now(), interval)
}
recent := recentColumns(tab, group)
grid.Rows = recentRows(grid.Rows, recent)
grid.Rows = filterMethods(grid.Rows)
latest, latestSeconds := latestRun(grid.Columns)
alert := staleAlert(mod, latest, staleHours(tab), len(grid.Rows))
failures := failingTestSummaries(grid.Rows, tab.GetOpenTestTemplate(), group.GetGcsPrefix(), group.GetColumnHeader())
colsCells, brokenState := gridMetrics(len(grid.Columns), grid.Rows, recent, tab.BrokenColumnThreshold, features, tab.GetStatusCustomizationOptions())
metrics := tabMetrics(colsCells)
tabStatus := overallStatus(grid, recent, alert, brokenState, failures, features, colsCells, tab.GetStatusCustomizationOptions())
return &summarypb.DashboardTabSummary{
DashboardTabName: tab.Name,
LastUpdateTimestamp: float64(mod.Unix()),
LastRunTimestamp: float64(latestSeconds),
Alert: alert,
FailingTestSummaries: failures,
OverallStatus: tabStatus,
Status: statusMessage(colsCells, tabStatus, tab.GetStatusCustomizationOptions()),
LatestGreen: latestGreen(grid, group.UseKubernetesClient),
BugUrl: tab.GetOpenBugTemplate().GetUrl(),
Healthiness: healthiness,
LinkedIssues: allLinkedIssues(grid.Rows),
SummaryMetrics: metrics,
}, nil
}
// readGrid downloads and deserializes the current test group state.
func readGrid(ctx context.Context, reader gridReader) (*statepb.Grid, time.Time, int64, error) {
var t time.Time
r, mod, gen, err := reader(ctx)
if err != nil {
return nil, t, 0, fmt.Errorf("open: %w", err)
}
defer r.Close()
zlibReader, err := zlib.NewReader(r)
if err != nil {
return nil, t, 0, fmt.Errorf("decompress: %v", err)
}
buf, err := ioutil.ReadAll(zlibReader)
if err != nil {
return nil, t, 0, fmt.Errorf("read: %v", err)
}
var g statepb.Grid
if err = proto.Unmarshal(buf, &g); err != nil {
return nil, t, 0, fmt.Errorf("parse: %v", err)
}
return &g, mod, gen, nil
}
// recentColumns returns the configured number of recent columns to summarize, or 5.
func recentColumns(tab *configpb.DashboardTab, group *configpb.TestGroup) int {
return firstFilled(tab.NumColumnsRecent, group.NumColumnsRecent, 5)
}
// firstFilled returns the first non-empty value, or zero.
func firstFilled(values ...int32) int {
for _, v := range values {
if v != 0 {
return int(v)
}
}
return 0
}
// recentRows returns the subset of rows with at least one recent result
func recentRows(in []*statepb.Row, recent int) []*statepb.Row {
var rows []*statepb.Row
for _, r := range in {
if r.Results == nil {
continue
}
if statuspb.TestStatus(r.Results[0]) == statuspb.TestStatus_NO_RESULT && int(r.Results[1]) >= recent {
continue
}
rows = append(rows, r)
}
return rows
}
// filterMethods returns the subset of rows that do not have test method names
func filterMethods(rows []*statepb.Row) []*statepb.Row {
var filtered []*statepb.Row
for _, r := range rows {
if !isValidTestName(r.Id) || !isValidTestName(r.Name) {
continue
}
filtered = append(filtered, r)
}
return filtered
}
// latestRun returns the Time (and seconds-since-epoch) of the most recent run.
func latestRun(columns []*statepb.Column) (time.Time, int64) {
if len(columns) > 0 {
if start := int64(columns[0].Started); start > 0 {
second := start / 1000
mills := start % 1000
return time.Unix(second, mills*1e6), second
}
}
return time.Time{}, 0
}
const noRuns = "no completed results"
// staleAlert returns an explanatory message if the latest results are stale.
func staleAlert(mod, ran time.Time, stale time.Duration, rows int) string {
if mod.IsZero() {
return "no stored results"
}
if stale == 0 {
return ""
}
if ran.IsZero() || rows == 0 { // Has no columns and/or no rows.
return noRuns
}
now := time.Now()
if dur := now.Sub(mod); dur > stale {
return fmt.Sprintf("data has not changed since %s (%s old)", mod, dur.Truncate(15*time.Minute))
}
if dur := now.Sub(ran); dur > stale {
return fmt.Sprintf("latest column from %s (%s old)", ran, dur.Truncate(15*time.Minute))
}
return ""
}
// failingTestSummaries returns details for every row with an active alert.
func failingTestSummaries(rows []*statepb.Row, template *configpb.LinkTemplate, gcsPrefix string, columnHeader []*configpb.TestGroup_ColumnHeader) []*summarypb.FailingTestSummary {
var failures []*summarypb.FailingTestSummary
for _, row := range rows {
if row.AlertInfo == nil {
continue
}
alert := row.AlertInfo
sum := summarypb.FailingTestSummary{
DisplayName: row.Name,
TestName: row.Id,
FailBuildId: alert.FailBuildId,
LatestFailBuildId: alert.LatestFailBuildId,
FailCount: alert.FailCount,
FailureMessage: alert.FailureMessage,
PassBuildId: alert.PassBuildId,
// TODO(fejta): better build info
BuildLink: alert.BuildLink,
BuildLinkText: alert.BuildLinkText,
BuildUrlText: alert.BuildUrlText,
LinkedBugs: row.Issues,
FailTestLink: buildFailLink(alert.FailTestId, row.Id),
LatestFailTestLink: buildFailLink(alert.LatestFailTestId, row.Id),
Properties: alert.Properties,
CustomColumnHeaders: alert.CustomColumnHeaders,
HotlistIds: alert.HotlistIds,
EmailAddresses: alert.EmailAddresses,
}
if alert.PassTime != nil {
sum.PassTimestamp = float64(alert.PassTime.Seconds)
}
if alert.FailTime != nil {
sum.FailTimestamp = float64(alert.FailTime.Seconds)
}
propertyToColumnHeader := make(map[string]string)
for i := 0; i < len(columnHeader); i++ {
if columnHeader[i].Label != "" {
propertyToColumnHeader["<custom-"+strconv.Itoa(i)+">"] = columnHeader[i].Label
} else if columnHeader[i].Property != "" {
propertyToColumnHeader["<custom-"+strconv.Itoa(i)+">"] = columnHeader[i].Property
} else {
propertyToColumnHeader["<custom-"+strconv.Itoa(i)+">"] = columnHeader[i].ConfigurationValue
}
}
// Verify what the links for alerts would be with the new method.
failLink := testResultLink(template, alert.GetProperties(), alert.GetFailTestId(), row.GetId(), alert.GetFailBuildId(), gcsPrefix, propertyToColumnHeader, alert.CustomColumnHeaders)
latestFailLink := testResultLink(template, alert.GetProperties(), alert.GetLatestFailTestId(), row.GetId(), alert.GetLatestFailBuildId(), gcsPrefix, propertyToColumnHeader, alert.CustomColumnHeaders)
log := logrus.WithField("failLink", failLink).WithField("latestFailLink", latestFailLink)
if failLink == "" || latestFailLink == "" {
log.Warning("Failed to create failure link.")
} else if !strings.HasPrefix(failLink, "http") || !strings.HasPrefix(latestFailLink, "http") {
log.Warning("Failure link does not include scheme.")
} else {
log.Info("Created failure links.")
}
failures = append(failures, &sum)
}
return failures
}
// buildFailLink creates a search link
// TODO(#134): Build proper url for both internal and external jobs
func buildFailLink(testID, target string) string {
return fmt.Sprintf("%s %s", url.PathEscape(testID), url.PathEscape(target))
}
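// testResultLink expands the tab's open_test_template into a concrete URL, substituting
// the known tokens (gcs prefix, test name and ID, build ID, workflow ID/name, custom
// column headers, and any matching alert properties). Returns "" if expansion fails.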
func testResultLink(template *configpb.LinkTemplate, properties map[string]string, testID, target, buildID, gcsPrefix string, propertyToColumnHeader map[string]string, customColumnHeaders map[string]string) string {
// Return the result of open_test_template for the tab.
// This assumes that open_test_template uses a limited set of tokens (since it's not in the context of a browser).
// Assume that the following are valid: <gcs_prefix>, <test-name>, <workflow-id>, <workflow-name>, <test-id>, <build ID>
// TODO: Ensure workflow-id, workflow-name are added in alerts.
tokens := util.Tokens(template)
parameters := map[string]string{}
for _, token := range tokens {
switch token {
case util.GcsPrefix:
parameters[util.GcsPrefix] = gcsPrefix
case util.TestName:
parameters[util.TestName] = target
case util.WorkflowID:
if workflowID, ok := properties["workflow-id"]; ok {
parameters[util.WorkflowID] = workflowID
}
case util.WorkflowName:
if WorkflowName, ok := properties["workflow-name"]; ok {
parameters[util.WorkflowName] = WorkflowName
}
case util.TestID:
parameters[util.TestID] = testID
case util.BuildID:
parameters[util.BuildID] = buildID
case util.CustomColumnRe.FindString(token):
if v, ok := customColumnHeaders[propertyToColumnHeader[token]]; ok {
parameters[token] = v
}
default:
// Didn't match any simple tokens, check if it's a property.
trimmedToken := strings.NewReplacer("<", "", ">", "").Replace(token)
if v, ok := properties[trimmedToken]; ok {
parameters[token] = v
}
}
}
link, err := util.ExpandTemplate(template, parameters)
if err != nil {
logrus.WithError(err).WithField("template", template).WithField("parameters", parameters).Error("Error expanding link template.")
return ""
}
return link
}
// overallStatus determines whether the tab is stale, failing, flaky or healthy.
//
// Tabs are:
// BROKEN - called with brokenState (typically when most rows are red)
// STALE - called with a stale message (typically when the most recent column is old)
// FAIL - there is at least one alert
// ACCEPTABLE - the ratio of (valid) failing to total columns is less than configured threshold
// FLAKY - at least one recent column has failing cells
// PENDING - number of valid columns is less than minimum # of runs required
// PASS - all recent columns are entirely green
func overallStatus(grid *statepb.Grid, recent int, stale string, brokenState bool, alerts []*summarypb.FailingTestSummary, features FeatureFlags, colCells gridStats, opts *configpb.DashboardTabStatusCustomizationOptions) summarypb.DashboardTabSummary_TabStatus {
if brokenState {
return summarypb.DashboardTabSummary_BROKEN
}
if stale != "" {
return summarypb.DashboardTabSummary_STALE
}
if len(alerts) > 0 {
return summarypb.DashboardTabSummary_FAIL
}
// safeguard PENDING status behind a flag
if features.AllowMinNumberOfRuns && opts.GetMinAcceptableRuns() > int32(colCells.completedCols-colCells.ignoredCols) {
return summarypb.DashboardTabSummary_PENDING
}
results := result.Map(grid.Rows)
moreCols := true
var passing bool
var flaky bool
// We want to look at recent columns, skipping over any that are still running.
for moreCols && recent > 0 {
moreCols = false
var foundCol bool
var running bool
var ignored bool
// Pull one result off each row (one column at a time), since we don't
// know ahead of time which cells are still running.
for _, resultF := range results {
r, ok := resultF()
if !ok {
continue
}
moreCols = true
if r == statuspb.TestStatus_RUNNING {
running = true
// not break because we need to pull this column's
// result off every row's channel.
continue
}
if features.AllowIgnoredColumns && result.Ignored(r, opts) {
ignored = true
continue
}
r = coalesceResult(r, result.IgnoreRunning)
if r == statuspb.TestStatus_NO_RESULT {
continue
}
// any failure in a recent column results in flaky
if r != statuspb.TestStatus_PASS {
flaky = true
continue
}
foundCol = true
}
// Running columns are unfinished and therefore should
// not count as "recent" until they finish.
if running {
continue
}
// Ignored columns are excluded from the tab status but still count as recent.
// Failures in such a column are ignored as well.
if ignored {
recent--
flaky = false
continue
}
if flaky {
if isAcceptable(colCells, opts, features) {
return summarypb.DashboardTabSummary_ACCEPTABLE
}
return summarypb.DashboardTabSummary_FLAKY
}
if foundCol {
passing = true
recent--
}
}
if passing {
return summarypb.DashboardTabSummary_PASS
}
return summarypb.DashboardTabSummary_UNKNOWN
}
// isAcceptable determines if the flakiness is within the acceptable range.
// Returns true iff the feature is enabled, `max_acceptable_flakiness` is set, and flakiness does not exceed it.
func isAcceptable(colCells gridStats, opts *configpb.DashboardTabStatusCustomizationOptions, features FeatureFlags) bool {
if features.AllowFuzzyFlakiness && opts.GetMaxAcceptableFlakiness() > 0 &&
100*float64(colCells.passingCols)/float64(colCells.completedCols-colCells.ignoredCols) >= float64(100-opts.GetMaxAcceptableFlakiness()) {
return true
}
return false
}
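// allLinkedIssues returns the deduplicated set of issue IDs linked to any row.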
func allLinkedIssues(rows []*statepb.Row) []string {
issueSet := make(map[string]bool)
for _, row := range rows {
for _, issueID := range row.Issues {
issueSet[issueID] = true
}
}
linkedIssues := []string{}
for issueID := range issueSet {
linkedIssues = append(linkedIssues, issueID)
}
return linkedIssues
}
// gridStats aggregates column- and cell-level metrics for a section of the grid.
type gridStats struct {
passingCols int
completedCols int
ignoredCols int
passingCells int
filledCells int
}
// gridMetrics accumulates column and cell metrics over the recent section of the grid and reports whether it looks broken.
func gridMetrics(cols int, rows []*statepb.Row, recent int, brokenThreshold float32, features FeatureFlags, opts *configpb.DashboardTabStatusCustomizationOptions) (gridStats, bool) {
results := result.Map(rows)
var passingCells int
var filledCells int
var passingCols int
var completedCols int
var ignoredCols int
var brokenState bool
for idx := 0; idx < cols; idx++ {
if idx >= recent {
break
}
var passes int
var failures int
var ignores int
var other int
for _, iter := range results {
// TODO(fejta): fail old running cols
r, _ := iter()
// check for ignores first
if features.AllowIgnoredColumns && result.Ignored(r, opts) {
ignores++
}
// proceed with the rest of calculations
status := coalesceResult(r, result.IgnoreRunning)
if result.Passing(status) {
passes++
passingCells++
filledCells++
} else if result.Failing(status) {
failures++
filledCells++
} else if status != statuspb.TestStatus_NO_RESULT {
other++
filledCells++
}
}
if passes+failures+other > 0 {
completedCols++
}
// only one of those can be true
if ignores > 0 {
ignoredCols++
} else if failures == 0 && passes > 0 {
passingCols++
}
if passes+failures > 0 && brokenThreshold > 0 {
if float32(failures)/float32(passes+failures+other) > brokenThreshold {
brokenState = true
}
}
}
metrics := gridStats{
passingCols: passingCols,
completedCols: completedCols,
ignoredCols: ignoredCols,
passingCells: passingCells,
filledCells: filledCells,
}
return metrics, brokenState
}
// tabMetrics copies a subset of the grid stats into a summary metrics proto.
func tabMetrics(colCells gridStats) *summarypb.DashboardTabSummaryMetrics {
return &summarypb.DashboardTabSummaryMetrics{
PassingColumns: int32(colCells.passingCols),
CompletedColumns: int32(colCells.completedCols),
IgnoredColumns: int32(colCells.ignoredCols),
}
}
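// fmtStatus renders the tab stats line (passing vs completed columns and cells),
// plus an extra status-info line for PENDING and ACCEPTABLE tabs.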
func fmtStatus(colCells gridStats, tabStatus summarypb.DashboardTabSummary_TabStatus, opts *configpb.DashboardTabStatusCustomizationOptions) string {
colCent := 100 * float64(colCells.passingCols) / float64(colCells.completedCols)
cellCent := 100 * float64(colCells.passingCells) / float64(colCells.filledCells)
flakyCent := 100 * float64(colCells.completedCols-colCells.ignoredCols-colCells.passingCols) / float64(colCells.completedCols-colCells.ignoredCols)
// put tab stats on a single line and additional status info on the next
statusMsg := fmt.Sprintf("Tab stats: %d of %d (%.1f%%) recent columns passed (%d of %d or %.1f%% cells)", colCells.passingCols, colCells.completedCols, colCent, colCells.passingCells, colCells.filledCells, cellCent)
if colCells.ignoredCols > 0 {
statusMsg += fmt.Sprintf(". %d columns ignored", colCells.ignoredCols)
}
// add status info message for certain cases
if tabStatus == summarypb.DashboardTabSummary_PENDING {
statusMsg += "\nStatus info: Not enough runs"
} else if tabStatus == summarypb.DashboardTabSummary_ACCEPTABLE {
statusMsg += fmt.Sprintf("\nStatus info: Recent flakiness (%.1f%%) over valid columns is within configured acceptable level of %.1f%%.", flakyCent, opts.GetMaxAcceptableFlakiness())
}
return statusMsg
}
// statusMessage returns a message like:
//
//	Tab stats: 3 out of 5 (60.0%) recent columns passed (35 of 50 or 70.0% cells). 1 columns ignored.
//	(OPTIONAL) Status info: Recent flakiness (40.0%) over valid columns is within configured acceptable level of X
//	OR Status info: Not enough runs
func statusMessage(colCells gridStats, tabStatus summarypb.DashboardTabSummary_TabStatus, opts *configpb.DashboardTabStatusCustomizationOptions) string {
if colCells.filledCells == 0 {
return noRuns
}
return fmtStatus(colCells, tabStatus, opts)
}
const noGreens = "no recent greens"
// latestGreen finds the ID for the most recent column with all passing rows.
//
// Returns the build ID (or, when useFirstExtra is set, the first extra column header), or a no-recent-greens message.
func latestGreen(grid *statepb.Grid, useFirstExtra bool) string {
results := result.Map(grid.Rows)
for _, col := range grid.Columns {
var failures bool
var passes bool
for _, resultF := range results {
r, _ := resultF()
result := coalesceResult(r, result.ShowRunning)
if result == statuspb.TestStatus_PASS {
passes = true
}
if result == statuspb.TestStatus_FLAKY || result == statuspb.TestStatus_FAIL || result == statuspb.TestStatus_UNKNOWN {
failures = true
}
}
if failures || !passes {
continue
}
if useFirstExtra && len(col.Extra) > 0 {
return col.Extra[0]
}
return col.Build
}
return noGreens
}
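// getHealthinessForInterval computes flakiness over the most recent interval (in days)
// and the interval before it, recording the trend between the two.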
func getHealthinessForInterval(grid *statepb.Grid, tabName string, currentTime time.Time, interval int) *summarypb.HealthinessInfo {
now := goBackDays(0, currentTime)
oneInterval := goBackDays(interval, currentTime)
twoIntervals := goBackDays(2*interval, currentTime)
healthiness := CalculateHealthiness(grid, oneInterval, now, tabName)
pastHealthiness := CalculateHealthiness(grid, twoIntervals, oneInterval, tabName)
CalculateTrend(healthiness, pastHealthiness)
healthiness.PreviousFlakiness = []float32{pastHealthiness.AverageFlakiness}
return healthiness
}
// goBackDays returns the Unix timestamp of `days` days before currentTime, used to
// bound the flakiness report intervals.
// The old flakiness report rounded this down to 12am of the given day. Since the new
// report runs with the Summarizer, and therefore more often than the old once-a-week
// report, we no longer round to 12am.
func goBackDays(days int, currentTime time.Time) int {
date := currentTime.AddDate(0, 0, -1*days)
intDate := int(date.Unix())
return intDate
}
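// shouldRunHealthiness reports whether health (flakiness) analysis is enabled for the tab.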
func shouldRunHealthiness(tab *configpb.DashboardTab) bool {
if tab.HealthAnalysisOptions == nil {
return false
}
return tab.HealthAnalysisOptions.Enable
}
// coalesceResult reduces the result to PASS, NO_RESULT, FAIL or FLAKY.
func coalesceResult(rowResult statuspb.TestStatus, ignoreRunning bool) statuspb.TestStatus {
return result.Coalesce(rowResult, ignoreRunning)
}