func()

in eks/cluster-loader/cluster-loader.go [152:448]


// Start runs the clusterloader binary and collects/publishes its results.
//
// Preparation: it fails if cfg.TestConfigPath does not exist, creates a
// writable cfg.ReportDir, downloads the clusterloader binary, and writes the
// test overrides file.
//
// Execution: it calls ts.run cfg.Runs times, waiting 5 seconds before each
// run, while cfg.LogPath is held open for appending as ts.testLogsFile
// (ts.run is defined elsewhere; presumably it writes command output through
// that file — confirm). Every 10 seconds the last 15 lines of cfg.LogPath are
// echoed to cfg.LogWriter. A failed run whose log tail contains skipErr is
// tolerated and the next run proceeds; any other failure ends the loop. The
// loop is also abandoned when ts.donec or cfg.Stopc is closed, or when
// cfg.Timeout elapses.
//
// Reporting: every file under cfg.ReportDir is appended to cfg.LogPath,
// "PodStartupLatency_*" reports are merged and written as JSON to
// cfg.PodStartupLatencyPath, cfg.ReportDir is archived to
// cfg.ReportTarGzPath, and the latency JSON, the archive and the log are
// uploaded to S3. Any preparation, reporting or upload failure is returned
// immediately.
//
// Result: if the log (read before reports are appended) contains exactly
// cfg.Runs "] Test Finished" markers, Start returns nil even when a run
// reported an error; otherwise it returns an error describing the mismatch,
// wrapping the run error when one was received.
func (ts *loader) Start() (err error) {
	ts.cfg.Logger.Info("starting cluster loader")

	if !fileutil.Exist(ts.cfg.TestConfigPath) {
		ts.cfg.Logger.Warn("clusterloader test config file does not exist", zap.String("path", ts.cfg.TestConfigPath))
		return fmt.Errorf("%q not found", ts.cfg.TestConfigPath)
	}

	if err = os.MkdirAll(ts.cfg.ReportDir, 0700); err != nil {
		return err
	}
	if err = fileutil.IsDirWriteable(ts.cfg.ReportDir); err != nil {
		return err
	}

	if err = ts.downloadClusterLoader(); err != nil {
		return err
	}
	if err = ts.writeTestOverrides(); err != nil {
		return err
	}

	args := []string{
		ts.cfg.ClusterLoaderPath,
		"--alsologtostderr",
		"--testconfig=" + ts.cfg.TestConfigPath,
		"--testoverrides=" + ts.testOverridesPath,
		"--report-dir=" + ts.cfg.ReportDir,
		fmt.Sprintf("--nodes=%d", ts.cfg.Nodes),
		"--provider=eks",
	}
	if ts.cfg.KubeConfigPath == "" {
		// ref. https://github.com/kubernetes/perf-tests/pull/1295
		args = append(args, "--run-from-cluster=true")
	} else {
		args = append(args, "--kubeconfig="+ts.cfg.KubeConfigPath)
	}

	ts.testLogsFile, err = os.OpenFile(ts.cfg.LogPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
	if err != nil {
		return err
	}
	defer func() {
		ts.testLogsFile.Sync()
		ts.testLogsFile.Close()
	}()

	// Number of trailing log lines that are echoed and inspected for skipErr.
	const logTailLines = 15
	// tailLog trims the log contents and returns its last logTailLines lines
	// along with the total number of lines in the trimmed contents.
	tailLog := func(b []byte) (string, int) {
		output := strings.TrimSpace(string(b))
		lines := strings.Split(output, "\n")
		if len(lines) > logTailLines {
			output = strings.Join(lines[len(lines)-logTailLines:], "\n")
		}
		return output, len(lines)
	}

	// Create the root context BEFORE starting any goroutine that selects on it.
	// Previously the output checker read ts.rootCtx while this goroutine was
	// still assigning it (a data race, and a nil-interface panic if the field
	// had never been set). Goroutines use the local copy, never the field.
	ts.rootCtx, ts.rootCancel = context.WithTimeout(context.Background(), ts.cfg.Timeout)
	ctx := ts.rootCtx

	// stream command run outputs for debugging purposes
	checkDonec := make(chan struct{})
	go func() {
		defer close(checkDonec)
		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ts.cfg.Stopc:
				ts.cfg.Logger.Info("exiting cluster loader command output checks")
				return
			case <-ctx.Done():
				ts.cfg.Logger.Info("exiting cluster loader command output checks")
				return
			case <-ticker.C:
			}

			if ts.testLogsFile != nil {
				ts.testLogsFile.Sync()
			}
			ts.cfg.Logger.Info("checking cluster loader command output from logs file")
			b, lerr := ioutil.ReadFile(ts.cfg.LogPath)
			if lerr != nil {
				ts.cfg.Logger.Warn("failed to read cluster loader command output from logs file", zap.Error(lerr))
				continue
			}
			output, linesN := tailLog(b)
			ts.cfg.Logger.Info("checked cluster loader command output from logs file", zap.Int("total-lines", linesN))
			fmt.Fprintf(ts.cfg.LogWriter, "\n%q output:\n%s\n\n", ts.cfg.LogPath, output)
		}
	}()

	now := time.Now()
	// Buffered: the runner sends at most one value, and it must be able to do
	// so and exit even after Start stops listening (stop signal or timeout);
	// an unbuffered channel would leak the goroutine in that case.
	errc := make(chan error, 1)
	var runErr error
	go func() {
		for i := 0; i < ts.cfg.Runs; i++ {
			select {
			case <-ctx.Done():
				return
			case <-time.After(5 * time.Second):
			}

			rerr := ts.run(i, args)
			if rerr == nil {
				ts.cfg.Logger.Info("completed cluster loader", zap.Int("current-run", i), zap.Int("total-runs", ts.cfg.Runs))
				continue
			}

			ts.cfg.Logger.Warn("checking cluster loader error from log file", zap.Error(rerr))
			b, lerr := ioutil.ReadFile(ts.cfg.LogPath)
			if lerr != nil {
				ts.cfg.Logger.Warn("failed to read cluster loader command output from logs file", zap.Error(lerr))
				errc <- rerr
				return
			}
			// Failures whose recent output mentions skipErr are tolerated.
			if output, _ := tailLog(b); strings.Contains(output, skipErr) {
				ts.cfg.Logger.Warn("cluster loader failed but continue", zap.String("skip-error-message", skipErr))
				continue
			}

			errc <- rerr
			return
		}
		errc <- nil
	}()

	select {
	case <-ts.donec:
		ts.cfg.Logger.Info("done cluster loader")
	case <-ts.cfg.Stopc:
		ts.cfg.Logger.Info("stopping cluster loader")
	case <-ctx.Done():
		ts.cfg.Logger.Info("timed out cluster loader")
	case runErr = <-errc:
		if runErr == nil {
			ts.cfg.Logger.Info("successfully ran cluster loader",
				zap.String("took", time.Since(now).String()),
				zap.Int("total-runs", ts.cfg.Runs),
			)
		} else {
			ts.cfg.Logger.Warn("failed to run cluster loader",
				zap.String("took", time.Since(now).String()),
				zap.Int("total-runs", ts.cfg.Runs),
				zap.Error(runErr),
			)
		}
	}
	// Cancelling also tells the output checker to exit.
	ts.rootCancel()
	select {
	case <-checkDonec:
		ts.cfg.Logger.Info("confirmed exit cluster loader command output checks")
	case <-time.After(3 * time.Minute):
		ts.cfg.Logger.Warn("took too long to confirm exit cluster loader command output checks")
	}
	if runErr != nil {
		ts.cfg.Logger.Warn("failed to run cluster loader", zap.Error(runErr))
	} else {
		ts.cfg.Logger.Info("successfully ran cluster loader")
	}

	// Count completed tests before report files are appended to the log.
	lout, lerr := ioutil.ReadFile(ts.cfg.LogPath)
	if lerr != nil {
		ts.cfg.Logger.Warn("failed to read cluster loader log output", zap.Error(lerr))
		return lerr
	}
	testFinishedCount := strings.Count(string(lout), `] Test Finished`)

	// append results in "LogPath"
	// "0777" to fix "scp: /var/log/cluster-loader-remote.log: Permission denied"
	// NOTE(review): os.OpenFile applies the mode only when it creates the file;
	// LogPath was already opened with O_CREATE and 0600 above, so this mode is
	// likely a no-op — confirm whether an explicit os.Chmod was intended.
	logFile, cerr := os.OpenFile(ts.cfg.LogPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0777)
	if cerr != nil {
		return fmt.Errorf("open(%q): %w", ts.cfg.LogPath, cerr)
	}
	defer logFile.Close()

	podStartupLats := make([]measurement_util.PerfData, 0)
	cerr = filepath.Walk(ts.cfg.ReportDir, func(path string, info os.FileInfo, ferr error) error {
		if ferr != nil {
			return ferr
		}
		if info.IsDir() {
			return nil
		}
		ts.cfg.Logger.Info("found report", zap.String("path", path))

		if strings.HasPrefix(filepath.Base(path), "PodStartupLatency_") {
			ts.cfg.Logger.Info("parsing PodStartupLatency", zap.String("path", path))
			p, perr := ParsePodStartupLatency(path)
			if perr != nil {
				ts.cfg.Logger.Warn("failed to parse PodStartupLatency", zap.String("path", path))
				return perr
			}
			ts.cfg.Logger.Info("parsed PodStartupLatency", zap.String("path", path))
			podStartupLats = append(podStartupLats, p)
		}

		// Copying reports into the log is best-effort: write failures are
		// logged but do not abort the walk.
		if _, werr := fmt.Fprintf(logFile, "\n\n\nreport output from %q:\n\n", path); werr != nil {
			ts.cfg.Logger.Warn("failed to write report to log file", zap.Error(werr))
			return nil
		}

		b, rerr := ioutil.ReadFile(path)
		if rerr != nil {
			ts.cfg.Logger.Warn("failed to read report file", zap.String("path", path), zap.Error(rerr))
			if _, werr := fmt.Fprintf(logFile, "failed to read %q: %v", path, rerr); werr != nil {
				ts.cfg.Logger.Warn("failed to write report to log file", zap.Error(werr))
			}
			return nil
		}
		if _, werr := logFile.Write(b); werr != nil {
			ts.cfg.Logger.Warn("failed to write report to log file", zap.Error(werr))
		}
		return nil
	})
	if cerr != nil {
		return cerr
	}

	podStartupLat := MergePodStartupLatency(podStartupLats...)
	podStartupLatData, derr := json.Marshal(podStartupLat)
	if derr != nil {
		ts.cfg.Logger.Warn("failed to marshal PodStartupLatency", zap.Error(derr))
		return derr
	}
	if cerr = ioutil.WriteFile(ts.cfg.PodStartupLatencyPath, podStartupLatData, 0600); cerr != nil {
		ts.cfg.Logger.Warn("failed to write PodStartupLatency", zap.Error(cerr))
		return cerr
	}
	if serr := aws_s3.Upload(
		ts.cfg.Logger,
		ts.cfg.S3API,
		ts.cfg.S3BucketName,
		ts.cfg.PodStartupLatencyS3Key,
		ts.cfg.PodStartupLatencyPath,
	); serr != nil {
		return serr
	}

	ts.cfg.Logger.Info("gzipping report dir", zap.String("report-dir", ts.cfg.ReportDir), zap.String("file-path", ts.cfg.ReportTarGzPath))
	// Remove any stale archive before creating a new one.
	if cerr = os.RemoveAll(ts.cfg.ReportTarGzPath); cerr != nil {
		ts.cfg.Logger.Warn("failed to remove temp file", zap.Error(cerr))
		return cerr
	}
	if cerr = archiver.Archive([]string{ts.cfg.ReportDir}, ts.cfg.ReportTarGzPath); cerr != nil {
		ts.cfg.Logger.Warn("archive failed", zap.Error(cerr))
		return cerr
	}
	stat, cerr := os.Stat(ts.cfg.ReportTarGzPath)
	if cerr != nil {
		ts.cfg.Logger.Warn("failed to os stat", zap.Error(cerr))
		return cerr
	}
	sz := humanize.Bytes(uint64(stat.Size()))
	ts.cfg.Logger.Info("gzipped report dir", zap.String("report-dir", ts.cfg.ReportDir), zap.String("file-path", ts.cfg.ReportTarGzPath), zap.String("file-size", sz))
	if serr := aws_s3.Upload(
		ts.cfg.Logger,
		ts.cfg.S3API,
		ts.cfg.S3BucketName,
		ts.cfg.ReportTarGzS3Key,
		ts.cfg.ReportTarGzPath,
	); serr != nil {
		return serr
	}
	if serr := aws_s3.Upload(
		ts.cfg.Logger,
		ts.cfg.S3API,
		ts.cfg.S3BucketName,
		ts.cfg.LogS3Key,
		ts.cfg.LogPath,
	); serr != nil {
		return serr
	}

	// The number of finished tests in the log is authoritative: a run error
	// is ignored when all expected runs completed.
	if testFinishedCount == ts.cfg.Runs {
		ts.cfg.Logger.Info("completed expected test runs; overriding error",
			zap.Int("finished-count", testFinishedCount),
			zap.Int("expected-runs", ts.cfg.Runs),
			zap.Error(runErr),
		)
		return nil
	}

	ts.cfg.Logger.Warn("failed to complete expected test runs",
		zap.Int("finished-count", testFinishedCount),
		zap.Int("expected-runs", ts.cfg.Runs),
		zap.Error(runErr),
	)
	completeErr := fmt.Errorf("failed to complete expected test runs [expected %d, completed %d]", ts.cfg.Runs, testFinishedCount)
	if runErr == nil {
		return completeErr
	}
	return fmt.Errorf("%v (run error: %w)", completeErr, runErr)
}