in eks/cluster-loader/cluster-loader.go [152:448]
// Start runs the cluster loader binary and publishes its results.
//
// In order, it:
//  1. checks that the test config file exists and that the report directory
//     can be created and written to;
//  2. calls downloadClusterLoader and writeTestOverrides;
//  3. invokes ts.run up to ts.cfg.Runs times (waiting 5 seconds before each
//     run) while a background goroutine prints the last 15 lines of the log
//     file to ts.cfg.LogWriter every 10 seconds;
//  4. appends every file under ts.cfg.ReportDir to the log file, merging the
//     parsed "PodStartupLatency_*" reports into ts.cfg.PodStartupLatencyPath;
//  5. uploads the merged latency file, a tar.gz of the report directory and
//     the log file to S3.
//
// The returned error is decided by comparing the number of "] Test Finished"
// occurrences in the log file against ts.cfg.Runs: when they match, any run
// error is discarded; otherwise an error describing the mismatch (and the
// run error, if any) is returned. Failures in the setup, report or upload
// steps are returned immediately.
func (ts *loader) Start() (err error) {
	ts.cfg.Logger.Info("starting cluster loader")
	if !fileutil.Exist(ts.cfg.TestConfigPath) {
		ts.cfg.Logger.Warn("clusterloader test config file does not exist", zap.String("path", ts.cfg.TestConfigPath))
		return fmt.Errorf("%q not found", ts.cfg.TestConfigPath)
	}

	if err = os.MkdirAll(ts.cfg.ReportDir, 0700); err != nil {
		return err
	}
	if err = fileutil.IsDirWriteable(ts.cfg.ReportDir); err != nil {
		return err
	}

	if err = ts.downloadClusterLoader(); err != nil {
		return err
	}
	if err = ts.writeTestOverrides(); err != nil {
		return err
	}

	args := []string{
		ts.cfg.ClusterLoaderPath,
		"--alsologtostderr",
		"--testconfig=" + ts.cfg.TestConfigPath,
		"--testoverrides=" + ts.testOverridesPath,
		"--report-dir=" + ts.cfg.ReportDir,
		"--nodes=" + fmt.Sprintf("%d", ts.cfg.Nodes),
		"--provider=eks",
	}
	if ts.cfg.KubeConfigPath == "" {
		// ref. https://github.com/kubernetes/perf-tests/pull/1295
		args = append(args, "--run-from-cluster=true")
	} else {
		args = append(args, "--kubeconfig="+ts.cfg.KubeConfigPath)
	}

	ts.testLogsFile, err = os.OpenFile(ts.cfg.LogPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
	if err != nil {
		return err
	}
	defer func() {
		// best-effort flush and close; errors are intentionally ignored
		ts.testLogsFile.Sync()
		ts.testLogsFile.Close()
	}()

	// The context must exist before any goroutine below is started: both of
	// them select on it, and assigning ts.rootCtx after they are running
	// would be an unsynchronized write. The goroutines use the local "ctx"
	// so they never observe a later change of the struct field.
	// There is no return between here and the cancel() call further down.
	ctx, cancel := context.WithTimeout(context.Background(), ts.cfg.Timeout)
	ts.rootCtx, ts.rootCancel = ctx, cancel

	// tailLogs trims the raw log contents and keeps only the last 15 lines;
	// it also reports how many lines the trimmed log had in total.
	tailLogs := func(b []byte) (string, int) {
		output := strings.TrimSpace(string(b))
		lines := strings.Split(output, "\n")
		linesN := len(lines)
		if linesN > 15 {
			output = strings.Join(lines[linesN-15:], "\n")
		}
		return output, linesN
	}

	// stream command run outputs for debugging purposes
	checkDonec := make(chan struct{})
	go func() {
		defer close(checkDonec)
		for {
			select {
			case <-ts.cfg.Stopc:
				ts.cfg.Logger.Info("exiting cluster loader command output checks")
				return
			case <-ctx.Done():
				ts.cfg.Logger.Info("exiting cluster loader command output checks")
				return
			case <-time.After(10 * time.Second):
			}

			if ts.testLogsFile != nil {
				ts.testLogsFile.Sync()
			}
			ts.cfg.Logger.Info("checking cluster loader command output from logs file")
			b, lerr := ioutil.ReadFile(ts.cfg.LogPath)
			if lerr != nil {
				ts.cfg.Logger.Warn("failed to read cluster loader command output from logs file", zap.Error(lerr))
				continue
			}
			output, linesN := tailLogs(b)
			ts.cfg.Logger.Info("checked cluster loader command output from logs file", zap.Int("total-lines", linesN))
			fmt.Fprintf(ts.cfg.LogWriter, "\n%q output:\n%s\n\n", ts.cfg.LogPath, output)
		}
	}()

	now := time.Now()
	// Buffered so that the runner goroutine can always deliver its single
	// result and exit, even when the select below has already returned
	// through another case (done, stop or timeout).
	errc := make(chan error, 1)
	var runErr error
	go func() {
		for i := 0; i < ts.cfg.Runs; i++ {
			select {
			case <-ctx.Done():
				return
			case <-time.After(5 * time.Second):
			}

			rerr := ts.run(i, args)
			if rerr == nil {
				ts.cfg.Logger.Info("completed cluster loader", zap.Int("current-run", i), zap.Int("total-runs", ts.cfg.Runs))
				continue
			}

			ts.cfg.Logger.Warn("checking cluster loader error from log file", zap.Error(rerr))
			b, lerr := ioutil.ReadFile(ts.cfg.LogPath)
			if lerr != nil {
				ts.cfg.Logger.Warn("failed to read cluster loader command output from logs file", zap.Error(lerr))
				errc <- rerr
				return
			}
			// a failed run is tolerated when the tail of the log contains skipErr
			output, _ := tailLogs(b)
			if strings.Contains(output, skipErr) {
				ts.cfg.Logger.Warn("cluster loader failed but continue", zap.String("skip-error-message", skipErr))
				continue
			}

			errc <- rerr
			return
		}
		errc <- nil
	}()

	select {
	case <-ts.donec:
		ts.cfg.Logger.Info("done cluster loader")
	case <-ts.cfg.Stopc:
		ts.cfg.Logger.Info("stopping cluster loader")
	case <-ctx.Done():
		ts.cfg.Logger.Info("timed out cluster loader")
	case runErr = <-errc:
		if runErr == nil {
			ts.cfg.Logger.Info("successfully ran cluster loader",
				zap.String("took", time.Since(now).String()),
				zap.Int("total-runs", ts.cfg.Runs),
			)
		} else {
			ts.cfg.Logger.Warn("failed to run cluster loader",
				zap.String("took", time.Since(now).String()),
				zap.Int("total-runs", ts.cfg.Runs),
				zap.Error(runErr),
			)
		}
	}
	// releases the context and tells both goroutines to exit
	cancel()

	select {
	case <-checkDonec:
		ts.cfg.Logger.Info("confirmed exit cluster loader command output checks")
	case <-time.After(3 * time.Minute):
		ts.cfg.Logger.Warn("took too long to confirm exit cluster loader command output checks")
	}

	if runErr != nil {
		ts.cfg.Logger.Warn("failed to run cluster loader", zap.Error(runErr))
	} else {
		ts.cfg.Logger.Info("successfully ran cluster loader")
	}

	lout, lerr := ioutil.ReadFile(ts.cfg.LogPath)
	if lerr != nil {
		ts.cfg.Logger.Warn("failed to read cluster loader log output", zap.Error(lerr))
		return lerr
	}
	// counted before the report files are appended to the same log below
	logOutput := string(lout)
	testFinishedCount := strings.Count(logOutput, `] Test Finished`)

	// append results in "LogPath"
	// "0777" to fix "scp: /var/log/cluster-loader-remote.log: Permission denied"
	logFile, cerr := os.OpenFile(ts.cfg.LogPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0777)
	if cerr != nil {
		return fmt.Errorf("open(%q): %v", ts.cfg.LogPath, cerr)
	}
	defer logFile.Close()

	podStartupLats := make([]measurement_util.PerfData, 0)
	cerr = filepath.Walk(ts.cfg.ReportDir, func(path string, info os.FileInfo, ferr error) error {
		if ferr != nil {
			return ferr
		}
		if info.IsDir() {
			return nil
		}
		ts.cfg.Logger.Info("found report", zap.String("path", path))

		if strings.HasPrefix(filepath.Base(path), "PodStartupLatency_") {
			ts.cfg.Logger.Info("parsing PodStartupLatency", zap.String("path", path))
			p, perr := ParsePodStartupLatency(path)
			if perr != nil {
				ts.cfg.Logger.Warn("failed to parse PodStartupLatency", zap.String("path", path))
				return perr
			}
			ts.cfg.Logger.Info("parsed PodStartupLatency", zap.String("path", path))
			podStartupLats = append(podStartupLats, p)
		}

		// Copying a report into the log file is best-effort: failures are
		// logged and the walk continues with the next file.
		if _, werr := fmt.Fprintf(logFile, "\n\n\nreport output from %q:\n\n", path); werr != nil {
			ts.cfg.Logger.Warn("failed to write report to log file", zap.Error(werr))
			return nil
		}
		b, rerr := ioutil.ReadFile(path)
		if rerr != nil {
			ts.cfg.Logger.Warn("failed to read cluster loader command output from logs file", zap.Error(rerr))
			// NOTE(review): the text says "write" although the failure is a
			// read of the report file; kept as-is to leave the output unchanged.
			if _, werr := fmt.Fprintf(logFile, "failed to write %v", rerr); werr != nil {
				ts.cfg.Logger.Warn("failed to write report to log file", zap.Error(werr))
			}
			return nil
		}
		if _, werr := logFile.Write(b); werr != nil {
			ts.cfg.Logger.Warn("failed to write report to log file", zap.Error(werr))
		}
		return nil
	})
	if cerr != nil {
		return cerr
	}

	podStartupLat := MergePodStartupLatency(podStartupLats...)
	podStartupLatData, derr := json.Marshal(podStartupLat)
	if derr != nil {
		ts.cfg.Logger.Warn("failed to marshal PodStartupLatency", zap.Error(derr))
		return derr
	}
	if cerr = ioutil.WriteFile(ts.cfg.PodStartupLatencyPath, podStartupLatData, 0600); cerr != nil {
		ts.cfg.Logger.Warn("failed to write PodStartupLatency", zap.Error(cerr))
		return cerr
	}
	if serr := aws_s3.Upload(
		ts.cfg.Logger,
		ts.cfg.S3API,
		ts.cfg.S3BucketName,
		ts.cfg.PodStartupLatencyS3Key,
		ts.cfg.PodStartupLatencyPath,
	); serr != nil {
		return serr
	}

	ts.cfg.Logger.Info("gzipping report dir", zap.String("report-dir", ts.cfg.ReportDir), zap.String("file-path", ts.cfg.ReportTarGzPath))
	// remove any previous archive at the target path before archiving
	if cerr = os.RemoveAll(ts.cfg.ReportTarGzPath); cerr != nil {
		ts.cfg.Logger.Warn("failed to remove temp file", zap.Error(cerr))
		return cerr
	}
	if cerr = archiver.Archive([]string{ts.cfg.ReportDir}, ts.cfg.ReportTarGzPath); cerr != nil {
		ts.cfg.Logger.Warn("archive failed", zap.Error(cerr))
		return cerr
	}
	stat, cerr := os.Stat(ts.cfg.ReportTarGzPath)
	if cerr != nil {
		ts.cfg.Logger.Warn("failed to os stat", zap.Error(cerr))
		return cerr
	}
	sz := humanize.Bytes(uint64(stat.Size()))
	ts.cfg.Logger.Info("gzipped report dir", zap.String("report-dir", ts.cfg.ReportDir), zap.String("file-path", ts.cfg.ReportTarGzPath), zap.String("file-size", sz))
	if serr := aws_s3.Upload(
		ts.cfg.Logger,
		ts.cfg.S3API,
		ts.cfg.S3BucketName,
		ts.cfg.ReportTarGzS3Key,
		ts.cfg.ReportTarGzPath,
	); serr != nil {
		return serr
	}
	if serr := aws_s3.Upload(
		ts.cfg.Logger,
		ts.cfg.S3API,
		ts.cfg.S3BucketName,
		ts.cfg.LogS3Key,
		ts.cfg.LogPath,
	); serr != nil {
		return serr
	}

	// The count of finished tests found in the log decides the final result,
	// overriding whatever the runner goroutine reported.
	if testFinishedCount == ts.cfg.Runs {
		ts.cfg.Logger.Info("completed expected test runs; overriding error",
			zap.Int("finished-count", testFinishedCount),
			zap.Int("expected-runs", ts.cfg.Runs),
			zap.Error(runErr),
		)
		return nil
	}

	ts.cfg.Logger.Warn("failed to complete expected test runs",
		zap.Int("finished-count", testFinishedCount),
		zap.Int("expected-runs", ts.cfg.Runs),
		zap.Error(runErr),
	)
	completeErr := fmt.Errorf("failed to complete expected test runs [expected %d, completed %d]", ts.cfg.Runs, testFinishedCount)
	if runErr == nil {
		return completeErr
	}
	return fmt.Errorf("%v (run error: %v)", completeErr, runErr)
}