pkg/tabulator/tabstate.go (362 lines of code) (raw):

/* Copyright 2022 The TestGrid Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ // Package tabulator processes test group state into tab state. package tabulator import ( "context" "errors" "fmt" "net/url" "path" "sync" "time" "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" "github.com/GoogleCloudPlatform/testgrid/config" "github.com/GoogleCloudPlatform/testgrid/config/snapshot" configpb "github.com/GoogleCloudPlatform/testgrid/pb/config" statepb "github.com/GoogleCloudPlatform/testgrid/pb/state" tspb "github.com/GoogleCloudPlatform/testgrid/pb/test_status" "github.com/GoogleCloudPlatform/testgrid/pkg/updater" "github.com/GoogleCloudPlatform/testgrid/util/gcs" "github.com/GoogleCloudPlatform/testgrid/util/metrics" ) const componentName = "tabulator" const writeTimeout = 10 * time.Minute // Metrics holds metrics relevant to this controller. type Metrics struct { UpdateState metrics.Cyclic DelaySeconds metrics.Duration } // CreateMetrics creates metrics for this controller func CreateMetrics(factory metrics.Factory) *Metrics { return &Metrics{ UpdateState: factory.NewCyclic(componentName), DelaySeconds: factory.NewDuration("delay", "Seconds tabulator is behind schedule", "component"), } } type writeTask struct { dashboard *configpb.Dashboard tab *configpb.DashboardTab group *configpb.TestGroup data *statepb.Grid //TODO(chases2): change to inflatedColumns (and additional data) now that "filter-columns" is used everywhere } func mapTasks(cfg *snapshot.Config) map[string][]writeTask { groupToTabs := make(map[string][]writeTask, len(cfg.Groups)) for _, dashboard := range cfg.Dashboards { for _, tab := range dashboard.DashboardTab { g := tab.TestGroupName groupToTabs[g] = append(groupToTabs[g], writeTask{ dashboard: dashboard, tab: tab, group: cfg.Groups[g], }) } } return groupToTabs } // Fixer should adjust the queue until the context expires. type Fixer func(context.Context, *config.TestGroupQueue) error // UpdateOptions aggregates the Update function parameter into a single structure. type UpdateOptions struct { ConfigPath gcs.Path ReadConcurrency int WriteConcurrency int GridPathPrefix string TabsPathPrefix string AllowedGroups []string Confirm bool CalculateStats bool UseTabAlertSettings bool ExtendState bool Freq time.Duration } // Update tab state with the given frequency continuously. If freq == 0, runs only once. // // Copies the grid into the tab state, removing unneeded data. // Observes each test group in allowedGroups, or all of them in the config if not specified func Update(ctx context.Context, client gcs.ConditionalClient, mets *Metrics, opts *UpdateOptions, fixers ...Fixer) error { ctx, cancel := context.WithCancel(ctx) defer cancel() if opts.ReadConcurrency < 1 || opts.WriteConcurrency < 1 { return fmt.Errorf("concurrency must be positive, got read %d and write %d", opts.ReadConcurrency, opts.WriteConcurrency) } log := logrus.WithField("config", opts.ConfigPath) var q config.TestGroupQueue log.Debug("Observing config...") cfgChanged, err := snapshot.Observe(ctx, log, client, opts.ConfigPath, time.NewTicker(time.Minute).C) if err != nil { return fmt.Errorf("error while observing config %q: %w", opts.ConfigPath.String(), err) } var cfg *snapshot.Config var tasksPerGroup map[string][]writeTask fixSnapshot := func(newConfig *snapshot.Config) { cfg = newConfig tasksPerGroup = mapTasks(cfg) if len(opts.AllowedGroups) != 0 { groups := make([]*configpb.TestGroup, 0, len(opts.AllowedGroups)) for _, group := range opts.AllowedGroups { c, ok := cfg.Groups[group] if !ok { log.Errorf("Could not find requested group %q in config", c) continue } groups = append(groups, c) } q.Init(log, groups, time.Now()) return } groups := make([]*configpb.TestGroup, 0, len(cfg.Groups)) for _, group := range cfg.Groups { groups = append(groups, group) } q.Init(log, groups, time.Now()) } fixSnapshot(<-cfgChanged) go func(ctx context.Context) { fixCtx, fixCancel := context.WithCancel(ctx) var fixWg sync.WaitGroup fixAll := func() { n := len(fixers) log.WithField("fixers", n).Debug("Starting fixers on current groups...") fixWg.Add(n) for i, fix := range fixers { go func(i int, fix Fixer) { defer fixWg.Done() if err := fix(fixCtx, &q); err != nil && !errors.Is(err, context.Canceled) { log.WithError(err).WithField("fixer", i).Warning("Fixer failed") } }(i, fix) } log.WithField("fixers", n).Info("Started fixers on current groups.") } ticker := time.NewTicker(time.Minute) fixAll() defer ticker.Stop() for { depth, next, when := q.Status() log := log.WithField("depth", depth) if next != nil { log = log.WithField("next", &next) } delay := time.Since(when) if delay < 0 { delay = 0 log = log.WithField("sleep", -delay) } mets.DelaySeconds.Set(delay, componentName) log.Debug("Calculated metrics") select { case <-ctx.Done(): ticker.Stop() fixCancel() fixWg.Wait() return case newConfig, ok := <-cfgChanged: if !ok { log.Info("Configuration channel closed") cfgChanged = nil continue } log.Info("Configuration changed") fixCancel() fixWg.Wait() fixCtx, fixCancel = context.WithCancel(ctx) fixSnapshot(newConfig) fixAll() case <-ticker.C: } } }(ctx) // Set up worker pools groups := make(chan *configpb.TestGroup) tasks := make(chan writeTask) var tabLock sync.Mutex read := func(ctx context.Context, log *logrus.Entry, group *configpb.TestGroup) error { if group == nil { return errors.New("nil group to read") } fromPath, err := updater.TestGroupPath(opts.ConfigPath, opts.GridPathPrefix, group.Name) if err != nil { return fmt.Errorf("can't make tg path %q: %w", group.Name, err) } log.WithField("from", fromPath.String()).Info("Reading state") grid, _, err := gcs.DownloadGrid(ctx, client, *fromPath) if err != nil { return fmt.Errorf("downloadGrid(%s): %w", fromPath, err) } tabLock.Lock() defer tabLock.Unlock() // lock out all other readers so that all these tabs get handled as soon as possible for _, task := range tasksPerGroup[group.Name] { log := log.WithFields(logrus.Fields{ "group": task.group.GetName(), "dashboard": task.dashboard.GetName(), "tab": task.tab.GetName(), }) select { case <-ctx.Done(): log.Debug("Skipping irrelevant task") continue default: out := task out.data = proto.Clone(grid).(*statepb.Grid) log.Debug("Requesting write task") tasks <- out } } return nil } // Run threads continuously var readWg, writeWg sync.WaitGroup readWg.Add(opts.ReadConcurrency) for i := 0; i < opts.ReadConcurrency; i++ { go func() { defer readWg.Done() for group := range groups { readCtx, cancel := context.WithCancel(ctx) log = log.WithField("group", group.Name) err := read(readCtx, log, group) cancel() if err != nil { next := time.Now().Add(opts.Freq / 10) q.Fix(group.Name, next, false) log.WithError(err).WithField("retry-at", next).Error("failed to read, retry later") } } }() } writeWg.Add(opts.WriteConcurrency) for i := 0; i < opts.WriteConcurrency; i++ { go func() { defer writeWg.Done() for task := range tasks { writeCtx, cancel := context.WithTimeout(ctx, writeTimeout) finish := mets.UpdateState.Start() log = log.WithField("dashboard", task.dashboard.Name).WithField("tab", task.tab.Name) err := createTabState(writeCtx, log, client, task, opts.ConfigPath, opts.TabsPathPrefix, opts.Confirm, opts.CalculateStats, opts.UseTabAlertSettings, opts.ExtendState) cancel() if err != nil { finish.Fail() log.Errorf("write: %v", err) continue } finish.Success() } }() } defer writeWg.Wait() defer close(tasks) defer readWg.Wait() defer close(groups) return q.Send(ctx, groups, opts.Freq) } // createTabState creates the tab state from the group state func createTabState(ctx context.Context, log logrus.FieldLogger, client gcs.Client, task writeTask, configPath gcs.Path, tabsPathPrefix string, confirm, calculateStats, useTabAlerts, extendState bool) error { location, err := TabStatePath(configPath, tabsPathPrefix, task.dashboard.Name, task.tab.Name) if err != nil { return fmt.Errorf("can't make dashtab path %s/%s: %w", task.dashboard.Name, task.tab.Name, err) } log.WithFields(logrus.Fields{ "to": location.String(), }).Info("Calculating state") var existingGrid *statepb.Grid if extendState { // TODO(chases2): Download grid only if task.Data was truncated (last column is UNKNOWN) existingGrid, _, err = gcs.DownloadGrid(ctx, client, *location) if err != nil { return fmt.Errorf("downloadGrid: %w", err) } } grid, err := tabulate(ctx, log, task.data, task.tab, task.group, calculateStats, useTabAlerts, existingGrid) if err != nil { return fmt.Errorf("tabulate: %w", err) } if !confirm { log.Debug("Successfully created tab state; discarding") return nil } buf, err := gcs.MarshalGrid(grid) if err != nil { return fmt.Errorf("marshalGrid: %w", err) } _, err = client.Upload(ctx, *location, buf, gcs.DefaultACL, gcs.NoCache) if err != nil { return fmt.Errorf("client.Upload(%s): %w", location, err) } return nil } // TabStatePath returns the path for a given tab. func TabStatePath(configPath gcs.Path, tabPrefix, dashboardName, tabName string) (*gcs.Path, error) { name := path.Join(tabPrefix, dashboardName, tabName) u, err := url.Parse(name) if err != nil { return nil, fmt.Errorf("invalid url %s: %w", name, err) } np, err := configPath.ResolveReference(u) if err != nil { return nil, fmt.Errorf("resolve reference: %w", err) } if np.Bucket() != configPath.Bucket() { return nil, fmt.Errorf("tabState %s should not change bucket", name) } return np, nil } // tabulate transforms "grid" to only the part that needs to be displayed by the UI. // If an existingGrid is passed in, new results from "grid" will be grafted onto it. func tabulate(ctx context.Context, log logrus.FieldLogger, grid *statepb.Grid, tabCfg *configpb.DashboardTab, groupCfg *configpb.TestGroup, calculateStats, useTabAlertSettings bool, existingGrid *statepb.Grid) (*statepb.Grid, error) { if grid == nil { return nil, errors.New("no grid") } if tabCfg == nil || groupCfg == nil { return nil, errors.New("no config") } filterRows, err := filterGrid(tabCfg.BaseOptions, grid.Rows) if err != nil { return nil, fmt.Errorf("filterGrid: %w", err) } grid.Rows = filterRows inflatedGrid, issues, err := updater.InflateGrid(ctx, grid, time.Time{}, time.Now()) if err != nil { return nil, fmt.Errorf("inflateGrid: %w", err) } inflatedGrid = dropEmptyColumns(inflatedGrid) usesK8sClient := groupCfg.UseKubernetesClient || (groupCfg.GetResultSource().GetGcsConfig() != nil) var brokenThreshold float32 if calculateStats { brokenThreshold = tabCfg.BrokenColumnThreshold } var alert, unalert int if useTabAlertSettings { alert = int(tabCfg.GetAlertOptions().GetNumFailuresToAlert()) unalert = int(tabCfg.GetAlertOptions().GetNumPassesToDisableAlert()) } else { alert = int(groupCfg.NumFailuresToAlert) unalert = int(groupCfg.NumPassesToDisableAlert) } if existingGrid != nil { existingInflatedGrid, _, err := updater.InflateGrid(ctx, existingGrid, time.Time{}, time.Now()) if err != nil { return nil, fmt.Errorf("inflate existing grid: %w", err) } inflatedGrid = mergeGrids(existingInflatedGrid, inflatedGrid) } grid = updater.ConstructGrid(log, inflatedGrid, issues, alert, unalert, usesK8sClient, groupCfg.GetUserProperty(), brokenThreshold, groupCfg.GetColumnHeader()) return grid, nil } // mergeGrids merges two sorted, inflated grids together. // Precondition: "addition" is an output of an Updater with an "unknown" column last. // This final column will be dropped and replaced with existing results. func mergeGrids(existing, addition []updater.InflatedColumn) []updater.InflatedColumn { if len(addition) == 0 { return existing } seam := addition[len(addition)-1].Column.Started min := 0 max := len(existing) for min != max { check := (min + max) / 2 if existing[check].Column.Started <= seam { max = check } else { min = check + 1 } } if max == len(existing) { return addition } return append(addition[:len(addition)-1], existing[max:]...) } // dropEmptyColumns drops every column in-place that has no results func dropEmptyColumns(grid []updater.InflatedColumn) []updater.InflatedColumn { result := make([]updater.InflatedColumn, 0, len(grid)) for i, col := range grid { for _, cell := range col.Cells { if cell.Result != tspb.TestStatus_NO_RESULT { result = append(result, grid[i]) break } } } if len(result) == 0 && len(grid) != 0 { // If everything would be dropped, keep the first column so there's something left result = grid[0:1] } return result }