in cmd/coordinator/buildstatus.go [1404:1564]
// runTests runs this build's dist tests. Work is spread across the main
// buildlet (st.bc) and any helper buildlets received on helpers; the helpers
// channel is consumed until it is closed.
//
// The main buildlet takes tests in set order so the streamed output stays
// smooth, while helpers take the biggest remaining tests first (critical-path
// scheduling). Test output is written to st in set order as each test
// finishes.
//
// A non-nil remoteErr reports a failure on the buildlet side: either listing
// the dist tests failed remotely, or the first failing test in set order
// failed. A non-nil err reports that the coordinator could not run the tests.
// This includes errBuildletsGone, returned when every buildlet stopped before
// all tests finished. Both are nil when all tests pass.
func (st *buildStatus) runTests(helpers <-chan buildlet.Client) (remoteErr, err error) {
	testNames, remoteErr, err := st.distTestList()
	if remoteErr != nil {
		return fmt.Errorf("distTestList remote: %v", remoteErr), nil
	}
	if err != nil {
		return nil, fmt.Errorf("distTestList exec: %v", err)
	}
	testStats := getTestStats(st)
	set, err := st.newTestSet(testStats, testNames)
	if err != nil {
		return nil, err
	}
	st.LogEventTime("starting_tests", fmt.Sprintf("%d tests", len(set.items)))
	startTime := time.Now()
	workDir, err := st.bc.WorkDir(st.ctx)
	if err != nil {
		return nil, fmt.Errorf("error discovering workdir for main buildlet, %s: %v", st.bc.Name(), err)
	}
	mainBuildletGoroot := st.conf.FilePathJoin(workDir, "go")
	mainBuildletGopath := st.conf.FilePathJoin(workDir, "gopath")

	// testsDone is closed when runTests returns. It lets the main buildlet's
	// polling loop below stop as soon as its results can no longer be
	// consumed. Otherwise it would keep waking every 5 seconds until st.ctx
	// is canceled.
	testsDone := make(chan struct{})
	defer close(testsDone)

	// We use our original buildlet to run the tests in order, to
	// make the streaming somewhat smooth and not incredibly
	// lumpy. The rest of the buildlets run the largest tests
	// first (critical path scheduling).
	// The buildletActivity WaitGroup is used to track when all
	// the buildlets are dead or done.
	var buildletActivity sync.WaitGroup
	buildletActivity.Add(2) // one per goroutine below (main + helper launcher goroutine)
	go func() {
		defer buildletActivity.Done() // for the per-goroutine Add(2) above
		for !st.bc.IsBroken() {
			tis, ok := set.testsToRunInOrder()
			if !ok {
				// Nothing to start right now. Poll again shortly unless
				// the build is canceled or runTests has already returned.
				select {
				case <-st.ctx.Done():
					return
				case <-testsDone:
					return
				case <-time.After(5 * time.Second):
				}
				continue
			}
			st.runTestsOnBuildlet(st.bc, tis, mainBuildletGoroot, mainBuildletGopath)
		}
		st.LogEventTime("main_buildlet_broken", st.bc.Name())
	}()
	go func() {
		defer buildletActivity.Done() // for the per-goroutine Add(2) above
		for helper := range helpers {
			buildletActivity.Add(1)
			go func(bc buildlet.Client) {
				// Deferred calls run in reverse order. In devPause mode the
				// helper logs DEV_HELPER_SLEEP, sleeps, and only then is closed.
				defer buildletActivity.Done() // for the per-helper Add(1) above
				defer st.LogEventTime("closed_helper", bc.Name())
				defer bc.Close()
				if devPause {
					defer time.Sleep(5 * time.Minute)
					defer st.LogEventTime("DEV_HELPER_SLEEP", bc.Name())
				}
				st.LogEventTime("got_empty_test_helper", bc.String())
				if err := bc.PutTarFromURL(st.ctx, st.SnapshotURL(pool.NewGCEConfiguration().BuildEnv()), "go"); err != nil {
					log.Printf("failed to extract snapshot for helper %s: %v", bc.Name(), err)
					return
				}
				workDir, err := bc.WorkDir(st.ctx)
				if err != nil {
					log.Printf("error discovering workdir for helper %s: %v", bc.Name(), err)
					return
				}
				st.LogEventTime("test_helper_set_up", bc.Name())
				goroot := st.conf.FilePathJoin(workDir, "go")
				gopath := st.conf.FilePathJoin(workDir, "gopath")
				for !bc.IsBroken() {
					tis, ok := set.testsToRunBiggestFirst()
					if !ok {
						st.LogEventTime("no_new_tests_remain", bc.Name())
						return
					}
					st.runTestsOnBuildlet(bc, tis, goroot, gopath)
				}
				st.LogEventTime("test_helper_is_broken", bc.Name())
			}(helper)
		}
	}()

	// Convert a sync.WaitGroup into a channel.
	// Aside: https://groups.google.com/forum/#!topic/golang-dev/7fjGWuImu5k
	buildletsGone := make(chan struct{})
	go func() {
		buildletActivity.Wait()
		close(buildletsGone)
	}()

	// Staging mode does not change during the loop; evaluate it once.
	inStaging := pool.NewGCEConfiguration().InStaging()

	var lastMetadata string
	var lastHeader string
	var serialDuration time.Duration
	for _, ti := range set.items {
	AwaitDone:
		for {
			timer := time.NewTimer(30 * time.Second)
			select {
			case <-ti.done: // test finished; pass/fail is checked via ti.remoteErr below
				timer.Stop()
				break AwaitDone
			case <-timer.C:
				st.LogEventTime("still_waiting_on_test", ti.name)
			case <-buildletsGone:
				timer.Stop()
				set.cancelAll()
				return nil, errBuildletsGone
			}
		}
		serialDuration += ti.execDuration
		if len(ti.output) > 0 {
			metadata, header, out := parseOutputAndHeader(ti.output)
			printHeader := false
			if metadata != lastMetadata {
				lastMetadata = metadata
				fmt.Fprintf(st, "\n%s\n", metadata)
				// Always include the test header after
				// metadata changes. This is a readability
				// optimization that ensures that tests are
				// always immediately preceded by their test
				// banner, even if it is a duplicate banner
				// because the test metadata changed.
				printHeader = true
			}
			if header != lastHeader {
				lastHeader = header
				printHeader = true
			}
			if printHeader {
				fmt.Fprintf(st, "\n%s\n", header)
			}
			if inStaging {
				// In staging, annotate each test's output with the shard
				// that ran it and its group size.
				out = bytes.TrimSuffix(out, nl)
				st.Write(out)
				fmt.Fprintf(st, " (shard %s; par=%d)\n", ti.shardIPPort, ti.groupSize)
			} else {
				st.Write(out)
			}
		}
		if ti.remoteErr != nil {
			set.cancelAll()
			return fmt.Errorf("dist test failed: %s: %v", ti.name, ti.remoteErr), nil
		}
	}
	elapsed := time.Since(startTime)
	var msg string
	if st.conf.NumTestHelpers(st.isTry()) > 0 {
		msg = fmt.Sprintf("took %v; aggregate %v; saved %v", elapsed, serialDuration, serialDuration-elapsed)
	} else {
		msg = fmt.Sprintf("took %v", elapsed)
	}
	st.LogEventTime("tests_complete", msg)
	fmt.Fprintf(st, "\nAll tests passed.\n")
	return nil, nil
}