func readColumns()

in pkg/updater/read.go [147:237]


// readColumns reads every build in builds and sends one InflatedColumn per
// build to receivers.
//
// Builds are visited from the last element of builds to the first. The main
// goroutine asks readResult for a deferred result function per build and hands
// it to a single worker goroutine, which resolves the result, converts it to a
// column and forwards the column to receivers.
//
// A build whose result cannot be read still yields a column (ancient, no-start
// or generic error). Such a column reuses the extra values of the most recent
// successfully converted column and is stamped 0.01 after that column's start
// time for each consecutive failure.
//
// When more than two builds are given, readResult.lock is held until the
// function returns. The function returns once every build has been handed off
// and the worker has finished, or once ctx is cancelled. buildTimeout is not
// referenced by this function.
func readColumns(ctx context.Context, client gcs.Downloader, log logrus.FieldLogger, group *configpb.TestGroup, builds []gcs.Build, stop time.Time, buildTimeout time.Duration, receivers chan<- InflatedColumn, readResult *resultReader, enableIgnoreSkip bool) {
	if len(builds) == 0 {
		return
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	nameCfg := makeNameConfig(group)
	var heads []string
	for i := range group.ColumnHeader {
		heads = append(heads, group.ColumnHeader[i].ConfigurationValue)
	}

	// pending pairs a build with the function that eventually yields its result.
	type pending struct {
		build gcs.Build
		fetch func() (*gcsResult, error)
	}

	queue := make(chan pending)
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		// TODO(fejta): restore inter-build concurrency
		var (
			failures int      // consecutive read failures since the last good column
			extra    []string // extra values taken from the last good column
			started  float64  // start time taken from the last good column
		)
		for item := range queue {
			build := item.build
			buildLog := log.WithField("build", build)
			result, err := item.fetch()
			id := path.Base(build.Path.Object())

			var col InflatedColumn
			if err == nil {
				opts := makeOptions(group)
				if !enableIgnoreSkip {
					opts.ignoreSkip = false
				}
				col = convertResult(buildLog, nameCfg, id, heads, *result, opts)
				buildLog.WithField("rows", len(col.Cells)).Debug("Read result")
				failures = 0
				extra = col.Column.Extra
				started = col.Column.Started
			} else {
				failures++
				buildLog.WithError(err).Trace("Failed to read build")
				if extra == nil {
					// No good column seen yet: fall back to one empty value per header.
					extra = make([]string, len(heads))
				}
				// Nudge the timestamp forward so successive failures stay distinct.
				when := started + 0.01*float64(failures)

				var ancientErr *ancientError
				var noStartErr *noStartError
				switch {
				case errors.As(err, &ancientErr):
					col = ancientColumn(id, when, extra, ancientErr.Error())
				case errors.As(err, &noStartErr):
					col = noStartColumn(id, when, extra, noStartErr.Error())
				default:
					col = erroredColumn(id, when, extra, fmt.Sprintf("Failed to download %s: %s", build, err.Error()))
				}
			}

			select {
			case <-ctx.Done():
				return
			case receivers <- col:
			}
		}
	}()
	// Deferred calls run in reverse order: the lock (if taken) is released, the
	// queue is closed so the worker can drain and exit, the worker is awaited,
	// and only then is the context cancelled.
	defer wg.Wait()

	defer close(queue)
	if len(builds) > 2 {
		readResult.lock.Lock()
		defer readResult.lock.Unlock()
	}
	for remaining := len(builds); remaining > 0; remaining-- {
		build := builds[remaining-1]
		item := pending{
			build: build,
			fetch: readResult.read(ctx, client, build, stop),
		}
		select {
		case <-ctx.Done():
			return
		case queue <- item:
		}
	}
}