pkg/testing/runner/runner.go (802 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package runner import ( "bytes" "context" "errors" "fmt" "io" "os" "path/filepath" "slices" "strings" "sync" "time" "golang.org/x/crypto/ssh" "golang.org/x/sync/errgroup" "gopkg.in/yaml.v2" "github.com/elastic/elastic-agent/pkg/testing" "github.com/elastic/elastic-agent/pkg/testing/common" "github.com/elastic/elastic-agent/pkg/testing/define" tssh "github.com/elastic/elastic-agent/pkg/testing/ssh" "github.com/elastic/elastic-agent/pkg/testing/supported" ) // Result is the complete result from the runner. type Result struct { // Tests is the number of tests ran. Tests int // Failures is the number of tests that failed. Failures int // Output is the raw test output. Output []byte // XMLOutput is the XML Junit output. XMLOutput []byte // JSONOutput is the JSON output. JSONOutput []byte } // State represents the state storage of what has been provisioned. type State struct { // Instances stores provisioned and prepared instances. Instances []StateInstance `yaml:"instances"` // Stacks store provisioned stacks. Stacks []common.Stack `yaml:"stacks"` } // StateInstance is an instance stored in the state. type StateInstance struct { common.Instance // Prepared set to true when the instance is prepared. Prepared bool `yaml:"prepared"` } // Runner runs the tests on remote instances. type Runner struct { cfg common.Config logger common.Logger ip common.InstanceProvisioner sp common.StackProvisioner batches []common.OSBatch batchToStack map[string]stackRes batchToStackCh map[string]chan stackRes batchToStackMx sync.Mutex stateMx sync.Mutex state State } // NewRunner creates a new runner based on the provided batches. func NewRunner(cfg common.Config, ip common.InstanceProvisioner, sp common.StackProvisioner, batches ...define.Batch) (*Runner, error) { err := cfg.Validate() if err != nil { return nil, err } platforms, err := cfg.GetPlatforms() if err != nil { return nil, err } osBatches, err := supported.CreateBatches(batches, platforms, cfg.Groups, cfg.Matrix, cfg.SingleTest) if err != nil { return nil, err } osBatches = filterSupportedOS(osBatches, ip) logger := &runnerLogger{ writer: os.Stdout, timestamp: cfg.Timestamp, } ip.SetLogger(logger) sp.SetLogger(logger) r := &Runner{ cfg: cfg, logger: logger, ip: ip, sp: sp, batches: osBatches, batchToStack: make(map[string]stackRes), batchToStackCh: make(map[string]chan stackRes), } err = r.loadState() if err != nil { return nil, err } return r, nil } // Logger returns the logger used by the runner. func (r *Runner) Logger() common.Logger { return r.logger } // Run runs all the tests. func (r *Runner) Run(ctx context.Context) (Result, error) { // validate tests can even be performed err := r.validate() if err != nil { return Result{}, err } // prepare prepareCtx, prepareCancel := context.WithTimeout(ctx, 10*time.Minute) defer prepareCancel() sshAuth, repoArchive, err := r.prepare(prepareCtx) if err != nil { return Result{}, err } // start the needed stacks err = r.startStacks(ctx) if err != nil { return Result{}, err } // only send to the provisioner the batches that need to be created var instances []StateInstance var batches []common.OSBatch for _, b := range r.batches { if !b.Skip { i, ok := r.findInstance(b.ID) if ok { instances = append(instances, i) } else { batches = append(batches, b) } } } if len(batches) > 0 { provisionedInstances, err := r.ip.Provision(ctx, r.cfg, batches) if err != nil { return Result{}, err } for _, i := range provisionedInstances { instances = append(instances, StateInstance{ Instance: i, Prepared: false, }) } } var results map[string]common.OSRunnerResult switch r.ip.Type() { case common.ProvisionerTypeVM: // use SSH to perform all the required work on the instances results, err = r.runInstances(ctx, sshAuth, repoArchive, instances) if err != nil { return Result{}, err } case common.ProvisionerTypeK8SCluster: results, err = r.runK8sInstances(ctx, instances) if err != nil { return Result{}, err } default: return Result{}, fmt.Errorf("invalid provisioner type %d", r.ip.Type()) } // merge the results return r.mergeResults(results) } // Clean performs a cleanup to ensure anything that could have been left running is removed. func (r *Runner) Clean() error { r.stateMx.Lock() defer r.stateMx.Unlock() var instances []common.Instance for _, i := range r.state.Instances { instances = append(instances, i.Instance) } r.state.Instances = nil stacks := make([]common.Stack, len(r.state.Stacks)) copy(stacks, r.state.Stacks) r.state.Stacks = nil err := r.writeState() if err != nil { return err } var g errgroup.Group g.Go(func() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() return r.ip.Clean(ctx, r.cfg, instances) }) for _, stack := range stacks { g.Go(func(stack common.Stack) func() error { return func() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() return r.sp.Delete(ctx, stack) } }(stack)) } return g.Wait() } func (r *Runner) runK8sInstances(ctx context.Context, instances []StateInstance) (map[string]common.OSRunnerResult, error) { results := make(map[string]common.OSRunnerResult) var resultsMx sync.Mutex var err error for _, instance := range instances { batch, ok := findBatchByID(instance.ID, r.batches) if !ok { err = fmt.Errorf("unable to find batch with ID: %s", instance.ID) continue } logger := &batchLogger{wrapped: r.logger, prefix: instance.ID} // start with the ExtraEnv first preventing the other environment flags below // from being overwritten env := map[string]string{} for k, v := range r.cfg.ExtraEnv { env[k] = v } // ensure that we have all the requirements for the stack if required if batch.Batch.Stack != nil { // wait for the stack to be ready before continuing logger.Logf("Waiting for stack to be ready...") stack, stackErr := r.getStackForBatchID(batch.ID) if stackErr != nil { err = stackErr continue } env["ELASTICSEARCH_HOST"] = stack.Elasticsearch env["ELASTICSEARCH_USERNAME"] = stack.Username env["ELASTICSEARCH_PASSWORD"] = stack.Password env["KIBANA_HOST"] = stack.Kibana env["KIBANA_USERNAME"] = stack.Username env["KIBANA_PASSWORD"] = stack.Password logger.Logf("Using Stack with Kibana host %s, credentials available under .integration-cache", stack.Kibana) } // set the go test flags env["GOTEST_FLAGS"] = r.cfg.TestFlags env["KUBECONFIG"] = instance.Instance.Internal["config"].(string) env["TEST_BINARY_NAME"] = r.cfg.BinaryName env["K8S_VERSION"] = instance.Instance.Internal["version"].(string) env["AGENT_IMAGE"] = instance.Instance.Internal["agent_image"].(string) prefix := fmt.Sprintf("%s-%s", instance.Instance.Internal["version"].(string), batch.ID) // run the actual tests on the host result, runErr := batch.OS.Runner.Run(ctx, r.cfg.VerboseMode, nil, logger, r.cfg.AgentVersion, prefix, batch.Batch, env) if runErr != nil { logger.Logf("Failed to execute tests on instance: %s", err) err = fmt.Errorf("failed to execute tests on instance %s: %w", instance.Name, err) } resultsMx.Lock() results[batch.ID] = result resultsMx.Unlock() } if err != nil { return nil, err } return results, nil } // runInstances runs the batch on each instance in parallel. func (r *Runner) runInstances(ctx context.Context, sshAuth ssh.AuthMethod, repoArchive string, instances []StateInstance) (map[string]common.OSRunnerResult, error) { g, ctx := errgroup.WithContext(ctx) results := make(map[string]common.OSRunnerResult) var resultsMx sync.Mutex for _, i := range instances { func(i StateInstance) { g.Go(func() error { batch, ok := findBatchByID(i.ID, r.batches) if !ok { return fmt.Errorf("unable to find batch with ID: %s", i.ID) } logger := &batchLogger{wrapped: r.logger, prefix: i.ID} result, err := r.runInstance(ctx, sshAuth, logger, repoArchive, batch, i) if err != nil { logger.Logf("Failed for instance %s (@ %s): %s\n", i.ID, i.IP, err) return err } resultsMx.Lock() results[batch.ID] = result resultsMx.Unlock() return nil }) }(i) } err := g.Wait() if err != nil { return nil, err } return results, nil } // runInstance runs the batch on the machine. func (r *Runner) runInstance(ctx context.Context, sshAuth ssh.AuthMethod, logger common.Logger, repoArchive string, batch common.OSBatch, instance StateInstance) (common.OSRunnerResult, error) { sshPrivateKeyPath, err := filepath.Abs(filepath.Join(r.cfg.StateDir, "id_rsa")) if err != nil { return common.OSRunnerResult{}, fmt.Errorf("failed to determine OGC SSH private key path: %w", err) } logger.Logf("Starting SSH; connect with `ssh -i %s %s@%s`", sshPrivateKeyPath, instance.Username, instance.IP) client := tssh.NewClient(instance.IP, instance.Username, sshAuth, logger) connectCtx, connectCancel := context.WithTimeout(ctx, 10*time.Minute) defer connectCancel() err = client.Connect(connectCtx) if err != nil { logger.Logf("Failed to connect to instance %s: %s", instance.IP, err) return common.OSRunnerResult{}, fmt.Errorf("failed to connect to instance %s: %w", instance.Name, err) } defer client.Close() logger.Logf("Connected over SSH") if !instance.Prepared { // prepare the host to run the tests logger.Logf("Preparing instance") err = batch.OS.Runner.Prepare(ctx, client, logger, batch.OS.Arch, r.cfg.GOVersion) if err != nil { logger.Logf("Failed to prepare instance: %s", err) return common.OSRunnerResult{}, fmt.Errorf("failed to prepare instance %s: %w", instance.Name, err) } // now its prepared, add to state instance.Prepared = true err = r.addOrUpdateInstance(instance) if err != nil { return common.OSRunnerResult{}, fmt.Errorf("failed to save instance state %s: %w", instance.Name, err) } } // copy the required files (done every run) err = batch.OS.Runner.Copy(ctx, client, logger, repoArchive, r.getBuilds(batch)) if err != nil { logger.Logf("Failed to copy files instance: %s", err) return common.OSRunnerResult{}, fmt.Errorf("failed to copy files to instance %s: %w", instance.Name, err) } // start with the ExtraEnv first preventing the other environment flags below // from being overwritten env := map[string]string{} for k, v := range r.cfg.ExtraEnv { env[k] = v } // ensure that we have all the requirements for the stack if required if batch.Batch.Stack != nil { // wait for the stack to be ready before continuing logger.Logf("Waiting for stack to be ready...") stack, err := r.getStackForBatchID(batch.ID) if err != nil { return common.OSRunnerResult{}, err } env["ELASTICSEARCH_HOST"] = stack.Elasticsearch env["ELASTICSEARCH_USERNAME"] = stack.Username env["ELASTICSEARCH_PASSWORD"] = stack.Password env["KIBANA_HOST"] = stack.Kibana env["KIBANA_USERNAME"] = stack.Username env["KIBANA_PASSWORD"] = stack.Password logger.Logf("Using Stack with Kibana host %s, credentials available under .integration-cache", stack.Kibana) } // set the go test flags env["GOTEST_FLAGS"] = r.cfg.TestFlags env["TEST_BINARY_NAME"] = r.cfg.BinaryName // run the actual tests on the host result, err := batch.OS.Runner.Run(ctx, r.cfg.VerboseMode, client, logger, r.cfg.AgentVersion, batch.ID, batch.Batch, env) if err != nil { logger.Logf("Failed to execute tests on instance: %s", err) return common.OSRunnerResult{}, fmt.Errorf("failed to execute tests on instance %s: %w", instance.Name, err) } // fetch any diagnostics if r.cfg.DiagnosticsDir != "" { err = batch.OS.Runner.Diagnostics(ctx, client, logger, r.cfg.DiagnosticsDir) if err != nil { logger.Logf("Failed to fetch diagnostics: %s", err) } } else { logger.Logf("Skipping diagnostics fetch as DiagnosticsDir was not set") } return result, nil } // validate ensures that required builds of Elastic Agent exist func (r *Runner) validate() error { var requiredFiles []string for _, b := range r.batches { if !b.Skip { for _, build := range r.getBuilds(b) { if !slices.Contains(requiredFiles, build.Path) { requiredFiles = append(requiredFiles, build.Path) } if !slices.Contains(requiredFiles, build.SHA512Path) { requiredFiles = append(requiredFiles, build.SHA512Path) } } } } var missingFiles []string for _, file := range requiredFiles { _, err := os.Stat(file) if os.IsNotExist(err) { missingFiles = append(missingFiles, file) } else if err != nil { return err } } if len(missingFiles) > 0 { return fmt.Errorf("missing required Elastic Agent package builds for integration runner to execute: %s", strings.Join(missingFiles, ", ")) } return nil } // getBuilds returns the build for the batch. func (r *Runner) getBuilds(b common.OSBatch) []common.Build { var builds []common.Build formats := []string{"targz", "zip", "rpm", "deb"} binaryName := "elastic-agent" var packages []string for _, p := range r.cfg.Packages { if slices.Contains(formats, p) { packages = append(packages, p) } } if len(packages) == 0 { packages = formats } // This is for testing beats in serverless environment if strings.HasSuffix(r.cfg.BinaryName, "beat") { var serverlessPackages []string for _, p := range packages { if slices.Contains([]string{"targz", "zip"}, p) { serverlessPackages = append(serverlessPackages, p) } } packages = serverlessPackages } if r.cfg.BinaryName != "" { binaryName = r.cfg.BinaryName } for _, f := range packages { arch := b.OS.Arch if arch == define.AMD64 { arch = "x86_64" } suffix, err := testing.GetPackageSuffix(b.OS.Type, b.OS.Arch, f) if err != nil { // Means that OS type & Arch doesn't support that package format continue } packageName := filepath.Join(r.cfg.BuildDir, fmt.Sprintf("%s-%s-%s", binaryName, r.cfg.AgentVersion, suffix)) build := common.Build{ Version: r.cfg.ReleaseVersion, Type: b.OS.Type, Arch: arch, Path: packageName, SHA512Path: packageName + ".sha512", } builds = append(builds, build) } return builds } // prepare prepares for the runner to run. // // Creates the SSH keys to use, creates the archive of the repo and pulls the latest container for OGC. func (r *Runner) prepare(ctx context.Context) (ssh.AuthMethod, string, error) { wd, err := WorkDir() if err != nil { return nil, "", err } cacheDir := filepath.Join(wd, r.cfg.StateDir) _, err = os.Stat(cacheDir) if errors.Is(err, os.ErrNotExist) { err = os.Mkdir(cacheDir, 0755) if err != nil { return nil, "", fmt.Errorf("failed to create %q: %w", cacheDir, err) } } else if err != nil { // unknown error return nil, "", err } var auth ssh.AuthMethod var repoArchive string g, gCtx := errgroup.WithContext(ctx) g.Go(func() error { a, err := r.createSSHKey(cacheDir) if err != nil { return err } auth = a return nil }) g.Go(func() error { repo, err := r.createRepoArchive(gCtx, r.cfg.RepoDir, cacheDir) if err != nil { return err } repoArchive = repo return nil }) err = g.Wait() if err != nil { return nil, "", err } return auth, repoArchive, err } // createSSHKey creates the required SSH keys func (r *Runner) createSSHKey(dir string) (ssh.AuthMethod, error) { privateKey := filepath.Join(dir, "id_rsa") _, priErr := os.Stat(privateKey) publicKey := filepath.Join(dir, "id_rsa.pub") _, pubErr := os.Stat(publicKey) var signer ssh.Signer if errors.Is(priErr, os.ErrNotExist) || errors.Is(pubErr, os.ErrNotExist) { // either is missing (re-create) r.logger.Logf("Create SSH keys to use for SSH") _ = os.Remove(privateKey) _ = os.Remove(publicKey) pri, err := tssh.NewPrivateKey() if err != nil { return nil, fmt.Errorf("failed to create ssh private key: %w", err) } pubBytes, err := tssh.NewPublicKey(&pri.PublicKey) if err != nil { return nil, fmt.Errorf("failed to create ssh public key: %w", err) } priBytes := tssh.EncodeToPEM(pri) err = os.WriteFile(privateKey, priBytes, 0600) if err != nil { return nil, fmt.Errorf("failed to write ssh private key: %w", err) } err = os.WriteFile(publicKey, pubBytes, 0644) if err != nil { return nil, fmt.Errorf("failed to write ssh public key: %w", err) } signer, err = ssh.ParsePrivateKey(priBytes) if err != nil { return nil, fmt.Errorf("failed to parse ssh private key: %w", err) } } else if priErr != nil { // unknown error return nil, priErr } else if pubErr != nil { // unknown error return nil, pubErr } else { // read from existing private key priBytes, err := os.ReadFile(privateKey) if err != nil { return nil, fmt.Errorf("failed to read ssh private key %s: %w", privateKey, err) } signer, err = ssh.ParsePrivateKey(priBytes) if err != nil { return nil, fmt.Errorf("failed to parse ssh private key: %w", err) } } return ssh.PublicKeys(signer), nil } func (r *Runner) createRepoArchive(ctx context.Context, repoDir string, dir string) (string, error) { zipPath := filepath.Join(dir, "agent-repo.zip") _ = os.Remove(zipPath) // start fresh r.logger.Logf("Creating zip archive of repo to send to remote hosts") err := createRepoZipArchive(ctx, repoDir, zipPath) if err != nil { return "", fmt.Errorf("failed to create zip archive of repo: %w", err) } return zipPath, nil } // startStacks starts the stacks required for the tests to run func (r *Runner) startStacks(ctx context.Context) error { var versions []string batchToVersion := make(map[string]string) for _, lb := range r.batches { if !lb.Skip && lb.Batch.Stack != nil { if lb.Batch.Stack.Version == "" { // no version defined on the stack; set it to the defined stack version lb.Batch.Stack.Version = r.cfg.StackVersion } if !slices.Contains(versions, lb.Batch.Stack.Version) { versions = append(versions, lb.Batch.Stack.Version) } batchToVersion[lb.ID] = lb.Batch.Stack.Version } } var requests []stackReq for _, version := range versions { id := strings.Replace(version, ".", "", -1) requests = append(requests, stackReq{ request: common.StackRequest{ID: id, Version: version}, stack: r.findStack(id), }) } reportResult := func(version string, stack common.Stack, err error) { r.batchToStackMx.Lock() defer r.batchToStackMx.Unlock() res := stackRes{ stack: stack, err: err, } for batchID, batchVersion := range batchToVersion { if batchVersion == version { r.batchToStack[batchID] = res ch, ok := r.batchToStackCh[batchID] if ok { ch <- res } } } } // start goroutines to provision the needed stacks for _, request := range requests { go func(ctx context.Context, req stackReq) { var err error var stack common.Stack if req.stack != nil { stack = *req.stack } else { stack, err = r.sp.Create(ctx, req.request) if err != nil { reportResult(req.request.Version, stack, err) return } err = r.addOrUpdateStack(stack) if err != nil { reportResult(stack.Version, stack, err) return } } if stack.Ready { reportResult(stack.Version, stack, nil) return } stack, err = r.sp.WaitForReady(ctx, stack) if err != nil { reportResult(stack.Version, stack, err) return } err = r.addOrUpdateStack(stack) if err != nil { reportResult(stack.Version, stack, err) return } reportResult(stack.Version, stack, nil) }(ctx, request) } return nil } func (r *Runner) getStackForBatchID(id string) (common.Stack, error) { r.batchToStackMx.Lock() res, ok := r.batchToStack[id] if ok { r.batchToStackMx.Unlock() return res.stack, res.err } _, ok = r.batchToStackCh[id] if ok { return common.Stack{}, fmt.Errorf("getStackForBatchID called twice; this is not allowed") } ch := make(chan stackRes, 1) r.batchToStackCh[id] = ch r.batchToStackMx.Unlock() // 12 minutes is because the stack should have been ready after 10 minutes or returned an error // this only exists to ensure that if that code is not blocking that this doesn't block forever t := time.NewTimer(12 * time.Minute) defer t.Stop() select { case <-t.C: return common.Stack{}, fmt.Errorf("failed waiting for a response after 12 minutes") case res = <-ch: return res.stack, res.err } } func (r *Runner) findInstance(id string) (StateInstance, bool) { r.stateMx.Lock() defer r.stateMx.Unlock() for _, existing := range r.state.Instances { if existing.Same(StateInstance{ Instance: common.Instance{ID: id, Provisioner: r.ip.Name()}}) { return existing, true } } return StateInstance{}, false } func (r *Runner) addOrUpdateInstance(instance StateInstance) error { r.stateMx.Lock() defer r.stateMx.Unlock() state := r.state found := false for idx, existing := range state.Instances { if existing.Same(instance) { state.Instances[idx] = instance found = true break } } if !found { state.Instances = append(state.Instances, instance) } r.state = state return r.writeState() } func (r *Runner) findStack(id string) *common.Stack { r.stateMx.Lock() defer r.stateMx.Unlock() for _, existing := range r.state.Stacks { if existing.Same(common.Stack{ID: id, Provisioner: r.sp.Name()}) { return &existing } } return nil } func (r *Runner) addOrUpdateStack(stack common.Stack) error { r.stateMx.Lock() defer r.stateMx.Unlock() state := r.state found := false for idx, existing := range state.Stacks { if existing.Same(stack) { state.Stacks[idx] = stack found = true break } } if !found { state.Stacks = append(state.Stacks, stack) } r.state = state return r.writeState() } func (r *Runner) loadState() error { data, err := os.ReadFile(r.getStatePath()) if err != nil && !errors.Is(err, os.ErrNotExist) { return fmt.Errorf("failed to read state file %s: %w", r.getStatePath(), err) } var state State err = yaml.Unmarshal(data, &state) if err != nil { return fmt.Errorf("failed unmarshal state file %s: %w", r.getStatePath(), err) } r.state = state return nil } func (r *Runner) writeState() error { data, err := yaml.Marshal(&r.state) if err != nil { return fmt.Errorf("failed to marshal state: %w", err) } err = os.WriteFile(r.getStatePath(), data, 0644) if err != nil { return fmt.Errorf("failed to write state file %s: %w", r.getStatePath(), err) } return nil } func (r *Runner) getStatePath() string { return filepath.Join(r.cfg.StateDir, "state.yml") } func (r *Runner) mergeResults(results map[string]common.OSRunnerResult) (Result, error) { var rawOutput bytes.Buffer var jsonOutput bytes.Buffer var suites JUnitTestSuites for id, res := range results { for _, pkg := range res.Packages { err := mergePackageResult(pkg, id, false, &rawOutput, &jsonOutput, &suites) if err != nil { return Result{}, err } } for _, pkg := range res.SudoPackages { err := mergePackageResult(pkg, id, true, &rawOutput, &jsonOutput, &suites) if err != nil { return Result{}, err } } } var junitBytes bytes.Buffer err := writeJUnit(&junitBytes, suites) if err != nil { return Result{}, fmt.Errorf("failed to marshal junit: %w", err) } var complete Result for _, suite := range suites.Suites { complete.Tests += suite.Tests complete.Failures += suite.Failures } complete.Output = rawOutput.Bytes() complete.JSONOutput = jsonOutput.Bytes() complete.XMLOutput = junitBytes.Bytes() return complete, nil } // Same returns true if other is the same instance as this one. // Two instances are considered the same if their provider and ID are the same. func (s StateInstance) Same(other StateInstance) bool { return s.Provisioner == other.Provisioner && s.ID == other.ID } func mergePackageResult(pkg common.OSRunnerPackageResult, batchName string, sudo bool, rawOutput io.Writer, jsonOutput io.Writer, suites *JUnitTestSuites) error { suffix := "" sudoStr := "false" if sudo { suffix = "(sudo)" sudoStr = "true" } if pkg.Output != nil { rawLogger := &runnerLogger{writer: rawOutput, timestamp: false} pkgWriter := common.NewPrefixOutput(rawLogger, fmt.Sprintf("%s(%s)%s: ", pkg.Name, batchName, suffix)) _, err := pkgWriter.Write(pkg.Output) if err != nil { return fmt.Errorf("failed to write raw output from %s %s: %w", batchName, pkg.Name, err) } } if pkg.JSONOutput != nil { jsonSuffix, err := suffixJSONResults(pkg.JSONOutput, fmt.Sprintf("(%s)%s", batchName, suffix)) if err != nil { return fmt.Errorf("failed to suffix json output from %s %s: %w", batchName, pkg.Name, err) } _, err = jsonOutput.Write(jsonSuffix) if err != nil { return fmt.Errorf("failed to write json output from %s %s: %w", batchName, pkg.Name, err) } } if pkg.XMLOutput != nil { pkgSuites, err := parseJUnit(pkg.XMLOutput) if err != nil { return fmt.Errorf("failed to parse junit from %s %s: %w", batchName, pkg.Name, err) } for _, pkgSuite := range pkgSuites.Suites { // append the batch information to the suite name pkgSuite.Name = fmt.Sprintf("%s(%s)%s", pkgSuite.Name, batchName, suffix) pkgSuite.Properties = append(pkgSuite.Properties, JUnitProperty{ Name: "batch", Value: batchName, }, JUnitProperty{ Name: "sudo", Value: sudoStr, }) suites.Suites = append(suites.Suites, pkgSuite) } } return nil } func findBatchByID(id string, batches []common.OSBatch) (common.OSBatch, bool) { for _, batch := range batches { if batch.ID == id { return batch, true } } return common.OSBatch{}, false } type runnerLogger struct { writer io.Writer timestamp bool } func (l *runnerLogger) Logf(format string, args ...any) { if l.timestamp { _, _ = fmt.Fprintf(l.writer, "[%s] >>> %s\n", time.Now().Format(time.StampMilli), fmt.Sprintf(format, args...)) } else { _, _ = fmt.Fprintf(l.writer, ">>> %s\n", fmt.Sprintf(format, args...)) } } type batchLogger struct { wrapped common.Logger prefix string } func filterSupportedOS(batches []common.OSBatch, provisioner common.InstanceProvisioner) []common.OSBatch { var filtered []common.OSBatch for _, batch := range batches { if ok := provisioner.Supported(batch.OS.OS); ok { filtered = append(filtered, batch) } } return filtered } func (b *batchLogger) Logf(format string, args ...any) { b.wrapped.Logf("(%s) %s", b.prefix, fmt.Sprintf(format, args...)) } type stackRes struct { stack common.Stack err error } type stackReq struct { request common.StackRequest stack *common.Stack }