in pkg/updater/read.go [147:237]
func readColumns(ctx context.Context, client gcs.Downloader, log logrus.FieldLogger, group *configpb.TestGroup, builds []gcs.Build, stop time.Time, buildTimeout time.Duration, receivers chan<- InflatedColumn, readResult *resultReader, enableIgnoreSkip bool) {
if len(builds) == 0 {
return
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
nameCfg := makeNameConfig(group)
var heads []string
for _, h := range group.ColumnHeader {
heads = append(heads, h.ConfigurationValue)
}
type resp struct {
build gcs.Build
res func() (*gcsResult, error)
}
ch := make(chan resp)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// TODO(fejta): restore inter-build concurrency
var failures int // since last good column
var extra []string
var started float64
for resp := range ch {
b := resp.build
log := log.WithField("build", b)
result, err := resp.res()
id := path.Base(b.Path.Object())
var col InflatedColumn
if err != nil {
failures++
log.WithError(err).Trace("Failed to read build")
if extra == nil {
extra = make([]string, len(heads))
}
when := started + 0.01*float64(failures)
var ancientErr *ancientError
var noStartErr *noStartError
if errors.As(err, &ancientErr) {
col = ancientColumn(id, when, extra, ancientErr.Error())
} else if errors.As(err, &noStartErr) {
col = noStartColumn(id, when, extra, noStartErr.Error())
} else {
msg := fmt.Sprintf("Failed to download %s: %s", b, err.Error())
col = erroredColumn(id, when, extra, msg)
}
} else {
opts := makeOptions(group)
if !enableIgnoreSkip {
opts.ignoreSkip = false
}
col = convertResult(log, nameCfg, id, heads, *result, opts)
log.WithField("rows", len(col.Cells)).Debug("Read result")
failures = 0
extra = col.Column.Extra
started = col.Column.Started
}
select {
case <-ctx.Done():
return
case receivers <- col:
}
}
}()
defer wg.Wait()
defer close(ch)
if len(builds) > 2 {
readResult.lock.Lock()
defer readResult.lock.Unlock()
}
for i := len(builds) - 1; i >= 0; i-- {
b := builds[i]
r := resp{
build: b,
res: readResult.read(ctx, client, b, stop),
}
select {
case <-ctx.Done():
return
case ch <- r:
}
}
}