systemtest/containers.go (360 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package systemtest
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/go-connections/nat"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
)
const (
// startContainersTimeout bounds how long we wait for stack containers to
// become healthy, and for an Elastic Agent container to start.
startContainersTimeout = 5 * time.Minute
// ElasticAgentImage is the elastic-agent image repository used when
// ContainerConfig.BaseImage is empty.
ElasticAgentImage = "docker.elastic.co/beats/elastic-agent"
)
var (
// systemtestDir holds the directory containing this source file.
// It is set by initContainers and used to locate the elastic-agent
// image build script.
systemtestDir string
)
// initContainers records the directory holding this source file in
// systemtestDir. It panics if the runtime cannot report the caller's file.
func initContainers() {
	_, thisFile, _, found := runtime.Caller(0)
	if found {
		systemtestDir = filepath.Dir(thisFile)
		return
	}
	panic("could not locate systemtest directory")
}
// StartStackContainers brings up the Elasticsearch and Kibana services with
// "docker compose up -d", then waits for Kibana to report healthy.
//
// The services are deliberately left running afterwards so that repeated
// test runs during development stay fast. Use docker-compose to stop them
// when they are no longer needed.
func StartStackContainers() error {
	composeArgs := []string{
		"compose", "-f", "../docker-compose.yml",
		"up", "-d", "elasticsearch", "kibana",
	}
	up := exec.Command("docker", composeArgs...)
	up.Stdout, up.Stderr = os.Stdout, os.Stderr
	if err := up.Run(); err != nil {
		return err
	}

	// A healthy Kibana implies a healthy Elasticsearch, so waiting for
	// Kibana alone (for up to 5 minutes) is sufficient.
	ctx, cancel := context.WithTimeout(context.Background(), startContainersTimeout)
	defer cancel()
	return waitContainerHealthy(ctx, "kibana")
}
// waitContainerHealthy blocks until the docker-compose service container
// identified by serviceName reports a "healthy" health status, inspecting
// the container every 1.5 seconds.
//
// It returns ctx.Err() if ctx is done before the container becomes healthy,
// or the underlying error if the container cannot be found or inspected.
func waitContainerHealthy(ctx context.Context, serviceName string) error {
	docker, err := client.NewClientWithOpts(client.FromEnv)
	if err != nil {
		return err
	}
	defer docker.Close()
	docker.NegotiateAPIVersion(ctx)

	info, err := stackContainerInfo(ctx, docker, serviceName)
	if err != nil {
		return err
	}

	t := time.NewTicker(1500 * time.Millisecond)
	defer t.Stop()
	first := true
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.C:
			containerJSON, err := docker.ContainerInspect(ctx, info.ID)
			if err != nil {
				return err
			}
			// State and State.Health are pointers and may be nil (for
			// example when no health status is reported). Treat that as
			// "not yet healthy" rather than dereferencing nil; ctx bounds
			// how long we keep waiting.
			if state := containerJSON.State; state != nil && state.Health != nil && state.Health.Status == "healthy" {
				log.Printf("Container %s Healthy", serviceName)
				return nil
			}
			if first {
				log.Printf("Waiting for %s container (%s) to become healthy", serviceName, info.ID)
				first = false
			}
		}
	}
}
// stackContainerInfo returns the container belonging to the named service
// of the "apm-server" docker-compose project. Containers are matched by
// their docker-compose project and service labels; an error is returned
// unless exactly one container matches.
func stackContainerInfo(ctx context.Context, docker *client.Client, name string) (*types.Container, error) {
	labels := filters.NewArgs(
		filters.Arg("label", "com.docker.compose.project=apm-server"),
		filters.Arg("label", "com.docker.compose.service="+name),
	)
	matches, err := docker.ContainerList(ctx, container.ListOptions{Filters: labels})
	if err != nil {
		return nil, err
	}
	if len(matches) != 1 {
		return nil, fmt.Errorf("expected 1 %s container, got %d", name, len(matches))
	}
	return &matches[0], nil
}
// ContainerConfig holds the options accepted by
// NewUnstartedElasticAgentContainer.
type ContainerConfig struct {
// Name is used as the Name of the container request.
Name string
// Arch, if non-empty, specifies the architecture passed to the
// image build as GOARCH. If Arch is empty, runtime.GOARCH will
// be used.
Arch string
// BaseImage, if non-empty, specifies the elastic-agent
// image to use. If BaseImage is empty, ElasticAgentImage
// will be used.
BaseImage string
// BaseImageVersion, if non-empty, specifies the elastic-agent
// image tag to use. If BaseImageVersion is empty, the image
// tag used by the fleet-server docker-compose service will be
// used.
BaseImageVersion string
}
// NewUnstartedElasticAgentContainer returns a new ElasticAgentContainer.
//
// It builds (at most once per process) a custom elastic-agent image with a
// locally built apm-server injected, and prepares a container request for
// that image. Nothing is created or started until Start is called.
//
// Defaults: opts.Arch falls back to runtime.GOARCH and opts.BaseImage to
// ElasticAgentImage. When opts.BaseImageVersion is empty, the image tag and
// the Docker networks are taken from the running fleet-server docker-compose
// service. Note that the networks are only looked up in that case: with an
// explicit BaseImageVersion the request carries no networks.
func NewUnstartedElasticAgentContainer(opts ContainerConfig) (*ElasticAgentContainer, error) {
	if opts.Arch == "" {
		opts.Arch = runtime.GOARCH
	}
	if opts.BaseImage == "" {
		opts.BaseImage = ElasticAgentImage
	}

	ctx := context.Background()
	docker, err := client.NewClientWithOpts(client.FromEnv)
	if err != nil {
		return nil, err
	}
	defer docker.Close()
	docker.NegotiateAPIVersion(ctx)

	var networks []string
	if opts.BaseImageVersion == "" {
		fleetServerContainer, err := stackContainerInfo(ctx, docker, "fleet-server")
		if err != nil {
			return nil, err
		}
		fleetServerContainerDetails, err := docker.ContainerInspect(ctx, fleetServerContainer.ID)
		if err != nil {
			return nil, err
		}
		for network := range fleetServerContainerDetails.NetworkSettings.Networks {
			networks = append(networks, network)
		}

		// Use the same elastic-agent image tag as used for fleet-server:
		// everything after the last ":" in the image reference.
		opts.BaseImageVersion = fleetServerContainer.Image[strings.LastIndex(fleetServerContainer.Image, ":")+1:]
	}

	// Build a custom elastic-agent image with a locally built apm-server binary injected.
	agentImage := fmt.Sprintf("elastic-agent-systemtest:%s", opts.BaseImageVersion)
	if err := BuildElasticAgentImage(ctx,
		docker, opts.Arch,
		fmt.Sprintf("%s:%s", opts.BaseImage, opts.BaseImageVersion),
		agentImage,
	); err != nil {
		return nil, err
	}

	agentImageDetails, _, err := docker.ImageInspectWithRaw(ctx, agentImage)
	if err != nil {
		return nil, err
	}
	// Config is a pointer; leave vcsRef empty rather than panicking if the
	// image reports no configuration.
	var vcsRef string
	if agentImageDetails.Config != nil {
		vcsRef = agentImageDetails.Config.Labels["org.label-schema.vcs-ref"]
	}

	req := testcontainers.ContainerRequest{
		Name:       opts.Name,
		Image:      agentImage,
		AutoRemove: true,
		Networks:   networks,
		// Env must be non-nil: Start writes the Fleet enrollment settings
		// into it, and writing to a nil map panics.
		Env:        make(map[string]string),
		SkipReaper: true, // we use our own reaping logic
	}
	return &ElasticAgentContainer{
		vcsRef:  vcsRef,
		request: req,
		exited:  make(chan struct{}),
		Reap:    true,
	}, nil
}
// ElasticAgentContainer represents an ephemeral Elastic Agent container.
type ElasticAgentContainer struct {
// vcsRef holds the "org.label-schema.vcs-ref" label of the agent image;
// APMServerLog uses it to locate the apm-server log file.
vcsRef string
// container holds the underlying container, set by Start.
container testcontainers.Container
// request holds the container request, finalised by Start.
request testcontainers.ContainerRequest
// exited is closed by the log-copying goroutine started in Start,
// once copying of the container's logs has finished.
exited chan struct{}
// Reap controls whether the container will be automatically reaped if
// the controlling process exits. This is true by default, and may be
// set to false before the container is started to prevent the container
// from being stopped and removed.
Reap bool
// ExposedPorts holds an optional list of ports to expose to the host.
ExposedPorts []string
// WaitingFor holds an optional wait strategy.
WaitingFor wait.Strategy
// Addrs holds the "host:port" address for each exposed port, mapped
// by exposed port. This will be populated by Start.
Addrs map[string]string
// FleetEnrollmentToken holds an optional Fleet enrollment token to
// use for enrolling the agent with Fleet. The agent will only enroll
// if this is specified.
FleetEnrollmentToken string
// Stdout, if non-nil, holds a writer to which the container's stdout
// will be written.
Stdout io.Writer
// Stderr, if non-nil, holds a writer to which the container's stderr
// will be written.
Stderr io.Writer
}
// Start creates and starts the container, waiting up to
// startContainersTimeout for it to start.
//
// On successful return, Addrs holds the "host:port" address of every port
// listed in ExposedPorts.
//
// If Stdout or Stderr is set, a goroutine copies the container's logs to
// them; when the log stream ends it closes the channel that Wait blocks on
// and cancels any start still in progress.
//
// The container will be removed when Close() is called, or otherwise (when
// Reap is true) by a reaper process if the test process is aborted.
func (c *ElasticAgentContainer) Start() error {
	ctx, cancel := context.WithTimeout(context.Background(), startContainersTimeout)
	defer cancel()

	// Update request from user-definable fields.
	if c.FleetEnrollmentToken != "" {
		// The request's Env map may be nil; writing to a nil map panics.
		if c.request.Env == nil {
			c.request.Env = make(map[string]string)
		}
		c.request.Env["FLEET_ENROLL"] = "1"
		c.request.Env["FLEET_ENROLLMENT_TOKEN"] = c.FleetEnrollmentToken
	}
	c.request.ExposedPorts = c.ExposedPorts
	c.request.WaitingFor = c.WaitingFor
	c.request.SkipReaper = !c.Reap

	ctr, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: c.request,
	})
	if err != nil {
		return err
	}
	// Must be set before the log goroutine starts: copyLogs reads it.
	c.container = ctr

	// Start a goroutine to read logs, and signal when the container process has exited.
	if c.Stdout != nil || c.Stderr != nil {
		go func() {
			defer close(c.exited)
			defer cancel()
			stdout, stderr := c.Stdout, c.Stderr
			if stdout == nil {
				stdout = io.Discard
			}
			if stderr == nil {
				stderr = io.Discard
			}
			// Best effort: log copying failures must not fail the test.
			_ = c.copyLogs(stdout, stderr)
		}()
	}

	if err := ctr.Start(ctx); err != nil {
		// A cancellation here comes from the log goroutine observing the
		// container exit, so the cancellation error itself carries no
		// useful detail. errors.Is also matches wrapped cancellations.
		if !errors.Is(err, context.Canceled) {
			return fmt.Errorf("failed to start container: %w", err)
		}
		return errors.New("failed to start container")
	}

	if len(c.request.ExposedPorts) > 0 {
		hostIP, err := ctr.Host(ctx)
		if err != nil {
			return err
		}
		c.Addrs = make(map[string]string, len(c.request.ExposedPorts))
		for _, exposedPort := range c.request.ExposedPorts {
			mappedPort, err := ctr.MappedPort(ctx, nat.Port(exposedPort))
			if err != nil {
				return err
			}
			c.Addrs[exposedPort] = net.JoinHostPort(hostIP, mappedPort.Port())
		}
	}
	return nil
}
// copyLogs follows the container's log stream, writing its stdout and
// stderr to the given writers, and returns once the stream ends or an
// error occurs.
func (c *ElasticAgentContainer) copyLogs(stdout, stderr io.Writer) error {
	ctx := context.Background()

	// ContainerLogs returns immediately for a container that is still in
	// the "created" state, so poll until it has moved on to running (or
	// beyond).
	for status := "created"; status == "created"; {
		state, err := c.container.State(ctx)
		if err != nil {
			return err
		}
		status = state.Status
	}

	cli, err := client.NewClientWithOpts(client.FromEnv)
	if err != nil {
		return err
	}
	defer cli.Close()
	cli.NegotiateAPIVersion(ctx)

	logs, err := cli.ContainerLogs(ctx, c.container.GetContainerID(), container.LogsOptions{
		ShowStdout: stdout != nil,
		ShowStderr: stderr != nil,
		Follow:     true,
	})
	if err != nil {
		return err
	}
	defer logs.Close()

	// The log stream multiplexes stdout and stderr; StdCopy splits it.
	_, err = stdcopy.StdCopy(stdout, stderr, logs)
	return err
}
// Close terminates and removes the container. It is a no-op, returning
// nil, if no container has been set (for example, before Start).
func (c *ElasticAgentContainer) Close() error {
	if c.container != nil {
		return c.container.Terminate(context.Background())
	}
	return nil
}
// APMServerLog returns the contents of the APM Server log file, copied out
// of the container. The caller is responsible for closing the returned
// reader.
//
// The log file path is derived from the first six characters of the agent
// image's vcs-ref label and the current UTC date. An error is returned if
// the container has not been started or the vcs-ref is too short to form
// the path.
func (c *ElasticAgentContainer) APMServerLog() (io.ReadCloser, error) {
	const shortRefLen = 6
	if c.container == nil {
		return nil, errors.New("container not started")
	}
	// Slicing a shorter (or missing) vcs-ref would panic.
	if len(c.vcsRef) < shortRefLen {
		return nil, fmt.Errorf("invalid vcs-ref %q: expected at least %d characters", c.vcsRef, shortRefLen)
	}
	return c.container.CopyFileFromContainer(
		context.Background(), fmt.Sprintf(
			"/usr/share/elastic-agent/data/elastic-agent-%s/components/logs/apm-server-%s.ndjson",
			c.vcsRef[:shortRefLen],
			time.Now().UTC().Format("20060102"),
		),
	)
}
// Wait blocks until the container process has exited and then returns the
// container's state, or returns ctx.Err() if ctx is done first.
//
// Exit is signalled by the log-copying goroutine that Start launches only
// when Stdout or Stderr is set; without either, Wait returns only when ctx
// is done.
func (c *ElasticAgentContainer) Wait(ctx context.Context) (*types.ContainerState, error) {
	select {
	case <-c.exited:
		return c.container.State(ctx)
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
// Exec runs cmd inside the container and returns everything the command
// wrote to stdout and stderr.
//
// If the command exits with a non-zero code, or any Docker API call fails,
// an error is returned and both outputs are nil.
func (c *ElasticAgentContainer) Exec(ctx context.Context, cmd ...string) (stdout, stderr []byte, _ error) {
	cli, err := client.NewClientWithOpts(client.FromEnv)
	if err != nil {
		return nil, nil, err
	}
	defer cli.Close()
	cli.NegotiateAPIVersion(ctx)

	execOpts := container.ExecOptions{
		AttachStderr: true,
		AttachStdout: true,
		Cmd:          cmd,
	}
	created, err := cli.ContainerExecCreate(ctx, c.container.GetContainerID(), execOpts)
	if err != nil {
		return nil, nil, err
	}

	// Attach and drain the multiplexed output stream into separate buffers.
	attached, err := cli.ContainerExecAttach(ctx, created.ID, container.ExecStartOptions{})
	if err != nil {
		return nil, nil, err
	}
	defer attached.Close()

	var outBuf, errBuf bytes.Buffer
	if _, err := stdcopy.StdCopy(&outBuf, &errBuf, attached.Reader); err != nil {
		return nil, nil, err
	}

	// With the output fully consumed, look up how the process exited.
	inspected, err := cli.ContainerExecInspect(ctx, created.ID)
	if err != nil {
		return nil, nil, err
	}
	if code := inspected.ExitCode; code != 0 {
		return nil, nil, fmt.Errorf("process exited with code %d", code)
	}
	return outBuf.Bytes(), errBuf.Bytes(), nil
}
// matchFleetServerAPIStatusHealthy reports whether r yields a JSON document
// whose "status" field is exactly "HEALTHY". Input that cannot be decoded
// yields false.
func matchFleetServerAPIStatusHealthy(r io.Reader) bool {
	type apiStatus struct {
		Name    string `json:"name"`
		Version string `json:"version"`
		Status  string `json:"status"`
	}
	var decoded apiStatus
	err := json.NewDecoder(r).Decode(&decoded)
	return err == nil && decoded.Status == "HEALTHY"
}
// BuildElasticAgentImage builds a Docker image from the published image with a locally built apm-server injected.
//
// The build is performed by testing/docker/elastic-agent/build.sh, with
// BASE_IMAGE and GOARCH set from baseImage and arch, and the result tagged
// as outputImageName. The script's output goes to this process's stdout and
// stderr, and the script is killed if ctx is done before it finishes.
//
// Successful builds are remembered for the lifetime of the process, so
// repeated calls with the same arch, baseImage and outputImageName are
// no-ops. Calls are serialised. The docker parameter is currently unused.
func BuildElasticAgentImage(
	ctx context.Context,
	docker *client.Client,
	arch string,
	baseImage, outputImageName string,
) error {
	agentImageMu.Lock()
	defer agentImageMu.Unlock()

	// Key the cache on everything that determines the build result. Keying
	// on arch alone would skip the build for a different base image or
	// output tag requested later for the same architecture.
	builtKey := arch + "\x00" + baseImage + "\x00" + outputImageName
	if agentImages[builtKey] {
		return nil
	}

	log.Printf("Building image %s (%s) from %s...", outputImageName, arch, baseImage)
	cmd := exec.CommandContext(
		ctx,
		"bash",
		filepath.Join(systemtestDir, "..", "testing", "docker", "elastic-agent", "build.sh"),
		"-t", outputImageName,
	)
	cmd.Env = append(os.Environ(), "BASE_IMAGE="+baseImage, "GOARCH="+arch)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	if err := cmd.Run(); err != nil {
		return err
	}
	log.Printf("Built image %s (%s)", outputImageName, arch)
	agentImages[builtKey] = true
	return nil
}
var (
// agentImageMu guards agentImages and serialises image builds in
// BuildElasticAgentImage.
agentImageMu sync.RWMutex
// agentImages records the builds that BuildElasticAgentImage has
// completed in this process, so that they are not repeated.
agentImages = make(map[string]bool)
)