command-runner/internal/containers/docker/docker_run.go (493 lines of code) (raw):
package docker
import (
"archive/tar"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"github.com/aws/codecatalyst-runner-cli/command-runner/internal/containers/shared"
ctypes "github.com/aws/codecatalyst-runner-cli/command-runner/internal/containers/types"
"github.com/aws/codecatalyst-runner-cli/command-runner/pkg/common"
"github.com/rs/zerolog/log"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/client"
specs "github.com/opencontainers/image-spec/specs-go/v1"
)
func (cr *dockerContainer) Create(capAdd []string, capDrop []string) common.Executor {
return common.
NewDebugExecutor("%sdocker create image=%s platform=%s entrypoint=%+q cmd=%+q", logPrefix, cr.input.Image, cr.input.Platform, cr.input.Entrypoint, cr.input.Cmd).
Then(
common.NewPipelineExecutor(
cr.connect(),
cr.find(),
cr.create(capAdd, capDrop),
).IfNot(common.Dryrun),
)
}
func (cr *dockerContainer) Start(attach bool) common.Executor {
return common.
NewInfoExecutor("%sdocker run image=%s", logPrefix, cr.input.Image).
Then(
common.NewPipelineExecutor(
cr.connect(),
cr.find(),
cr.attach().IfBool(attach),
cr.start(),
cr.wait().IfBool(attach),
cr.tryReadUID(),
cr.tryReadGID(),
).IfNot(common.Dryrun),
)
}
func (cr *dockerContainer) Pull(forcePull bool) common.Executor {
return common.
NewInfoExecutor("%sdocker pull image=%s", logPrefix, cr.input.Image).
Then(
newDockerPullExecutor(newDockerPullExecutorInput{
Image: cr.input.Image,
ForcePull: forcePull,
Platform: cr.input.Platform,
Username: cr.input.Username,
Password: cr.input.Password,
}),
)
}
func (cr *dockerContainer) CopyIn(containerPath string, hostPath string, useGitIgnore bool) common.Executor {
return common.NewPipelineExecutor(
common.NewDebugExecutor("%sdocker cp hostPath=%s containerPath=%s", logPrefix, hostPath, containerPath),
cr.copyIn(containerPath, hostPath, useGitIgnore),
).IfNot(common.Dryrun)
}
func (cr *dockerContainer) CopyOut(hostPath string, containerPath string) common.Executor {
return common.NewPipelineExecutor(
common.NewDebugExecutor("%sdocker cp hostPath=%s containerPath=%s", logPrefix, hostPath, containerPath),
cr.copyOut(hostPath, containerPath),
).IfNot(common.Dryrun)
}
func (cr *dockerContainer) Exec(command []string, env map[string]string, user, workdir string) common.Executor {
return common.NewPipelineExecutor(
common.NewDebugExecutor("%sdocker exec cmd=[%s] user=%s workdir=%s", logPrefix, strings.Join(command, " "), user, workdir),
cr.connect(),
cr.find(),
cr.exec(command, env, user, workdir),
).IfNot(common.Dryrun)
}
func (cr *dockerContainer) Remove() common.Executor {
return common.NewPipelineExecutor(
cr.connect(),
cr.find(),
).Finally(
cr.remove(),
).IfNot(common.Dryrun)
}
type dockerContainer struct {
cli client.APIClient
id string
input ctypes.NewContainerInput
UID int
GID int
}
func (cr *dockerContainer) Close() common.Executor {
return func(ctx context.Context) error {
if cr.cli != nil {
err := cr.cli.Close()
cr.cli = nil
if err != nil {
return fmt.Errorf("failed to close client: %w", err)
}
}
return nil
}
}
func (cr *dockerContainer) find() common.Executor {
return func(ctx context.Context) error {
if cr.id != "" {
return nil
}
containers, err := cr.cli.ContainerList(ctx, container.ListOptions{
All: true,
})
if err != nil {
return fmt.Errorf("failed to list containers: %w", err)
}
for _, c := range containers {
for _, name := range c.Names {
if name[1:] == cr.input.Name {
cr.id = c.ID
return nil
}
}
}
cr.id = ""
return nil
}
}
func (cr *dockerContainer) remove() common.Executor {
return func(ctx context.Context) error {
if cr.id == "" {
return nil
}
err := cr.cli.ContainerRemove(ctx, cr.id, container.RemoveOptions{
RemoveVolumes: true,
Force: true,
})
if err != nil {
log.Ctx(ctx).Err(fmt.Errorf("failed to remove container: %w", err))
}
log.Ctx(ctx).Printf("Removed container: %v", cr.id)
cr.id = ""
return nil
}
}
func (cr *dockerContainer) create(capAdd []string, capDrop []string) common.Executor {
return func(ctx context.Context) error {
if cr.id != "" {
return nil
}
input := cr.input
config := &container.Config{
Image: input.Image,
WorkingDir: input.WorkingDir,
Env: input.Env,
Tty: true,
}
log.Ctx(ctx).Printf("Common container.Config ==> %+v", config)
if len(input.Cmd) != 0 {
config.Cmd = input.Cmd
}
if len(input.Entrypoint) != 0 {
config.Entrypoint = input.Entrypoint
}
mounts := make([]mount.Mount, 0)
for mountSource, mountTarget := range input.Mounts {
mounts = append(mounts, mount.Mount{
Type: mount.TypeVolume,
Source: mountSource,
Target: mountTarget,
Consistency: mount.ConsistencyDefault,
})
}
var platSpecs *specs.Platform
if cr.input.Platform != "" {
desiredPlatform := strings.SplitN(cr.input.Platform, `/`, 2)
if len(desiredPlatform) != 2 {
return fmt.Errorf("incorrect container platform option '%s'", cr.input.Platform)
}
platSpecs = &specs.Platform{
Architecture: desiredPlatform[1],
OS: desiredPlatform[0],
}
}
hostConfig := &container.HostConfig{
CapAdd: capAdd,
CapDrop: capDrop,
Binds: input.Binds,
Mounts: mounts,
NetworkMode: container.NetworkMode(input.NetworkMode),
Privileged: input.Privileged,
UsernsMode: container.UsernsMode(input.UsernsMode),
}
log.Ctx(ctx).Printf("Common container.HostConfig ==> %+v", hostConfig)
resp, err := cr.cli.ContainerCreate(ctx, config, hostConfig, nil, platSpecs, input.Name)
if err != nil {
return fmt.Errorf("failed to create container: '%w'", err)
}
log.Ctx(ctx).Printf("Created container name=%s id=%v from image %v (platform: %s)", input.Name, resp.ID, input.Image, input.Platform)
log.Ctx(ctx).Printf("ENV ==> %v", input.Env)
cr.id = resp.ID
return nil
}
}
func (cr *dockerContainer) exec(cmd []string, env map[string]string, user, workdir string) common.Executor {
return func(ctx context.Context) error {
// Fix slashes when running on Windows
if runtime.GOOS == "windows" {
var newCmd []string
for _, v := range cmd {
newCmd = append(newCmd, strings.ReplaceAll(v, `\`, `/`))
}
cmd = newCmd
}
log.Ctx(ctx).Printf("Exec command '%s'", cmd)
envList := make([]string, 0)
for k, v := range env {
envList = append(envList, fmt.Sprintf("%s=%s", k, v))
}
var wd string
if workdir != "" {
if strings.HasPrefix(workdir, "/") {
wd = workdir
} else {
wd = fmt.Sprintf("%s/%s", cr.input.WorkingDir, workdir)
}
} else {
wd = cr.input.WorkingDir
}
log.Ctx(ctx).Printf("Working directory '%s'", wd)
idResp, err := cr.cli.ContainerExecCreate(ctx, cr.id, container.ExecOptions{
User: user,
Cmd: cmd,
WorkingDir: wd,
Env: envList,
Tty: true,
AttachStderr: true,
AttachStdout: true,
})
if err != nil {
return fmt.Errorf("failed to create exec: %w", err)
}
resp, err := cr.cli.ContainerExecAttach(ctx, idResp.ID, container.ExecStartOptions{
Tty: true,
})
if err != nil {
return fmt.Errorf("failed to attach to exec: %w", err)
}
defer resp.Close()
err = cr.waitForCommand(ctx, resp, idResp, user, workdir)
if err != nil {
return err
}
inspectResp, err := cr.cli.ContainerExecInspect(ctx, idResp.ID)
if err != nil {
return fmt.Errorf("failed to inspect exec: %w", err)
}
log.Ctx(ctx).Debug().Msgf("Got back exec inspect=%#v", inspectResp)
switch inspectResp.ExitCode {
case 0:
return nil
case 127:
return fmt.Errorf("exitcode '%d': command not found", inspectResp.ExitCode)
default:
return fmt.Errorf("exitcode '%d': failure", inspectResp.ExitCode)
}
}
}
func (cr *dockerContainer) tryReadID(opt string, cbk func(id int)) common.Executor {
return func(ctx context.Context) error {
idResp, err := cr.cli.ContainerExecCreate(ctx, cr.id, container.ExecOptions{
Cmd: []string{"id", opt},
AttachStdout: true,
AttachStderr: true,
})
if err != nil {
log.Ctx(ctx).Debug().Err(err).Msgf("tryReadID - Unable to create exec for container=%s", cr.id)
inspectResp, err := cr.cli.ContainerInspect(ctx, cr.id)
if err != nil {
log.Ctx(ctx).Debug().Err(err).Msgf("tryReadID - Unable to inspect container=%s", cr.id)
} else {
log.Ctx(ctx).Debug().Msgf("state=%#v", inspectResp.State)
logResp, err := cr.cli.ContainerLogs(ctx, cr.id, container.LogsOptions{
ShowStdout: true,
ShowStderr: true,
})
if err != nil {
log.Ctx(ctx).Debug().Err(err).Msgf("tryReadID - Unable to get logs for container=%s", cr.id)
}
if log.Ctx(ctx).Debug().Enabled() {
_, _ = io.Copy(os.Stdout, logResp)
}
}
return nil
}
resp, err := cr.cli.ContainerExecAttach(ctx, idResp.ID, container.ExecStartOptions{})
if err != nil {
log.Ctx(ctx).Warn().Err(err).Msgf("tryReadID - Unable to attach exec for container=%s", cr.id)
return nil
}
defer resp.Close()
sid, err := resp.Reader.ReadString('\n')
if err != nil {
return nil
}
log.Ctx(ctx).Debug().Msgf("Reading id with opt=%s and got back: %s", opt, sid)
exp := regexp.MustCompile(`\d+\n`)
found := exp.FindString(sid)
if len(found) == 0 {
log.Ctx(ctx).Warn().Msgf("Unable to read id with opt=%s - got back: %s", opt, sid)
return nil
}
id, err := strconv.ParseInt(found[:len(found)-1], 10, 32)
if err != nil {
return nil
}
cbk(int(id))
return nil
}
}
func (cr *dockerContainer) tryReadUID() common.Executor {
return cr.tryReadID("-u", func(id int) { cr.UID = id })
}
func (cr *dockerContainer) tryReadGID() common.Executor {
return cr.tryReadID("-g", func(id int) { cr.GID = id })
}
func (cr *dockerContainer) waitForCommand(ctx context.Context, resp types.HijackedResponse, _ types.IDResponse, _ string, _ string) error {
cmdResponse := make(chan error)
go func() {
var outWriter io.Writer
outWriter = cr.input.Stdout
if outWriter == nil {
outWriter = os.Stdout
}
_, err := io.Copy(outWriter, resp.Reader)
cmdResponse <- err
}()
select {
case <-ctx.Done():
// send ctrl + c
_, err := resp.Conn.Write([]byte{3})
if err != nil {
log.Ctx(ctx).Warn().Msgf("Failed to send CTRL+C: %+s", err)
}
// we return the context canceled error to prevent other steps
// from executing
return ctx.Err()
case err := <-cmdResponse:
if err != nil {
log.Ctx(ctx).Err(err)
}
return nil
}
}
func (cr *dockerContainer) copyIn(containerPath string, hostPath string, useGitIgnore bool) common.Executor {
return func(ctx context.Context) error {
log.Ctx(ctx).Debug().Msgf("Writing %s from %s", containerPath, hostPath)
var tarFile *os.File
if fs, err := os.Stat(hostPath); err != nil {
return fmt.Errorf("unable to copyDir from srcPath=%s: %w", hostPath, err)
} else if filepath.Ext(hostPath) == ".tar" {
tarFile, err = os.Open(hostPath)
if err != nil {
return err
}
defer tarFile.Close()
} else if fs.IsDir() {
tarFile, err = shared.TarDirectory(ctx, hostPath, containerPath[1:], useGitIgnore, cr.UID, cr.GID)
if tarFile != nil {
defer func(tarFile *os.File) {
if err := tarFile.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
log.Ctx(ctx).Err(err)
}
if err := os.Remove(tarFile.Name()); err != nil {
log.Ctx(ctx).Err(err)
}
}(tarFile)
}
if err != nil {
return err
}
} else {
return fmt.Errorf("unsupported hostPath=%s", hostPath)
}
log.Ctx(ctx).Printf("Extracting content from '%s' to '%s'", tarFile.Name(), containerPath)
if err := cr.cli.CopyToContainer(ctx, cr.id, "/", tarFile, container.CopyToContainerOptions{}); err != nil {
return fmt.Errorf("failed to copy content to container: %w", err)
}
return nil
}
}
func (cr *dockerContainer) copyOut(hostPath string, containerPath string) common.Executor {
return func(ctx context.Context) error {
log.Ctx(ctx).Printf("Extracting content from '%s' to '%s'", containerPath, hostPath)
if out, _, err := cr.cli.CopyFromContainer(ctx, cr.id, containerPath); err != nil {
return fmt.Errorf("failed to copy content from container: %w", err)
} else {
tr := tar.NewReader(out)
defer out.Close()
for {
hdr, err := tr.Next()
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("error processing archive: %w", err)
}
hostPath := filepath.Join(hostPath, hdr.Name) // #nosec G305 - mitigated by following check
if !strings.HasPrefix(hostPath, filepath.Clean(hostPath)) {
return fmt.Errorf("content path is tainted: %s", hostPath)
}
switch hdr.Typeflag {
case tar.TypeDir:
if err := os.MkdirAll(hostPath, 0755); err != nil {
return err
}
case tar.TypeReg:
if err := os.MkdirAll(filepath.Dir(hostPath), 0755); err != nil {
return err
}
if f, err := os.Create(hostPath); err != nil {
return err
} else {
for {
_, err := io.CopyN(f, tr, 1024)
if err != nil {
if err == io.EOF {
break
}
return fmt.Errorf("unable to copy tr file to output: %w", err)
}
}
}
}
}
}
return nil
}
}
func (cr *dockerContainer) attach() common.Executor {
return func(ctx context.Context) error {
out, err := cr.cli.ContainerAttach(ctx, cr.id, container.AttachOptions{
Stream: true,
Stdout: true,
Stderr: true,
})
if err != nil {
return fmt.Errorf("failed to attach to container: %w", err)
}
var outWriter io.Writer
outWriter = cr.input.Stdout
if outWriter == nil {
outWriter = os.Stdout
}
go func() {
_, err = io.Copy(outWriter, out.Reader)
if err != nil {
log.Ctx(ctx).Err(err)
}
}()
return nil
}
}
func (cr *dockerContainer) start() common.Executor {
return func(ctx context.Context) error {
log.Ctx(ctx).Printf("Starting container: %v", cr.id)
if err := cr.cli.ContainerStart(ctx, cr.id, container.StartOptions{}); err != nil {
return fmt.Errorf("failed to start container: %w", err)
}
log.Ctx(ctx).Printf("Started container: %v", cr.id)
return nil
}
}
func (cr *dockerContainer) wait() common.Executor {
return func(ctx context.Context) error {
statusCh, errCh := cr.cli.ContainerWait(ctx, cr.id, container.WaitConditionNotRunning)
var statusCode int64
select {
case err := <-errCh:
if err != nil {
return fmt.Errorf("failed to wait for container: %w", err)
}
case status := <-statusCh:
statusCode = status.StatusCode
}
log.Ctx(ctx).Printf("Return status: %v", statusCode)
if statusCode == 0 {
return nil
}
return fmt.Errorf("exit with `FAILURE`: %v", statusCode)
}
}