in testworkflow.go [927:1021]
// RunTests runs every workflow in testWorkflows using parallelCount worker
// goroutines and aggregates the parsed results into a single junit.Testsuites.
//
// parallelStagger is a duration string understood by time.ParseDuration;
// worker number N sleeps for N*stagger before it picks up its first test, so
// workers do not all start at the same instant.
//
// Each workflow is assigned a project from testProjects. Workflows with
// lockProject set take a project from a pool that holds every project exactly
// once and hand it back when they finish, so no two such workflows use the
// same project at the same time. All other workflows are handed projects from
// a pre-computed, shuffled round-robin sequence.
//
// The returned Testsuites carries the summed Errors, Failures, Tests, Disabled
// and Skipped counts of all suites, and Time is the sum of every suite time
// that parses as a float, formatted with three decimals.
//
// An error is returned if the GCS prefix cannot be determined, if
// parallelStagger is not a valid duration, or if there are workflows to run
// but no test projects or no workers (parallelCount < 1) to run them with.
func RunTests(ctx context.Context, storageClient *storage.Client, testWorkflows []*TestWorkflow, project, zone, gcsPath, localPath string, parallelCount int, parallelStagger string, testProjects []string) (junit.Testsuites, error) {
	// Validate before doing any work. Without projects the random selection
	// below would call rand.Intn(0) and panic; without workers nothing would
	// ever write to the results channel and collection would block forever.
	if len(testWorkflows) > 0 {
		if len(testProjects) == 0 {
			return junit.Testsuites{}, fmt.Errorf("no test projects provided to run %d test workflows", len(testWorkflows))
		}
		if parallelCount < 1 {
			return junit.Testsuites{}, fmt.Errorf("parallel count must be at least 1 to run test workflows, got %d", parallelCount)
		}
	}

	gcsPrefix, err := getGCSPrefix(ctx, storageClient, project, gcsPath)
	if err != nil {
		return junit.Testsuites{}, err
	}
	stagger, err := time.ParseDuration(parallelStagger)
	if err != nil {
		return junit.Testsuites{}, err
	}
	metrics := newTestMetrics(len(testWorkflows))
	finalizeWorkflows(ctx, testWorkflows, zone, gcsPrefix, localPath)

	// Both channels are buffered to hold every workflow/result, so neither the
	// producer below nor the workers ever block on them.
	testResults := make(chan testResult, len(testWorkflows))
	testchan := make(chan *TestWorkflow, len(testWorkflows))

	// Whenever we select a test project, we want to do so in a semi-random order
	// that is unpredictable but doesn't have the ability to place all tests in a
	// single project by chance (however small). This should randomize our usage
	// patterns in static invocations of CIT (eg CI invocations) a bit more.
	//
	// shuffledProjects returns a freshly shuffled copy of testProjects; the
	// caller's slice is never modified.
	shuffledProjects := func() []string {
		shuffled := make([]string, len(testProjects))
		copy(shuffled, testProjects)
		rand.Shuffle(len(shuffled), func(a, b int) {
			shuffled[a], shuffled[b] = shuffled[b], shuffled[a]
		})
		return shuffled
	}

	// Pool of projects for tests that need a project lock: every project
	// appears exactly once, in random order.
	exclusiveProjects := make(chan string, len(testProjects))
	for _, p := range shuffledProjects() {
		exclusiveProjects <- p
	}

	// Same technique as above, but this time we might have more workflows than
	// projects, so anytime we use up all projects we reshuffle the full list.
	projects := make(chan string, len(testWorkflows))
	var round []string
	for range testWorkflows {
		if len(round) == 0 {
			round = shuffledProjects()
		}
		projects <- round[0]
		round = round[1:]
	}
	close(projects)

	var wg sync.WaitGroup
	for i := 0; i < parallelCount; i++ {
		wg.Add(1)
		go func(metrics *testMetrics, id int) {
			defer wg.Done()
			// Stagger worker start-up by worker index.
			time.Sleep(time.Duration(id) * stagger)
			for test := range testchan {
				if test.lockProject {
					// This will block until an exclusive project is available.
					log.Printf("test %s/%s requires write lock for project", test.Name, test.Image.Name)
					test.wf.Project = <-exclusiveProjects
				} else {
					test.wf.Project = <-projects
				}
				testResults <- runTestWorkflow(ctx, metrics, test)
				if test.lockProject {
					// "unlock" the project.
					exclusiveProjects <- test.wf.Project
				}
			}
		}(metrics, i)
	}
	for _, ts := range testWorkflows {
		testchan <- ts
	}
	close(testchan)
	wg.Wait()
	// Every worker has exited, so all results are already buffered and the
	// channel can be closed and drained.
	close(testResults)

	var suites junit.Testsuites
	var runtime float64
	for res := range testResults {
		suite := parseResult(res, localPath)
		suites.Suites = append(suites.Suites, suite)
		suites.Errors += suite.Errors
		suites.Failures += suite.Failures
		suites.Tests += suite.Tests
		suites.Disabled += suite.Disabled
		suites.Skipped += suite.Skipped
		// Suites whose time does not parse are left out of the total.
		if secs, err := strconv.ParseFloat(suite.Time, 64); err == nil {
			runtime += secs
		}
	}
	suites.Time = fmt.Sprintf("%.3f", runtime)
	return suites, nil
}