func RunTests()

in testworkflow.go [927:1021]


// RunTests runs the given test workflows in parallel across testProjects and
// returns the aggregated results as a JUnit testsuites document.
//
// Projects are handed out to workflows in a randomized round-robin: each
// "deck" of projects is shuffled and fully dealt before reshuffling, so
// repeated CI invocations spread load unpredictably without ever piling every
// test into one project by chance. Workflows that need exclusive (write-lock)
// access to a project draw a token from a separate pool and return it when
// done. parallelCount workers consume the workflow queue, each delaying its
// start by id*parallelStagger to smooth out API call bursts.
func RunTests(ctx context.Context, storageClient *storage.Client, testWorkflows []*TestWorkflow, project, zone, gcsPath, localPath string, parallelCount int, parallelStagger string, testProjects []string) (junit.Testsuites, error) {
	gcsPrefix, err := getGCSPrefix(ctx, storageClient, project, gcsPath)
	if err != nil {
		return junit.Testsuites{}, err
	}
	stagger, err := time.ParseDuration(parallelStagger)
	if err != nil {
		return junit.Testsuites{}, err
	}
	// Without at least one project the assignment loops below cannot make
	// progress (previously this panicked inside rand.Intn); fail up front.
	if len(testProjects) == 0 {
		return junit.Testsuites{}, fmt.Errorf("no test projects specified")
	}

	metrics := newTestMetrics(len(testWorkflows))
	finalizeWorkflows(ctx, testWorkflows, zone, gcsPrefix, localPath)

	testResults := make(chan testResult, len(testWorkflows))
	testchan := make(chan *TestWorkflow, len(testWorkflows))

	// Whenever we select a test project, we want to do so in a semi-random order
	// that is unpredictable but doesn't have the ability to place all tests in a
	// single project by chance (however small). This should randomize our usage
	// patterns in static invocations of CIT (eg CI invocations) a bit more.

	// Exclusive projects act as tokens: a worker takes one for a
	// write-locking test and puts it back afterwards.
	exclusiveProjects := make(chan string, len(testProjects))
	for _, i := range rand.Perm(len(testProjects)) {
		exclusiveProjects <- testProjects[i]
	}

	// Pre-assign one project per workflow. There may be more workflows than
	// projects, so deal out full shuffled decks of project indices, dealing
	// a fresh deck whenever the current one is exhausted.
	projects := make(chan string, len(testWorkflows))
	var deck []int
	for range testWorkflows {
		if len(deck) == 0 {
			deck = rand.Perm(len(testProjects))
		}
		projects <- testProjects[deck[0]]
		deck = deck[1:]
	}
	close(projects)

	var wg sync.WaitGroup
	for i := 0; i < parallelCount; i++ {
		wg.Add(1)
		go func(metrics *testMetrics, id int) {
			defer wg.Done()
			// Stagger worker start times to avoid a thundering herd of
			// API calls when many workflows launch at once.
			time.Sleep(time.Duration(id) * stagger)
			for test := range testchan {
				if test.lockProject {
					// This will block until an exclusive project is available.
					log.Printf("test %s/%s requires write lock for project", test.Name, test.Image.Name)
					test.wf.Project = <-exclusiveProjects
				} else {
					test.wf.Project = <-projects
				}
				testResults <- runTestWorkflow(ctx, metrics, test)
				if test.lockProject {
					// "unlock" the project.
					exclusiveProjects <- test.wf.Project
				}
			}
		}(metrics, i)
	}
	for _, ts := range testWorkflows {
		testchan <- ts
	}
	close(testchan)
	wg.Wait()

	// testResults is buffered for len(testWorkflows), so every send has
	// completed by the time wg.Wait returns; draining here cannot block.
	var suites junit.Testsuites
	var runtime float64
	for range testWorkflows {
		suites.Suites = append(suites.Suites, parseResult(<-testResults, localPath))
	}
	// Roll per-suite counters up into the parent testsuites element.
	for _, suite := range suites.Suites {
		suites.Errors += suite.Errors
		suites.Failures += suite.Failures
		suites.Tests += suite.Tests
		suites.Disabled += suite.Disabled
		suites.Skipped += suite.Skipped
		// suite.Time is a string in JUnit XML; skip unparseable values.
		if t, err := strconv.ParseFloat(suite.Time, 64); err == nil {
			runtime += t
		}
	}
	suites.Time = fmt.Sprintf("%.3f", runtime)

	return suites, nil
}