pkg/testing/ogc/provisioner.go (287 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package ogc
import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"gopkg.in/yaml.v2"
"github.com/elastic/elastic-agent/pkg/core/process"
"github.com/elastic/elastic-agent/pkg/testing/common"
"github.com/elastic/elastic-agent/pkg/testing/define"
"github.com/elastic/elastic-agent/pkg/testing/runner"
)
const (
	// LayoutIntegrationTag is the tag added to all layouts for the integration testing framework.
	// `ogc up` and `ogc down` are invoked with this tag, so it scopes which layouts they act on.
	LayoutIntegrationTag = "agent-integration"
	// Name is the name of this provisioner. It is returned by the provisioner's Name method
	// and recorded as the Provisioner of every instance created by Provision.
	Name = "ogc"
)
// provisioner is the common.InstanceProvisioner implementation backed by the
// OGC CLI, which is always executed inside a docker container.
type provisioner struct {
	// logger receives progress messages. It is set through SetLogger and is
	// used without a nil check, so it must be set before Provision/Clean.
	logger common.Logger
	// cfg is the OGC configuration (service token path, datacenter, ...);
	// NewProvisioner validates it before storing it.
	cfg Config
}
// NewProvisioner creates the OGC provisioner.
//
// cfg is validated first; if validation fails, the error from cfg.Validate is
// returned unchanged together with a nil provisioner. The returned provisioner
// has no logger yet, so SetLogger must be called before it is used.
func NewProvisioner(cfg Config) (common.InstanceProvisioner, error) {
	if err := cfg.Validate(); err != nil {
		return nil, err
	}
	p := &provisioner{cfg: cfg}
	return p, nil
}
// Name returns the provisioner name, the package constant Name ("ogc").
func (p *provisioner) Name() string {
	return Name
}
// SetLogger sets the logger that receives progress messages while
// provisioning (e.g. the image pull) and cleaning up (ogc down).
func (p *provisioner) SetLogger(l common.Logger) {
	p.logger = l
}
// Type returns common.ProvisionerTypeVM, marking OGC as a VM-based provisioner.
func (p *provisioner) Type() common.ProvisionerType {
	return common.ProvisionerTypeVM
}
// Supported reports whether OGC can provision the given OS, i.e. whether an
// entry for it exists in the ogcSupported layout table.
func (p *provisioner) Supported(os define.OS) bool {
	if _, found := findOSLayout(os); found {
		return true
	}
	return false
}
// Provision brings up one OGC machine per batch and returns them as instances.
//
// The steps are:
//   - pull the ogc image (bounded to 5 minutes);
//   - import one layout per batch (30 seconds);
//   - run `ogc up` (30 minutes);
//   - list the machines and pair each batch with the machine whose layout name
//     equals the batch ID.
//
// When no machines exist, or a batch has no matching machine, the `ogc up`
// output is echoed to stdout, because OGC itself rarely reports a clean error.
func (p *provisioner) Provision(ctx context.Context, cfg common.Config, batches []common.OSBatch) ([]common.Instance, error) {
	// Make sure the ogc image is available locally.
	pullCtx, cancelPull := context.WithTimeout(ctx, 5*time.Minute)
	defer cancelPull()
	if err := p.ogcPull(pullCtx); err != nil {
		return nil, err
	}

	// Hand the layouts computed from the batches to ogc.
	importCtx, cancelImport := context.WithTimeout(ctx, 30*time.Minute/60)
	defer cancelImport()
	if err := p.ogcImport(importCtx, cfg, batches); err != nil {
		return nil, err
	}

	// Start every machine described by the imported layouts.
	upCtx, cancelUp := context.WithTimeout(ctx, 30*time.Minute)
	defer cancelUp()
	upLog, err := p.ogcUp(upCtx)
	if err != nil {
		return nil, fmt.Errorf("ogc up failed: %w", err)
	}

	// echoUpLog dumps the raw `ogc up` output; without it there is little
	// indication of where OGC went wrong.
	echoUpLog := func() {
		fmt.Fprintf(os.Stdout, "%s\n", upLog)
	}

	machines, err := p.ogcMachines(ctx)
	if err != nil {
		return nil, err
	}
	if len(machines) == 0 {
		echoUpLog()
		return nil, fmt.Errorf("ogc didn't create any machines")
	}

	var instances []common.Instance
	for _, batch := range batches {
		m, found := findMachine(machines, batch.ID)
		if !found {
			echoUpLog()
			return nil, fmt.Errorf("failed to find machine for batch ID: %s", batch.ID)
		}
		inst := common.Instance{
			ID:          batch.ID,
			Provisioner: Name,
			Name:        m.InstanceName,
			IP:          m.PublicIP,
			Username:    m.Layout.Username,
			RemotePath:  m.Layout.RemotePath,
			Internal: map[string]interface{}{
				"instance_id": m.InstanceID,
			},
		}
		instances = append(instances, inst)
	}
	return instances, nil
}
// Clean cleans up all provisioned resources by running `ogc down` for the
// integration layout tag. The config and the instance list are ignored;
// teardown is driven entirely by that tag.
func (p *provisioner) Clean(ctx context.Context, _ common.Config, _ []common.Instance) error {
	if err := p.ogcDown(ctx); err != nil {
		return err
	}
	return nil
}
// ogcPull pulls the pinned ogc image (docker.elastic.co/observability-ci/ogc:5.0.1)
// with `docker pull`, so later `docker run` invocations use a local copy.
//
// Combined stdout/stderr of the pull is captured and only echoed to stdout
// when docker exits with a non-zero code, to make the failure diagnosable.
func (p *provisioner) ogcPull(ctx context.Context) error {
	args := []string{
		"pull",
		"docker.elastic.co/observability-ci/ogc:5.0.1",
	}
	var output bytes.Buffer
	p.logger.Logf("Pulling latest ogc image")
	proc, err := process.Start("docker", process.WithContext(ctx), process.WithArgs(args), process.WithCmdOptions(runner.AttachOut(&output), runner.AttachErr(&output)))
	if err != nil {
		return fmt.Errorf("failed to run docker pull: %w", err)
	}
	// The process package hands back a stdin pipe (the same proc.Stdin that
	// ogcImport writes to). docker pull never reads it, so release our end
	// instead of keeping the descriptor open.
	if proc.Stdin != nil {
		_ = proc.Stdin.Close()
	}
	ps := <-proc.Wait()
	if ps.ExitCode() != 0 {
		// print the output so it's clear what went wrong
		fmt.Fprintf(os.Stdout, "%s\n", output.Bytes())
		return fmt.Errorf("failed to run ogc pull: docker pull exited with code: %d", ps.ExitCode())
	}
	return nil
}
// ogcImport imports one OGC layout per batch via `ogc layout import`.
//
// The layouts are marshalled to YAML under a top-level "layouts" key and
// written to the stdin of an interactive (-i) ogc container. Closing stdin
// marks the end of the document. Combined stdout/stderr is echoed to stdout
// when either the write or the import itself fails.
func (p *provisioner) ogcImport(ctx context.Context, cfg common.Config, batches []common.OSBatch) error {
	var layouts []Layout
	for i := range batches {
		layouts = append(layouts, osBatchToOGC(cfg.StateDir, batches[i]))
	}
	doc := struct {
		Layouts []Layout `yaml:"layouts"`
	}{Layouts: layouts}
	payload, err := yaml.Marshal(doc)
	if err != nil {
		return fmt.Errorf("failed to marshal layouts YAML: %w", err)
	}

	var combined bytes.Buffer
	p.logger.Logf("Import layouts into ogc")
	proc, err := p.ogcRun(ctx, []string{"layout", "import"}, true, process.WithCmdOptions(runner.AttachOut(&combined), runner.AttachErr(&combined)))
	if err != nil {
		return fmt.Errorf("failed to run ogc import: %w", err)
	}

	if _, werr := proc.Stdin.Write(payload); werr != nil {
		// Abort: close stdin, kill the docker process and reap it before
		// reporting, so nothing is left running.
		_ = proc.Stdin.Close()
		_ = proc.Kill()
		<-proc.Wait()
		fmt.Fprintf(os.Stdout, "%s\n", combined.Bytes())
		return fmt.Errorf("failed to write layouts to stdin: %w", werr)
	}

	// All layouts are written; close stdin so ogc sees the end of input.
	_ = proc.Stdin.Close()
	state := <-proc.Wait()
	if code := state.ExitCode(); code != 0 {
		fmt.Fprintf(os.Stdout, "%s\n", combined.Bytes())
		return fmt.Errorf("failed to run ogc import: docker run exited with code: %d", code)
	}
	return nil
}
// ogcUp brings up all the instances by running `ogc up` with
// LayoutIntegrationTag.
//
// On success it returns the combined stdout/stderr of the run, so the caller
// can print it if the resulting machines are not what it expected. On a
// non-zero exit the output is echoed to stdout and an error is returned.
func (p *provisioner) ogcUp(ctx context.Context) ([]byte, error) {
	p.logger.Logf("Bring up instances through ogc")
	var captured bytes.Buffer
	upArgs := []string{"up", LayoutIntegrationTag}
	capture := process.WithCmdOptions(runner.AttachOut(&captured), runner.AttachErr(&captured))
	proc, err := p.ogcRun(ctx, upArgs, false, capture)
	if err != nil {
		return nil, fmt.Errorf("failed to run ogc up: %w", err)
	}
	if state := <-proc.Wait(); state.ExitCode() != 0 {
		fmt.Fprintf(os.Stdout, "%s\n", captured.Bytes())
		return nil, fmt.Errorf("failed to run ogc up: docker run exited with code: %d", state.ExitCode())
	}
	return captured.Bytes(), nil
}
// ogcDown brings down all the instances by running `ogc down` with
// LayoutIntegrationTag. Combined stdout/stderr is captured and echoed to
// stdout only when the command exits with a non-zero code.
func (p *provisioner) ogcDown(ctx context.Context) error {
	p.logger.Logf("Bring down instances through ogc")
	var captured bytes.Buffer
	capture := process.WithCmdOptions(runner.AttachOut(&captured), runner.AttachErr(&captured))
	proc, err := p.ogcRun(ctx, []string{"down", LayoutIntegrationTag}, false, capture)
	if err != nil {
		return fmt.Errorf("failed to run ogc down: %w", err)
	}
	state := <-proc.Wait()
	if code := state.ExitCode(); code != 0 {
		fmt.Fprintf(os.Stdout, "%s\n", captured.Bytes())
		return fmt.Errorf("failed to run ogc down: docker run exited with code: %d", code)
	}
	return nil
}
// ogcMachines lists all the instances known to OGC by parsing the output of
// `ogc ls --as-yaml`.
//
// Only stdout is parsed as YAML. stderr is captured into a separate buffer so
// it never mixes into the YAML. On failure, either a non-zero exit or
// unparseable output, the captured output is echoed to stdout, matching the
// other ogc helpers, so the problem can be diagnosed.
func (p *provisioner) ogcMachines(ctx context.Context) ([]Machine, error) {
	var stdout, stderr bytes.Buffer
	proc, err := p.ogcRun(ctx, []string{"ls", "--as-yaml"}, false, process.WithCmdOptions(runner.AttachOut(&stdout), runner.AttachErr(&stderr)))
	if err != nil {
		return nil, fmt.Errorf("failed to run ogc ls: %w", err)
	}
	ps := <-proc.Wait()
	if ps.ExitCode() != 0 {
		// print the output so it's clear what went wrong
		fmt.Fprintf(os.Stdout, "%s\n%s\n", stdout.Bytes(), stderr.Bytes())
		return nil, fmt.Errorf("failed to run ogc ls: docker run exited with code: %d", ps.ExitCode())
	}
	var machines []Machine
	if err := yaml.Unmarshal(stdout.Bytes(), &machines); err != nil {
		// show what was received so the parse failure can be diagnosed
		fmt.Fprintf(os.Stdout, "%s\n%s\n", stdout.Bytes(), stderr.Bytes())
		return nil, fmt.Errorf("failed to parse ogc ls output: %w", err)
	}
	return machines, nil
}
// ogcRun starts the ogc CLI (`ogc -v <args...>`) inside the pinned ogc docker
// image and returns the running docker process.
//
// The container receives the following:
//   - environment: the service-account client email, project ID and
//     datacenter from the config;
//   - the service token file, mounted at /root/<token file name> and
//     referenced by GOOGLE_APPLICATION_CREDENTIALS;
//   - the runner work directory, mounted at the same path and used as the
//     container working directory.
//
// When interactive is true, docker is started with -i so the caller can feed
// data through proc.Stdin. Otherwise docker does not forward stdin, so our
// end of the stdin pipe is closed right away rather than kept open.
// processOpts are applied after the context and argument options.
func (p *provisioner) ogcRun(ctx context.Context, args []string, interactive bool, processOpts ...process.StartOption) (*process.Info, error) {
	wd, err := runner.WorkDir()
	if err != nil {
		return nil, err
	}
	tokenName := filepath.Base(p.cfg.ServiceTokenPath)
	clientEmail, err := p.cfg.ClientEmail()
	if err != nil {
		return nil, err
	}
	projectID, err := p.cfg.ProjectID()
	if err != nil {
		return nil, err
	}
	runArgs := []string{"run"}
	if interactive {
		runArgs = append(runArgs, "-i")
	}
	runArgs = append(runArgs,
		"--rm",
		"-e",
		fmt.Sprintf("GOOGLE_APPLICATION_SERVICE_ACCOUNT=%s", clientEmail),
		"-e",
		fmt.Sprintf("GOOGLE_APPLICATION_CREDENTIALS=/root/%s", tokenName),
		"-e",
		fmt.Sprintf("GOOGLE_PROJECT=%s", projectID),
		"-e",
		fmt.Sprintf("GOOGLE_DATACENTER=%s", p.cfg.Datacenter),
		"-v",
		fmt.Sprintf("%s:/root/%s", p.cfg.ServiceTokenPath, tokenName),
		"-v",
		fmt.Sprintf("%s:%s", wd, wd),
		"-w",
		wd,
		"docker.elastic.co/observability-ci/ogc:5.0.1",
		"--",
		"ogc",
		"-v",
	)
	runArgs = append(runArgs, args...)
	opts := []process.StartOption{process.WithContext(ctx), process.WithArgs(runArgs)}
	opts = append(opts, processOpts...)
	proc, err := process.Start("docker", opts...)
	if err != nil {
		return nil, err
	}
	if !interactive && proc.Stdin != nil {
		// Without -i nothing will ever be read from this pipe; release it.
		_ = proc.Stdin.Close()
	}
	return proc, nil
}
// osBatchToOGC converts a batch into the OGC layout that provisions its machine.
//
// The layout is named after the batch ID (findMachine matches machines back
// to batches by this name). It is tagged, in order, with:
//   - LayoutIntegrationTag;
//   - the OS type;
//   - the architecture;
//   - a lower-cased "<name>-<version>" tag, where <name> is the distro on
//     Linux and the OS type elsewhere, and the version's dots become dashes.
//
// Provider, instance size, RunsOn, remote path and username come from the
// matching ogcSupported entry; for an unsupported OS those fields stay at
// their zero values. The SSH key pair is referenced as cacheDir/id_rsa and
// cacheDir/id_rsa.pub.
func osBatchToOGC(cacheDir string, batch common.OSBatch) Layout {
	// Linux machines are identified by distro, everything else by OS type.
	family := batch.OS.Type
	if batch.OS.Type == define.Linux {
		family = batch.OS.Distro
	}
	version := strings.ReplaceAll(batch.OS.Version, ".", "-")
	tags := []string{
		LayoutIntegrationTag,
		batch.OS.Type,
		batch.OS.Arch,
		strings.ToLower(family + "-" + version),
	}

	osLayout, _ := findOSLayout(batch.OS.OS)
	return Layout{
		Name:          batch.ID,
		Provider:      osLayout.Provider,
		InstanceSize:  osLayout.InstanceSize,
		RunsOn:        osLayout.RunsOn,
		RemotePath:    osLayout.RemotePath,
		Scale:         1,
		Username:      osLayout.Username,
		SSHPrivateKey: cacheDir + "/id_rsa",
		SSHPublicKey:  cacheDir + "/id_rsa.pub",
		Ports:         []string{"22:22"},
		Tags:          tags,
		Labels: map[string]string{
			"division": "engineering",
			"org":      "ingest",
			"team":     "elastic-agent-control-plane",
			"project":  "elastic-agent",
		},
		Scripts: "path", // not used; but required by OGC
	}
}
// findOSLayout returns the first ogcSupported entry (in iteration order) whose
// OS equals os, and whether such an entry exists. When none matches, it
// returns a zero LayoutOS and false.
func findOSLayout(os define.OS) (LayoutOS, bool) {
	for i := range ogcSupported {
		if candidate := ogcSupported[i]; candidate.OS == os {
			return candidate, true
		}
	}
	return LayoutOS{}, false
}
// findMachine returns the first machine whose layout name equals name, and
// whether one was found. Layouts are named after batch IDs, so name is
// normally a batch ID. When nothing matches, it returns a zero Machine and false.
func findMachine(machines []Machine, name string) (Machine, bool) {
	for i := range machines {
		if machines[i].Layout.Name == name {
			return machines[i], true
		}
	}
	var none Machine
	return none, false
}