pkg/updater/read.go
/*
Copyright 2020 The TestGrid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package updater
import (
"context"
"errors"
"fmt"
"io"
"path"
"sort"
"strings"
"sync"
"time"
"cloud.google.com/go/storage"
configpb "github.com/GoogleCloudPlatform/testgrid/pb/config"
evalpb "github.com/GoogleCloudPlatform/testgrid/pb/custom_evaluator"
statepb "github.com/GoogleCloudPlatform/testgrid/pb/state"
statuspb "github.com/GoogleCloudPlatform/testgrid/pb/test_status"
"github.com/GoogleCloudPlatform/testgrid/util/gcs"
"github.com/fvbommel/sortorder"
"github.com/sirupsen/logrus"
)
// hintStarted returns the maximum column hint in natural sort order, used as
// the starting point when listing newer builds.
func hintStarted(cols []InflatedColumn) string {
var hint string
for i, col := range cols {
if newHint := col.Column.Hint; i == 0 || sortorder.NaturalLess(hint, newHint) {
hint = newHint
}
}
return hint
}
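// gcsColumnReader returns a ColumnReader that lists GCS builds since the hint
// of the previously processed columns and streams each build to receivers as
// an InflatedColumn.
//
// Illustrative wiring (a sketch; client, testGroup, oldCols, stop and
// receivers are hypothetical values supplied by the caller):
//
//	pool := resultReaderPool(ctx, log, 4)
//	read := gcsColumnReader(client, 5*time.Minute, pool, true)
//	err := read(ctx, log, testGroup, oldCols, stop, receivers)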
func gcsColumnReader(client gcs.Client, buildTimeout time.Duration, readResult *resultReader, enableIgnoreSkip bool) ColumnReader {
return func(ctx context.Context, parentLog logrus.FieldLogger, tg *configpb.TestGroup, oldCols []InflatedColumn, stop time.Time, receivers chan<- InflatedColumn) error {
tgPaths, err := groupPaths(tg)
if err != nil {
return fmt.Errorf("group path: %w", err)
}
since := hintStarted(oldCols)
log := parentLog.WithField("since", since)
log.Trace("Listing builds...")
listBuildsStart := time.Now()
builds, err := listBuilds(ctx, client, since, tgPaths...)
if errors.Is(err, storage.ErrBucketNotExist) {
log.WithError(err).Info("Bucket does not exist")
return nil
}
if err != nil {
return fmt.Errorf("list builds: %w", err)
}
log.WithField("listBuilds", time.Since(listBuildsStart)).WithField("total", len(builds)).Debug("Listed builds")
readColumns(ctx, client, log, tg, builds, stop, buildTimeout, receivers, readResult, enableIgnoreSkip)
return nil
}
}
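// resultReaderPool starts concurrency goroutines that service readResult
// requests until poolCtx is cancelled, and returns a resultReader whose read
// function enqueues work onto that shared pool.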
func resultReaderPool(poolCtx context.Context, log *logrus.Entry, concurrency int) *resultReader {
type request struct {
ctx context.Context
client gcs.Downloader
build gcs.Build
stop time.Time
res *gcsResult
err error
wg sync.WaitGroup
}
ch := make(chan *request, concurrency)
var wg sync.WaitGroup
wg.Add(concurrency)
log = log.WithField("concurrency", concurrency)
log.Info("Starting up result reader pool")
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
for req := range ch {
req.res, req.err = readResult(req.ctx, req.client, req.build, req.stop)
req.wg.Done()
}
}()
}
go func() {
<-poolCtx.Done()
log.Info("Shutting down result reader pool")
close(ch)
wg.Wait()
log.Info("Result reader pool stopped")
}()
readResultViaPool := func(ctx context.Context, client gcs.Downloader, build gcs.Build, stop time.Time) func() (*gcsResult, error) {
req := &request{
ctx: ctx,
client: client,
build: build,
stop: stop,
}
req.wg.Add(1)
select {
case <-ctx.Done():
return func() (*gcsResult, error) { return nil, ctx.Err() }
case ch <- req: // wait for request to get onto the queue
return func() (*gcsResult, error) {
req.wg.Wait()
return req.res, req.err
}
}
}
return &resultReader{
lock: &sync.Mutex{},
read: readResultViaPool,
}
}
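// resultReader schedules readResult calls, optionally through a shared worker
// pool, and returns a function that blocks until the result is ready.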
type resultReader struct {
lock sync.Locker
read func(context.Context, gcs.Downloader, gcs.Build, time.Time) func() (*gcsResult, error)
}
// readColumns downloads and processes the listed builds into InflatedColumns, sending each one to receivers.
func readColumns(ctx context.Context, client gcs.Downloader, log logrus.FieldLogger, group *configpb.TestGroup, builds []gcs.Build, stop time.Time, buildTimeout time.Duration, receivers chan<- InflatedColumn, readResult *resultReader, enableIgnoreSkip bool) {
if len(builds) == 0 {
return
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
nameCfg := makeNameConfig(group)
var heads []string
for _, h := range group.ColumnHeader {
heads = append(heads, h.ConfigurationValue)
}
type resp struct {
build gcs.Build
res func() (*gcsResult, error)
}
ch := make(chan resp)
var wg sync.WaitGroup
wg.Add(1)
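// Convert completed downloads into columns on a single goroutine, synthesizing
// an error column whenever a build cannot be read.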
go func() {
defer wg.Done()
// TODO(fejta): restore inter-build concurrency
var failures int // since last good column
var extra []string
var started float64
for resp := range ch {
b := resp.build
log := log.WithField("build", b)
result, err := resp.res()
id := path.Base(b.Path.Object())
var col InflatedColumn
if err != nil {
failures++
log.WithError(err).Trace("Failed to read build")
if extra == nil {
extra = make([]string, len(heads))
}
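// Place the failed column slightly after the most recently read good column.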
when := started + 0.01*float64(failures)
var ancientErr *ancientError
var noStartErr *noStartError
if errors.As(err, &ancientErr) {
col = ancientColumn(id, when, extra, ancientErr.Error())
} else if errors.As(err, &noStartErr) {
col = noStartColumn(id, when, extra, noStartErr.Error())
} else {
msg := fmt.Sprintf("Failed to download %s: %s", b, err.Error())
col = erroredColumn(id, when, extra, msg)
}
} else {
opts := makeOptions(group)
if !enableIgnoreSkip {
opts.ignoreSkip = false
}
col = convertResult(log, nameCfg, id, heads, *result, opts)
log.WithField("rows", len(col.Cells)).Debug("Read result")
failures = 0
extra = col.Column.Extra
started = col.Column.Started
}
select {
case <-ctx.Done():
return
case receivers <- col:
}
}
}()
defer wg.Wait()
defer close(ch)
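// Groups with more than two builds take the shared reader lock, so only one
// such group downloads through the pool at a time.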
if len(builds) > 2 {
readResult.lock.Lock()
defer readResult.lock.Unlock()
}
for i := len(builds) - 1; i >= 0; i-- {
b := builds[i]
r := resp{
build: b,
res: readResult.read(ctx, client, b, stop),
}
select {
case <-ctx.Done():
return
case ch <- r:
}
}
}
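// ancientColumn synthesizes a column for a build that started before the stop time, marking the overall row UNKNOWN.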
func ancientColumn(id string, when float64, extra []string, msg string) InflatedColumn {
return InflatedColumn{
Column: &statepb.Column{
Build: id,
Hint: id,
Started: when,
Extra: extra,
},
Cells: map[string]Cell{
overallRow: {
Message: msg,
Result: statuspb.TestStatus_UNKNOWN,
},
},
}
}
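// noStartColumn synthesizes a column for a build whose started.json has a zero timestamp, marking the overall row RUNNING.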
func noStartColumn(id string, when float64, extra []string, msg string) InflatedColumn {
return InflatedColumn{
Column: &statepb.Column{
Build: id,
Hint: id,
Started: when,
Extra: extra,
},
Cells: map[string]Cell{
overallRow: {
Message: msg,
Result: statuspb.TestStatus_RUNNING,
},
},
}
}
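// erroredColumn synthesizes a column for a build that failed to download, marking the overall row TOOL_FAIL.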
func erroredColumn(id string, when float64, extra []string, msg string) InflatedColumn {
return InflatedColumn{
Column: &statepb.Column{
Build: id,
Hint: id,
Started: when,
Extra: extra,
},
Cells: map[string]Cell{
overallRow: {
Message: msg,
Result: statuspb.TestStatus_TOOL_FAIL,
},
},
}
}
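// groupOptions holds the per-test-group settings that control how a gcsResult is converted into a column.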
type groupOptions struct {
merge bool
analyzeProwJob bool
addCellID bool
metricKey string
buildKey string
userKey string
annotations []*configpb.TestGroup_TestAnnotation
rules []*evalpb.Rule
ignoreSkip bool
}
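// makeOptions extracts groupOptions from the test group configuration.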
func makeOptions(group *configpb.TestGroup) groupOptions {
return groupOptions{
merge: !group.DisableMergedStatus,
analyzeProwJob: !group.DisableProwjobAnalysis,
addCellID: group.BuildOverrideStrftime != "",
metricKey: group.ShortTextMetric,
buildKey: group.BuildOverrideConfigurationValue,
userKey: group.UserProperty,
annotations: group.TestAnnotations,
rules: group.GetCustomEvaluatorRuleSet().GetRules(),
ignoreSkip: group.GetIgnoreSkip(),
}
}
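// Reserved name parts that render substitutes with the test and job names
// rather than looking them up in metadata.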
const (
testsName = "Tests name"
jobName = "Job name"
)
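// nameConfig describes how to format a row name from the job, the test name, and metadata parts.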
type nameConfig struct {
format string
parts []string
multiJob bool
}
// render formats the job, test name, and metadata into the expected test name format.
//
// Argument order determines precedence: the first metadata map containing a part wins.
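//
// Illustrative example (the format, parts, and values are hypothetical):
//
//	nc := nameConfig{format: "%s [%s]", parts: []string{testsName, "Context"}}
//	nc.render("ci-job", "TestFoo", map[string]string{"Context": "linux"}) // "TestFoo [linux]"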
func (nc nameConfig) render(job, test string, metadatas ...map[string]string) string {
parsed := make([]interface{}, len(nc.parts))
for i, p := range nc.parts {
var s string
switch p {
case jobName:
s = job
case testsName:
s = test
default:
for _, metadata := range metadatas {
v, present := metadata[p]
if present {
s = v
break
}
}
}
parsed[i] = s
}
return fmt.Sprintf(nc.format, parsed...)
}
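// makeNameConfig builds the group's nameConfig, prefixing the job name when the group aggregates multiple GCS prefixes.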
func makeNameConfig(group *configpb.TestGroup) nameConfig {
nameCfg := convertNameConfig(group.TestNameConfig)
if strings.Contains(gcsPrefix(group), ",") {
nameCfg.multiJob = true
ensureJobName(&nameCfg)
}
return nameCfg
}
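// firstFilled returns the first non-empty string.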
func firstFilled(strs ...string) string {
for _, s := range strs {
if s != "" {
return s
}
}
return ""
}
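// convertNameConfig translates the proto TestNameConfig into a nameConfig, defaulting to just the test name.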
func convertNameConfig(tnc *configpb.TestNameConfig) nameConfig {
if tnc == nil {
return nameConfig{
format: "%s",
parts: []string{testsName},
}
}
nc := nameConfig{
format: tnc.NameFormat,
parts: make([]string, len(tnc.NameElements)),
}
for i, e := range tnc.NameElements {
// TODO(fejta): build_target = true
// TODO(fejta): tags = 'SOMETHING'
nc.parts[i] = firstFilled(e.TargetConfig, e.TestProperty)
}
return nc
}
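// ensureJobName prepends the job name part to the format unless it is already present.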
func ensureJobName(nc *nameConfig) {
for _, p := range nc.parts {
if p == jobName {
return
}
}
nc.format = "%s." + nc.format
nc.parts = append([]string{jobName}, nc.parts...)
}
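// ancientError reports a build that started before the configured stop time.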
type ancientError struct {
msg string
}
func (e *ancientError) Error() string {
return e.msg
}
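// noStartError reports a build whose started.json contains a zero start timestamp.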
type noStartError struct{}
func (e *noStartError) Error() string {
return "Start timestamp for this job is 0."
}
// readResult downloads the build's GCS artifacts in parallel.
//
// Specifically it downloads the following files:
// * podinfo.json
// * started.json
// * finished.json
// * any junit.xml files under the artifacts directory.
func readResult(parent context.Context, client gcs.Downloader, build gcs.Build, stop time.Time) (*gcsResult, error) {
ctx, cancel := context.WithCancel(parent) // Allows aborting after first error
defer cancel()
result := gcsResult{
job: build.Job(),
build: build.Build(),
}
ec := make(chan error) // Receives errors from anyone
var lock sync.Mutex
addMalformed := func(s ...string) {
lock.Lock()
defer lock.Unlock()
result.malformed = append(result.malformed, s...)
}
var work int
// Download podinfo.json
work++
go func() {
pi, err := build.PodInfo(ctx, client)
switch {
case errors.Is(err, io.EOF):
addMalformed("podinfo.json")
err = nil
case err != nil:
err = fmt.Errorf("podinfo: %w", err)
case pi != nil:
result.podInfo = *pi
}
select {
case <-ctx.Done():
case ec <- err:
}
}()
// Download started.json
work++
go func() {
s, err := build.Started(ctx, client)
switch {
case errors.Is(err, io.EOF):
addMalformed("started.json")
err = nil
case err != nil:
err = fmt.Errorf("started: %w", err)
case time.Unix(s.Timestamp, 0).Before(stop):
err = &ancientError{fmt.Sprintf("build too old; started %v before %v)", s.Timestamp, stop.Unix())}
if s.Timestamp == 0 {
err = &noStartError{}
}
default:
result.started = *s
}
select {
case <-ctx.Done():
case ec <- err:
}
}()
// Download finished.json
work++
go func() {
f, err := build.Finished(ctx, client)
switch {
case errors.Is(err, io.EOF):
addMalformed("finished.json")
err = nil
case err != nil:
err = fmt.Errorf("finished: %w", err)
default:
result.finished = *f
}
select {
case <-ctx.Done():
case ec <- err:
}
}()
// Download suites
work++
go func() {
suites, err := readSuites(ctx, client, build)
if err != nil {
err = fmt.Errorf("suites: %w", err)
}
var problems []string
for _, s := range suites {
if s.Err != nil {
p := strings.TrimPrefix(s.Path, build.Path.String())
problems = append(problems, fmt.Sprintf("%s: %s", p, s.Err))
} else {
result.suites = append(result.suites, s)
}
}
if len(problems) > 0 {
addMalformed(problems...)
}
select {
case <-ctx.Done():
case ec <- err:
}
}()
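// Wait for every download goroutine; the first error cancels the shared
// context, aborting the rest.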
for ; work > 0; work-- {
select {
case <-ctx.Done():
return nil, fmt.Errorf("timeout: %w", ctx.Err())
case err := <-ec:
if err != nil {
return nil, err
}
}
}
sort.Slice(result.malformed, func(i, j int) bool {
return result.malformed[i] < result.malformed[j]
})
return &result, nil
}
// readSuites asynchronously lists and downloads junit.xml files
func readSuites(parent context.Context, client gcs.Downloader, build gcs.Build) ([]gcs.SuitesMeta, error) {
ctx, cancel := context.WithCancel(parent)
defer cancel()
ec := make(chan error)
// List
artifacts := make(chan string, 1)
go func() {
defer close(artifacts) // No more artifacts
if err := build.Artifacts(ctx, client, artifacts); err != nil {
select {
case <-ctx.Done():
case ec <- fmt.Errorf("list: %w", err):
}
}
}()
// Download
suitesChan := make(chan gcs.SuitesMeta, 1)
go func() {
defer close(suitesChan) // No more rows
const max = 1000
if err := build.Suites(ctx, client, artifacts, suitesChan, max); err != nil {
select {
case <-ctx.Done():
case ec <- fmt.Errorf("download: %w", err):
}
}
}()
// Append
var suites []gcs.SuitesMeta
go func() {
for suite := range suitesChan {
suites = append(suites, suite)
}
select {
case <-ctx.Done():
case ec <- nil:
}
}()
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-ec:
if err != nil {
return nil, err
}
}
return suites, nil
}