internal/compose/compose.go (397 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package compose
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"regexp"
"strconv"
"strings"
"time"
"github.com/Masterminds/semver/v3"
"gopkg.in/yaml.v3"
"github.com/elastic/elastic-package/internal/docker"
"github.com/elastic/elastic-package/internal/environment"
"github.com/elastic/elastic-package/internal/logger"
)
const (
// waitForHealthyTimeout is the maximum duration for WaitForHealthy().
waitForHealthyTimeout = 10 * time.Minute
// waitForHealthyInterval is the check interval for WaitForHealthy().
waitForHealthyInterval = 1 * time.Second
)
var (
EnableComposeStandaloneEnv = environment.WithElasticPackagePrefix("COMPOSE_ENABLE_STANDALONE")
DisableVerboseOutputComposeEnv = environment.WithElasticPackagePrefix("COMPOSE_DISABLE_VERBOSE_OUTPUT")
)
const (
defaultComposeProgressOutput = "plain"
)
// Project represents a Docker Compose project.
type Project struct {
name string
composeFilePaths []string
dockerComposeStandalone bool
disableANSI bool
disablePullProgressInformation bool
progressOutput string
composeVersion *semver.Version
}
// Config represents a Docker Compose configuration file.
type Config struct {
Services map[string]service
}
type service struct {
Ports []portMapping
Environment map[string]string
}
type portMapping struct {
ExternalIP string
ExternalPort int
InternalPort int
Protocol string
}
type intOrStringYaml int
func (p *intOrStringYaml) UnmarshalYAML(node *yaml.Node) error {
var s string
err := node.Decode(&s)
if err == nil {
i, err := strconv.Atoi(s)
*p = intOrStringYaml(i)
return err
}
return node.Decode(p)
}
// UnmarshalYAML unmarshals a Docker Compose port mapping in YAML to
// a portMapping.
func (p *portMapping) UnmarshalYAML(node *yaml.Node) error {
// Depending on how the port mapping is specified in the Docker Compose
// configuration file, sometimes a map is returned and other times a
// string is returned. Here we first check if a map was returned.
if node.Kind == yaml.MappingNode {
b, err := yaml.Marshal(node)
if err != nil {
return fmt.Errorf("could not re-encode YAML map node to YAML: %w", err)
}
var s struct {
HostIP string `yaml:"host_ip"`
Target intOrStringYaml // Docker compose v2 can define ports as strings.
Published intOrStringYaml // Docker compose v2 can define ports as strings.
Protocol string
}
if err := yaml.Unmarshal(b, &s); err != nil {
return fmt.Errorf("could not unmarshal YAML map node: %w", err)
}
p.InternalPort = int(s.Target)
p.ExternalPort = int(s.Published)
p.Protocol = s.Protocol
p.ExternalIP = s.HostIP
return nil
}
var str string
if err := node.Decode(&str); err != nil {
return err
}
// First, parse out the protocol.
mapping, protocol, found := strings.Cut(str, "/")
if !found {
return errors.New("could not find protocol in port mapping")
}
p.Protocol = protocol
// Now, try to parse out external host, external IP, and internal port.
parts := strings.Split(mapping, ":")
var externalIP, internalPortStr, externalPortStr string
switch len(parts) {
case 1:
// All we have is an internal port.
internalPortStr = parts[0]
case 3:
// We have an external IP, external port, and an internal port.
externalIP = parts[0]
externalPortStr = parts[1]
internalPortStr = parts[2]
default:
return errors.New("could not parse port mapping")
}
internalPort, err := strconv.Atoi(internalPortStr)
if err != nil {
return fmt.Errorf("error parsing internal port as integer: %w", err)
}
p.InternalPort = internalPort
if externalPortStr != "" {
externalPort, err := strconv.Atoi(externalPortStr)
if err != nil {
return fmt.Errorf("error parsing external port as integer: %w", err)
}
p.ExternalPort = externalPort
}
p.ExternalIP = externalIP
return nil
}
// CommandOptions encapsulates the environment variables, extra arguments, and Docker Compose services
// that can be passed to each Docker Compose command.
type CommandOptions struct {
Env []string
ExtraArgs []string
Services []string
}
// NewProject creates a new Docker Compose project given a sequence of Docker Compose configuration files.
func NewProject(name string, paths ...string) (*Project, error) {
// TODO: a lot of the checks in NewProject don't need to happen any more, we might want to rethink how we do this.
for _, path := range paths {
info, err := os.Stat(path)
if err != nil {
return nil, fmt.Errorf("could not find Docker Compose configuration file: %s: %w", path, err)
}
if info.IsDir() {
return nil, fmt.Errorf("expected Docker Compose configuration file (%s) to be a file, not a folder", path)
}
}
var c Project
c.name = name
c.composeFilePaths = paths
v, ok := os.LookupEnv(EnableComposeStandaloneEnv)
if ok && strings.ToLower(v) != "false" {
c.dockerComposeStandalone = true
} else {
c.dockerComposeStandalone = c.dockerComposeStandaloneRequired()
}
// Passing a nil context here because we are on initialization.
ver, err := c.dockerComposeVersion(context.Background())
if err != nil {
return nil, fmt.Errorf("unable to determine Docker Compose version: %w", err)
}
if ver.Major() < 2 {
return nil, fmt.Errorf("required Docker Compose v2, found %s", ver.String())
}
logger.Debugf("Determined Docker Compose version: %v", ver)
v, ok = os.LookupEnv(DisableVerboseOutputComposeEnv)
if ok && strings.ToLower(v) != "false" {
if c.composeVersion.LessThan(semver.MustParse("2.19.0")) {
c.disableANSI = true
} else {
// --ansi never looks is ignored by "docker compose" and latest versions of "docker-compose"
// adding --progress plain is a similar result as --ansi never
// if set to "--progress quiet", there is no output at all from docker compose commands
c.progressOutput = defaultComposeProgressOutput
}
c.disablePullProgressInformation = true
}
return &c, nil
}
// Up brings up a Docker Compose project.
func (p *Project) Up(ctx context.Context, opts CommandOptions) error {
args := p.baseArgs()
args = append(args, "up")
if p.disablePullProgressInformation {
args = append(args, "--quiet-pull")
}
args = append(args, opts.ExtraArgs...)
args = append(args, opts.Services...)
if err := p.runDockerComposeCmd(ctx, dockerComposeOptions{args: args, env: opts.Env}); err != nil {
return fmt.Errorf("running Docker Compose up command failed: %w", err)
}
return nil
}
// Stop stops a Docker Compose project.
func (p *Project) Stop(ctx context.Context, opts CommandOptions) error {
args := p.baseArgs()
args = append(args, "stop")
args = append(args, opts.ExtraArgs...)
if err := p.runDockerComposeCmd(ctx, dockerComposeOptions{args: args, env: opts.Env}); err != nil {
return fmt.Errorf("running Docker Compose stop command failed: %w", err)
}
return nil
}
// Down tears down a Docker Compose project.
func (p *Project) Down(ctx context.Context, opts CommandOptions) error {
args := p.baseArgs()
args = append(args, "down")
args = append(args, opts.ExtraArgs...)
if err := p.runDockerComposeCmd(ctx, dockerComposeOptions{args: args, env: opts.Env}); err != nil {
return fmt.Errorf("running Docker Compose down command failed: %w", err)
}
return nil
}
// Build builds a Docker Compose project.
func (p *Project) Build(ctx context.Context, opts CommandOptions) error {
args := p.baseArgs()
args = append(args, "build")
args = append(args, opts.ExtraArgs...)
args = append(args, opts.Services...)
if err := p.runDockerComposeCmd(ctx, dockerComposeOptions{args: args, env: opts.Env}); err != nil {
return fmt.Errorf("running Docker Compose build command failed: %w", err)
}
return nil
}
// Kill sends a signal to a service container.
func (p *Project) Kill(ctx context.Context, opts CommandOptions) error {
args := p.baseArgs()
args = append(args, "kill")
args = append(args, opts.ExtraArgs...)
args = append(args, opts.Services...)
if err := p.runDockerComposeCmd(ctx, dockerComposeOptions{args: args, env: opts.Env}); err != nil {
return fmt.Errorf("running Docker Compose kill command failed: %w", err)
}
return nil
}
// Config returns the combined configuration for a Docker Compose project.
func (p *Project) Config(ctx context.Context, opts CommandOptions) (*Config, error) {
args := p.baseArgs()
args = append(args, "config")
args = append(args, opts.ExtraArgs...)
args = append(args, opts.Services...)
var b bytes.Buffer
if err := p.runDockerComposeCmd(ctx, dockerComposeOptions{args: args, env: opts.Env, stdout: &b}); err != nil {
return nil, err
}
var config Config
if err := yaml.Unmarshal(b.Bytes(), &config); err != nil {
return nil, err
}
return &config, nil
}
// Pull pulls down images for a Docker Compose project.
func (p *Project) Pull(ctx context.Context, opts CommandOptions) error {
args := p.baseArgs()
args = append(args, "pull")
if p.disablePullProgressInformation {
args = append(args, "--quiet")
}
args = append(args, opts.ExtraArgs...)
args = append(args, opts.Services...)
if err := p.runDockerComposeCmd(ctx, dockerComposeOptions{args: args, env: opts.Env}); err != nil {
return fmt.Errorf("running Docker Compose pull command failed: %w", err)
}
return nil
}
// Logs returns service logs for the selected service in the Docker Compose project.
func (p *Project) Logs(ctx context.Context, opts CommandOptions) ([]byte, error) {
args := p.baseArgs()
args = append(args, "logs")
args = append(args, opts.ExtraArgs...)
args = append(args, opts.Services...)
var b bytes.Buffer
if err := p.runDockerComposeCmd(ctx, dockerComposeOptions{args: args, env: opts.Env, stdout: &b}); err != nil {
return nil, err
}
return b.Bytes(), nil
}
// WaitForHealthy method waits until all containers are healthy.
func (p *Project) WaitForHealthy(ctx context.Context, opts CommandOptions) error {
// Read container IDs
args := p.baseArgs()
args = append(args, "ps", "-a", "-q")
var b bytes.Buffer
if err := p.runDockerComposeCmd(ctx, dockerComposeOptions{args: args, env: opts.Env, stdout: &b}); err != nil {
return err
}
ctx, stop := context.WithTimeout(ctx, waitForHealthyTimeout)
defer stop()
containerIDs := strings.Fields(b.String())
for {
// NOTE: healthy must be reinitialized at each iteration
healthy := true
logger.Debugf("Wait for healthy containers: %s", strings.Join(containerIDs, ","))
descriptions, err := docker.InspectContainers(containerIDs...)
if err != nil {
return err
}
for _, d := range descriptions {
switch {
// No healthcheck defined for service
case d.State.Status == "running" && d.State.Health == nil:
logger.Debugf("Container %s (%s) status: %s (no health status)", d.Config.Labels.ComposeService, d.ID, d.State.Status)
// Service is up and running and it's healthy
case d.State.Status == "running" && d.State.Health.Status == "healthy":
logger.Debugf("Container %s (%s) status: %s (health: %s)", d.Config.Labels.ComposeService, d.ID, d.State.Status, d.State.Health.Status)
// Container started and finished with exit code 0
case d.State.Status == "exited" && d.State.ExitCode == 0:
logger.Debugf("Container %s (%s) status: %s (exit code: %d)", d.Config.Labels.ComposeService, d.ID, d.State.Status, d.State.ExitCode)
// Container exited with code > 0
case d.State.Status == "exited" && d.State.ExitCode > 0:
logger.Debugf("Container %s (%s) status: %s (exit code: %d)", d.Config.Labels.ComposeService, d.ID, d.State.Status, d.State.ExitCode)
return fmt.Errorf("container (ID: %s) exited with code %d", d.ID, d.State.ExitCode)
// Any different status is considered unhealthy
default:
logger.Debugf("Container %s (%s) status: unhealthy", d.Config.Labels.ComposeService, d.ID)
healthy = false
}
}
// end loop before timeout if healthy
if healthy {
break
}
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return errors.New("timeout waiting for healthy container")
}
return ctx.Err()
// NOTE: using after does not guarantee interval but it's ok for this use case
case <-time.After(waitForHealthyInterval):
}
}
return nil
}
// ServiceExitCode returns true if the specified service is exited with an error.
func (p *Project) ServiceExitCode(ctx context.Context, service string, opts CommandOptions) (bool, int, error) {
// Read container IDs
args := p.baseArgs()
args = append(args, "ps", "-a", "-q", service)
var b bytes.Buffer
if err := p.runDockerComposeCmd(ctx, dockerComposeOptions{args: args, env: opts.Env, stdout: &b}); err != nil {
return false, -1, err
}
containerIDs := strings.Fields(b.String())
if len(containerIDs) != 1 {
return false, -1, fmt.Errorf("expected to find one service container named: %s, found: %d", service, len(containerIDs))
}
containerID := containerIDs[0]
containerDescriptions, err := docker.InspectContainers(containerID)
if err != nil {
return false, -1, err
}
if len(containerDescriptions) != 1 {
return false, -1, fmt.Errorf("expected to get one service status, found: %d", len(containerIDs))
}
containerDescription := containerDescriptions[0]
// Container exited with code > 0
if containerDescription.State.Status == "exited" {
return true, containerDescription.State.ExitCode, nil
}
return false, -1, nil
}
func (p *Project) baseArgs() []string {
var args []string
for _, path := range p.composeFilePaths {
args = append(args, "-f", path)
}
if p.disableANSI {
args = append(args, "--ansi", "never")
}
if p.progressOutput != "" {
args = append(args, "--progress", p.progressOutput)
}
args = append(args, "-p", p.name)
return args
}
type dockerComposeOptions struct {
args []string
env []string
stdout io.Writer
}
const daemonResponse = `Error response from daemon:`
// This regexp must match prefixes like WARN[0000], which may include escape sequences for colored letters
// or structured logs, starting with key=value pairs.
var composeLoggerPrefix = regexp.MustCompile(`^[^\s]+\[[0-9]+\]`)
func cleanComposeError(msg string) string {
// If there is a daemon response, just return it.
if i := strings.Index(msg, daemonResponse); i >= 0 {
return strings.TrimSpace(msg[i+len(daemonResponse):])
}
// Filter out lines coming from the docker compose structured logger.
var cleanError strings.Builder
scanner := bufio.NewScanner(strings.NewReader(msg))
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if composeLoggerPrefix.MatchString(line) {
continue
}
fmt.Fprintln(&cleanError, line)
}
return strings.TrimSpace(cleanError.String())
}
func (p *Project) dockerComposeBaseCommand() (name string, args []string) {
if p.dockerComposeStandalone {
return "docker-compose", nil
}
return "docker", []string{"compose"}
}
func (p *Project) dockerComposeStandaloneRequired() bool {
output, err := exec.Command("docker", "compose", "version", "--short").CombinedOutput()
if err == nil {
return false
} else {
logger.Debugf("docker compose subcommand failed: %v: %s", err, output)
}
return true
}
func (p *Project) dockerComposeVersion(ctx context.Context) (*semver.Version, error) {
var b bytes.Buffer
args := []string{
"version",
"--short",
}
if err := p.runDockerComposeCmd(ctx, dockerComposeOptions{args: args, stdout: &b}); err != nil {
return nil, fmt.Errorf("running Docker Compose version command failed: %w", err)
}
dcVersion := b.String()
ver, err := semver.NewVersion(strings.TrimSpace(dcVersion))
if err != nil {
return nil, fmt.Errorf("docker compose version is not a valid semver (value: %s): %w", dcVersion, err)
}
p.composeVersion = ver
return ver, nil
}
// ContainerName method the container name for the service.
func (p *Project) ContainerName(serviceName string) string {
return fmt.Sprintf("%s-%s-1", p.name, serviceName)
}