executors/docker/executor_docker.go (997 lines of code) (raw):
package docker
import (
"bytes"
"crypto/md5"
"errors"
"fmt"
"io"
"path"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"time"
"github.com/docker/distribution/reference"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/pkg/stdcopy"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/common"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/executors"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers"
docker_helpers "gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers/docker"
"golang.org/x/net/context"
)
// Executor stages reported by the Docker executor through SetCurrentStage.
const (
// DockerExecutorStagePrepare is set by Prepare before the image name is
// resolved and the Docker connection is opened.
DockerExecutorStagePrepare common.ExecutorStage = "docker_prepare"
// DockerExecutorStageRun is not referenced in the visible part of this file;
// presumably it marks the phase in which the job script runs (TODO confirm).
DockerExecutorStageRun common.ExecutorStage = "docker_run"
// DockerExecutorStageCleanup is set at the beginning of Cleanup.
DockerExecutorStageCleanup common.ExecutorStage = "docker_cleanup"
// DockerExecutorStageCreatingBuildVolumes is set by createDependencies
// right before createBuildVolume is called.
DockerExecutorStageCreatingBuildVolumes common.ExecutorStage = "docker_creating_build_volumes"
// DockerExecutorStageCreatingServices is set by createDependencies right
// before createServices is called.
DockerExecutorStageCreatingServices common.ExecutorStage = "docker_creating_services"
// DockerExecutorStageCreatingUserVolumes is set by createDependencies right
// before createUserVolumes is called.
DockerExecutorStageCreatingUserVolumes common.ExecutorStage = "docker_creating_user_volumes"
// DockerExecutorStagePullingImage is set by pullDockerImage while an image
// is being pulled.
DockerExecutorStagePullingImage common.ExecutorStage = "docker_pulling_image"
)
// neverRestartPolicy is the restart policy (name "no") assigned to the build,
// service and wait-for-service containers created in this file.
var neverRestartPolicy = container.RestartPolicy{Name: "no"}
// executor runs a build inside Docker containers. It embeds
// executors.AbstractExecutor and tracks every container it creates so that
// Cleanup can remove them.
type executor struct {
executors.AbstractExecutor
// client is the Docker API client; it is assigned by connectDocker and
// closed in Cleanup.
client docker_helpers.Client
failures []string // IDs of containers that have failed in some way
builds []string // IDs of successfully created build containers
// services holds the service containers started by
// createFromServiceDefinition.
services []*types.Container
caches []string // IDs of cache containers
// info is the daemon information returned by client.Info in connectDocker;
// getArchitecture reads info.Architecture.
info types.Info
// binds collects "hostPath:containerPath" entries passed as HostConfig.Binds.
binds []string
// volumesFrom collects IDs of cache containers passed as
// HostConfig.VolumesFrom.
volumesFrom []string
// devices holds the device mappings parsed by bindDevices.
devices []container.DeviceMapping
// links holds "containerID:linkName" entries for running services, built
// by buildServiceLinks.
links []string
}
// getServiceVariables returns the build variables, filtered through
// PublicOrInternal, as a string list. createService uses the result as the
// environment of service containers.
func (s *executor) getServiceVariables() []string {
	variables := s.Build.GetAllVariables().PublicOrInternal()
	return variables.StringList()
}
// getUserAuthConfiguration resolves registry credentials for indexName from
// the Docker auth configuration supplied with the build
// (Build.GetDockerAuthConfig). It returns nil when there is no build, when the
// configuration yields no auth configs, or when nothing resolves.
func (s *executor) getUserAuthConfiguration(indexName string) *types.AuthConfig {
	if s.Build == nil {
		return nil
	}

	// The read error is discarded; a nil result is treated as "no credentials".
	reader := bytes.NewBufferString(s.Build.GetDockerAuthConfig())
	configs, _ := docker_helpers.ReadAuthConfigsFromReader(reader)
	if configs == nil {
		return nil
	}
	return docker_helpers.ResolveDockerAuthConfig(indexName, configs)
}
// getBuildAuthConfiguration resolves registry credentials for indexName from
// the credentials attached to the build. Only entries whose Type is
// "registry" are considered; they are keyed by their URL. It returns nil when
// there is no build.
func (s *executor) getBuildAuthConfiguration(indexName string) *types.AuthConfig {
	if s.Build == nil {
		return nil
	}

	registryAuths := map[string]types.AuthConfig{}
	for _, cred := range s.Build.Credentials {
		if cred.Type == "registry" {
			registryAuths[cred.URL] = types.AuthConfig{
				Username:      cred.Username,
				Password:      cred.Password,
				ServerAddress: cred.URL,
			}
		}
	}

	// The map is always non-nil here, so resolution is always attempted.
	return docker_helpers.ResolveDockerAuthConfig(indexName, registryAuths)
}
// getHomeDirAuthConfiguration resolves registry credentials for indexName
// from the Docker auth configuration found in the home directory of the
// shell user (s.Shell().User). It returns nil when no configuration is read.
func (s *executor) getHomeDirAuthConfiguration(indexName string) *types.AuthConfig {
	// The read error is discarded; a nil result is treated as "no credentials".
	configs, _ := docker_helpers.ReadDockerAuthConfigsFromHomeDir(s.Shell().User)
	if configs == nil {
		return nil
	}
	return docker_helpers.ResolveDockerAuthConfig(indexName, configs)
}
// getAuthConfig looks up registry credentials for imageName. Sources are
// consulted in order and the first non-nil result wins: the build-supplied
// auth configuration, then the home directory configuration, then the
// build's registry credentials. It returns nil when none of them match.
func (s *executor) getAuthConfig(imageName string) *types.AuthConfig {
	indexName, _ := docker_helpers.SplitDockerImageName(imageName)

	sources := []func(string) *types.AuthConfig{
		s.getUserAuthConfiguration,
		s.getHomeDirAuthConfiguration,
		s.getBuildAuthConfiguration,
	}
	for _, lookup := range sources {
		found := lookup(indexName)
		if found == nil {
			continue
		}
		s.Debugln("Using", found.Username, "to connect to", found.ServerAddress,
			"in order to resolve", imageName, "...")
		return found
	}

	s.Debugln(fmt.Sprintf("No credentials found for %v", indexName))
	return nil
}
// pullDockerImage pulls imageName (using ac for registry authentication when
// it is non-nil) and returns the result of inspecting the image afterwards.
// A pull error whose message indicates a missing repository or image is
// wrapped in common.BuildError; any other pull error is returned unchanged.
func (s *executor) pullDockerImage(imageName string, ac *types.AuthConfig) (*types.ImageInspect, error) {
	s.SetCurrentStage(DockerExecutorStagePullingImage)
	s.Println("Pulling docker image", imageName, "...")

	// A reference without tag or digest gets ":latest" appended to limit
	// what is downloaded.
	pullRef := imageName
	if !strings.ContainsAny(pullRef, ":@") {
		pullRef = pullRef + ":latest"
	}

	var pullOptions types.ImagePullOptions
	if ac != nil {
		// The encoding error is discarded.
		pullOptions.RegistryAuth, _ = docker_helpers.EncodeAuthConfig(ac)
	}

	missingImage := regexp.MustCompile("(repository does not exist|not found)")
	if pullErr := s.client.ImagePullBlocking(s.Context, pullRef, pullOptions); pullErr != nil {
		if !missingImage.MatchString(pullErr.Error()) {
			return nil, pullErr
		}
		return nil, &common.BuildError{Inner: pullErr}
	}

	inspected, _, err := s.client.ImageInspectWithRaw(s.Context, imageName)
	return &inspected, err
}
// getDockerImage returns the image to use for imageName, honouring the
// configured pull policy:
//   - never: the local inspect result (and its error) is returned as is;
//   - image found locally and referenced by its ID: the local image is used;
//   - if-not-present and image found locally: the local image is used;
//   - otherwise the image is pulled.
func (s *executor) getDockerImage(imageName string) (*types.ImageInspect, error) {
	pullPolicy, err := s.Config.Docker.PullPolicy.Get()
	if err != nil {
		return nil, err
	}

	authConfig := s.getAuthConfig(imageName)

	s.Debugln("Looking for image", imageName, "...")
	local, _, inspectErr := s.client.ImageInspectWithRaw(s.Context, imageName)

	switch {
	case pullPolicy == common.PullPolicyNever:
		return &local, inspectErr
	case inspectErr != nil:
		// Not available locally: continue below and pull.
	case local.ID == imageName:
		// An image referenced by its ID is never pulled.
		return &local, nil
	case pullPolicy == common.PullPolicyIfNotPresent:
		s.Println("Using locally found image version due to if-not-present pull policy")
		return &local, nil
	}

	pulled, err := s.pullDockerImage(imageName, authConfig)
	if err != nil {
		return nil, err
	}
	return pulled, nil
}
// getArchitecture returns the architecture name used to select the prebuilt
// image. The architecture reported by the Docker daemon is preferred:
// "armv6l", "armv7l" and "aarch64" map to "arm", "amd64" maps to "x86_64",
// and any other non-empty value is returned unchanged. When the daemon
// reports nothing, runtime.GOARCH is used with the same amd64 mapping.
func (s *executor) getArchitecture() string {
	switch reported := s.info.Architecture; reported {
	case "armv6l", "armv7l", "aarch64":
		return "arm"
	case "amd64":
		return "x86_64"
	case "":
		// Nothing reported: fall back to the runner's own architecture.
	default:
		return reported
	}

	if runtime.GOARCH == "amd64" {
		return "x86_64"
	}
	return runtime.GOARCH
}
// getPrebuiltImage returns the prebuilt helper image for the current
// architecture and runner revision. If the image is not yet known to the
// daemon it is imported from the embedded asset
// "prebuilt-<architecture><extension>" and inspected again.
func (s *executor) getPrebuiltImage() (*types.ImageInspect, error) {
	architecture := s.getArchitecture()
	if architecture == "" {
		return nil, errors.New("unsupported docker architecture")
	}

	tag := architecture + "-" + common.REVISION
	imageName := prebuiltImageName + ":" + tag

	s.Debugln("Looking for prebuilt image", imageName, "...")
	if existing, _, inspectErr := s.client.ImageInspectWithRaw(s.Context, imageName); inspectErr == nil {
		return &existing, nil
	}

	data, err := Asset("prebuilt-" + architecture + prebuiltImageExtension)
	if err != nil {
		return nil, fmt.Errorf("Unsupported architecture: %s: %q", architecture, err.Error())
	}

	s.Debugln("Loading prebuilt image...")
	importSource := types.ImageImportSource{
		Source:     bytes.NewBuffer(data),
		SourceName: "-",
	}
	importOptions := types.ImageImportOptions{Tag: tag}
	if err := s.client.ImageImportBlocking(s.Context, importSource, prebuiltImageName, importOptions); err != nil {
		return nil, fmt.Errorf("Failed to import image: %s", err)
	}

	imported, _, err := s.client.ImageInspectWithRaw(s.Context, imageName)
	if err != nil {
		s.Debugln("Inspecting imported image", imageName, "failed:", err)
		return nil, err
	}
	return &imported, nil
}
// getAbsoluteContainerPath returns dir unchanged when it is already an
// absolute (slash-separated) path; otherwise dir is joined onto the build's
// full project directory.
func (s *executor) getAbsoluteContainerPath(dir string) string {
	if !path.IsAbs(dir) {
		return path.Join(s.Build.FullProjectDir(), dir)
	}
	return dir
}
// addHostVolume registers a bind of hostPath to containerPath. A relative
// containerPath is resolved against the project directory first. It always
// returns nil.
func (s *executor) addHostVolume(hostPath, containerPath string) error {
	target := s.getAbsoluteContainerPath(containerPath)
	s.Debugln("Using host-based", hostPath, "for", target, "...")

	bind := fmt.Sprintf("%v:%v", hostPath, target)
	s.binds = append(s.binds, bind)
	return nil
}
// getLabels builds the label set attached to containers created by this
// executor. Every key is prefixed with dockerLabelPrefix followed by a dot.
// The fixed labels describe the job, project, runner and containerType;
// otherLabels are "key=value" strings added on top, and entries without "="
// are ignored.
func (s *executor) getLabels(containerType string, otherLabels ...string) map[string]string {
	key := func(suffix string) string {
		return dockerLabelPrefix + "." + suffix
	}

	labels := map[string]string{
		key("job.id"):          strconv.Itoa(s.Build.ID),
		key("job.sha"):         s.Build.GitInfo.Sha,
		key("job.before_sha"):  s.Build.GitInfo.BeforeSha,
		key("job.ref"):         s.Build.GitInfo.Ref,
		key("project.id"):      strconv.Itoa(s.Build.JobInfo.ProjectID),
		key("runner.id"):       s.Build.Runner.ShortDescription(),
		key("runner.local_id"): strconv.Itoa(s.Build.RunnerID),
		key("type"):            containerType,
	}

	for _, extra := range otherLabels {
		parts := strings.SplitN(extra, "=", 2)
		if len(parts) != 2 {
			continue
		}
		labels[key(parts[0])] = parts[1]
	}
	return labels
}
// createCacheVolume creates, starts and waits for a container based on the
// prebuilt image that declares containerPath as a volume. It returns the ID
// of that container. Containers that fail to be created (when an ID is
// reported anyway), to start, or to finish successfully are recorded in
// s.failures.
func (s *executor) createCacheVolume(containerName, containerPath string) (string, error) {
	prebuilt, err := s.getPrebuiltImage()
	if err != nil {
		return "", err
	}

	config := &container.Config{
		Image:   prebuilt.ID,
		Cmd:     []string{"gitlab-runner-cache", containerPath},
		Volumes: map[string]struct{}{containerPath: {}},
		Labels:  s.getLabels("cache", "cache.dir="+containerPath),
	}
	hostConfig := &container.HostConfig{
		LogConfig: container.LogConfig{Type: "json-file"},
	}

	created, err := s.client.ContainerCreate(s.Context, config, hostConfig, nil, containerName)
	if err != nil {
		if created.ID != "" {
			s.failures = append(s.failures, created.ID)
		}
		return "", err
	}
	id := created.ID

	s.Debugln("Starting cache container", id, "...")
	err = s.client.ContainerStart(s.Context, id, types.ContainerStartOptions{})
	if err == nil {
		s.Debugln("Waiting for cache container", id, "...")
		err = s.waitForContainer(id)
	}
	if err != nil {
		s.failures = append(s.failures, id)
		return "", err
	}

	return id, nil
}
// addCacheVolume makes containerPath persistent between builds.
//
// When caching is disabled in the configuration nothing is done. When a
// cache directory is configured, a host directory derived from it, the
// project's unique name and the MD5 of containerPath is bound to
// containerPath. Otherwise a named cache container is reused, or created
// when it is missing or no longer declares the volume, and added to
// volumesFrom.
func (s *executor) addCacheVolume(containerPath string) error {
	containerPath = s.getAbsoluteContainerPath(containerPath)

	// Only the automatic container cache is disabled here; host volumes are
	// handled elsewhere and stay shared on purpose.
	if s.Config.Docker.DisableCache {
		s.Debugln("Container cache for", containerPath, " is disabled.")
		return nil
	}

	digest := md5.Sum([]byte(containerPath))

	// Host-based cache.
	if cacheDir := s.Config.Docker.CacheDir; cacheDir != "" {
		rawPath := fmt.Sprintf("%s/%s/%x", cacheDir, s.Build.ProjectUniqueName(), digest)
		hostPath, err := filepath.Abs(rawPath)
		if err != nil {
			return err
		}
		s.Debugln("Using path", hostPath, "as cache for", containerPath, "...")
		bind := fmt.Sprintf("%v:%v", filepath.ToSlash(hostPath), containerPath)
		s.binds = append(s.binds, bind)
		return nil
	}

	// Container-based cache: look for an existing cache container first.
	containerName := fmt.Sprintf("%s-cache-%x", s.Build.ProjectUniqueName(), digest)
	cacheID := ""
	existing, inspectErr := s.client.ContainerInspect(s.Context, containerName)
	if inspectErr == nil {
		if _, valid := existing.Config.Volumes[containerPath]; valid {
			cacheID = existing.ID
		} else {
			// The container exists but does not expose the expected volume.
			s.Debugln("Removing broken cache container for ", containerPath, "path")
			s.removeContainer(s.Context, existing.ID)
		}
	}

	if cacheID == "" {
		created, err := s.createCacheVolume(containerName, containerPath)
		if err != nil {
			return err
		}
		cacheID = created
	}

	s.Debugln("Using container", cacheID, "as cache", containerPath, "...")
	s.volumesFrom = append(s.volumesFrom, cacheID)
	return nil
}
// addVolume registers one volume definition. A definition of the form
// "host:container" becomes a host bind; a definition without a colon becomes
// a cache volume (which addCacheVolume skips when caching is disabled).
// Failures are logged and returned.
func (s *executor) addVolume(volume string) error {
	var err error

	parts := strings.SplitN(volume, ":", 2)
	if len(parts) == 2 {
		err = s.addHostVolume(parts[0], parts[1])
	} else if len(parts) == 1 {
		err = s.addCacheVolume(parts[0])
	}

	if err != nil {
		s.Errorln("Failed to create container volume for", volume, err)
	}
	return err
}
// fakeContainer builds a types.Container carrying only the given ID and
// names.
func fakeContainer(id string, names ...string) *types.Container {
	c := new(types.Container)
	c.ID = id
	c.Names = names
	return c
}
// createBuildVolume provides the volume that holds the build sources.
//
// The volume is created for the parent of the project directory rather than
// the project directory itself, because the build uses `rm -rf`, which could
// otherwise remove the mounted volume. That parent must be an absolute path
// other than "/"; anything else is rejected.
//
// Nothing is created when the builds root directory is already covered by a
// configured host volume. With the fetch git strategy and caching enabled a
// persistent cache volume is used; otherwise a temporary cache container is
// created and recorded in s.caches so that Cleanup removes it.
func (s *executor) createBuildVolume() error {
	parentDir := path.Dir(s.Build.FullProjectDir())

	// Reject relative paths as well as the filesystem root. The previous
	// check combined the two conditions with && and therefore could never
	// reject "/", contrary to the error message.
	if !path.IsAbs(parentDir) || parentDir == "/" {
		return errors.New("build directory needs to be absolute and non-root path")
	}

	if s.isHostMountedVolume(s.Build.RootDir, s.Config.Docker.Volumes...) {
		return nil
	}

	if s.Build.GetGitStrategy() == common.GitFetch && !s.Config.Docker.DisableCache {
		// Persistent cache container.
		return s.addVolume(parentDir)
	}

	// Temporary cache container, removed again during Cleanup.
	id, err := s.createCacheVolume("", parentDir)
	if err != nil {
		return err
	}

	s.caches = append(s.caches, id)
	s.volumesFrom = append(s.volumesFrom, id)
	return nil
}
// createUserVolumes registers every volume listed in the Docker
// configuration, stopping at and returning the first error.
func (s *executor) createUserVolumes() (err error) {
	for _, definition := range s.Config.Docker.Volumes {
		if err = s.addVolume(definition); err != nil {
			return err
		}
	}
	return nil
}
// isHostMountedVolume reports whether dir lies at or below the container
// path of any "host:container" definition in volumes. Definitions without a
// colon are skipped. Both paths are cleaned before comparison; the walk up
// from dir stops at "/" or ".", so those two are never matched themselves.
func (s *executor) isHostMountedVolume(dir string, volumes ...string) bool {
	target := path.Clean(dir)

	for _, definition := range volumes {
		parts := strings.Split(definition, ":")
		if len(parts) < 2 {
			continue
		}

		mountPoint := path.Clean(parts[1])
		// Walk from target towards the root looking for the mount point.
		for current := target; current != "/" && current != "."; current = path.Dir(current) {
			if current == mountPoint {
				return true
			}
		}
	}
	return false
}
// parseDeviceString parses a device definition of the form
// PathOnHost[:PathInContainer[:CgroupPermissions]].
//
// A missing container path defaults to the host path and missing permissions
// default to "rwm", just like 'docker run'. More than three colon-separated
// parts is an error.
func (s *executor) parseDeviceString(deviceString string) (device container.DeviceMapping, err error) {
	parts := strings.Split(deviceString, ":")
	if len(parts) > 3 {
		return device, fmt.Errorf("Too many colons")
	}

	// Start from the defaults and override with whatever was supplied.
	device.PathOnHost = parts[0]
	device.PathInContainer = parts[0]
	device.CgroupPermissions = "rwm"

	if len(parts) > 1 {
		device.PathInContainer = parts[1]
	}
	if len(parts) > 2 {
		device.CgroupPermissions = parts[2]
	}
	return device, nil
}
// bindDevices parses every device definition from the Docker configuration
// and appends the resulting mappings to s.devices. The first definition that
// fails to parse aborts the loop with a descriptive error.
func (s *executor) bindDevices() (err error) {
	for _, definition := range s.Config.Docker.Devices {
		mapping, parseErr := s.parseDeviceString(definition)
		if parseErr != nil {
			return fmt.Errorf("Failed to parse device string %q: %s", definition, parseErr)
		}
		s.devices = append(s.devices, mapping)
	}
	return nil
}
// printUsedDockerImageID prints which image is used for the given container.
// The image ID is included only when it differs from the image name.
func (s *executor) printUsedDockerImageID(imageName, imageID, containerType, containerTypeName string) {
	if imageName == imageID {
		s.Println(fmt.Sprintf("Using docker image %s for %s %s...", imageName, containerTypeName, containerType))
		return
	}
	s.Println(fmt.Sprintf("Using docker image %s ID=%s for %s %s...", imageName, imageID, containerTypeName, containerType))
}
// splitServiceAndVersion breaks a service description into its parts.
//
// Returned values:
//   - service: the image name with a ":port" suffix on its first segment
//     removed;
//   - version: the tag from the description, or "latest" when none is given;
//   - imageName: the description itself, with ":latest" appended when no tag
//     was given;
//   - linkNames: service with "/" replaced by "__", plus a variant with "/"
//     replaced by "-" when that differs.
//
// When the description does not match the Docker reference grammar only the
// defaults (version "latest", imageName equal to the description) are
// returned.
func (s *executor) splitServiceAndVersion(serviceDescription string) (service, version, imageName string, linkNames []string) {
	noPort := regexp.MustCompile(`^(.*?)(|:[0-9]+)(|/.*)$`)
	imageName, version = serviceDescription, "latest"

	refMatch := reference.ReferenceRegexp.FindStringSubmatch(serviceDescription)
	if refMatch == nil {
		return
	}

	nameParts := noPort.FindStringSubmatch(refMatch[1])
	service = nameParts[1] + nameParts[3]

	if refMatch[2] != "" {
		version = refMatch[2]
	} else {
		imageName = refMatch[1] + ":" + version
	}

	primary := strings.Replace(service, "/", "__", -1)
	linkNames = append(linkNames, primary)

	// Alternative link name according to RFC 1123, where only `a-zA-Z0-9-`
	// may be used.
	if alternative := strings.Replace(service, "/", "-", -1); alternative != primary {
		linkNames = append(linkNames, alternative)
	}
	return
}
// createService resolves the image for a service, then creates and starts
// its container.
//
// The container is named "<project unique name>-<service slug>-<index>",
// where the slug is the service name with "/" replaced by "__". Any existing
// container with that name is removed first. Command and entrypoint from
// serviceDefinition override the image defaults when set.
//
// It returns a types.Container carrying only the new container's ID and
// name. Containers that fail to start, or that report an ID although
// creation failed, are recorded in s.failures so that Cleanup removes them.
func (s *executor) createService(serviceIndex int, service, version, image string, serviceDefinition common.Image) (*types.Container, error) {
	if len(service) == 0 {
		return nil, errors.New("invalid service name")
	}

	s.Println("Starting service", service+":"+version, "...")
	serviceImage, err := s.getDockerImage(image)
	if err != nil {
		return nil, err
	}

	s.printUsedDockerImageID(image, serviceImage.ID, "service", service)

	serviceSlug := strings.Replace(service, "/", "__", -1)
	containerName := fmt.Sprintf("%s-%s-%d", s.Build.ProjectUniqueName(), serviceSlug, serviceIndex)

	// this will fail potentially some builds if there's name collision
	s.removeContainer(s.Context, containerName)

	config := &container.Config{
		Image:  serviceImage.ID,
		Labels: s.getLabels("service", "service="+service, "service.version="+version),
		Env:    s.getServiceVariables(),
	}
	if len(serviceDefinition.Command) > 0 {
		config.Cmd = serviceDefinition.Command
	}
	if len(serviceDefinition.Entrypoint) > 0 {
		config.Entrypoint = serviceDefinition.Entrypoint
	}

	hostConfig := &container.HostConfig{
		RestartPolicy: neverRestartPolicy,
		Privileged:    s.Config.Docker.Privileged,
		NetworkMode:   container.NetworkMode(s.Config.Docker.NetworkMode),
		Binds:         s.binds,
		ShmSize:       s.Config.Docker.ShmSize,
		VolumesFrom:   s.volumesFrom,
		Tmpfs:         s.Config.Docker.ServicesTmpfs,
		LogConfig: container.LogConfig{
			Type: "json-file",
		},
	}

	s.Debugln("Creating service container", containerName, "...")
	resp, err := s.client.ContainerCreate(s.Context, config, hostConfig, nil, containerName)
	if err != nil {
		// If an ID is reported despite the error, remember it so that
		// Cleanup removes the leftover container.
		if resp.ID != "" {
			s.failures = append(s.failures, resp.ID)
		}
		return nil, err
	}

	s.Debugln("Starting service container", resp.ID, "...")
	err = s.client.ContainerStart(s.Context, resp.ID, types.ContainerStartOptions{})
	if err != nil {
		s.failures = append(s.failures, resp.ID)
		return nil, err
	}

	return fakeContainer(resp.ID, containerName), nil
}
// getServicesDefinitions returns the services to start: first the ones from
// the runner's Docker configuration, then the ones requested by the build.
// Build-requested service names have variables expanded and must pass
// verifyAllowedImage; the first rejected service aborts with its error.
func (s *executor) getServicesDefinitions() (common.Services, error) {
	definitions := common.Services{}

	for _, name := range s.Config.Docker.Services {
		definitions = append(definitions, common.Image{Name: name})
	}

	for _, requested := range s.Build.Services {
		expanded := s.Build.GetAllVariables().ExpandValue(requested.Name)
		if err := s.verifyAllowedImage(expanded, "services", s.Config.Docker.AllowedServices, s.Config.Docker.Services); err != nil {
			return nil, err
		}

		requested.Name = expanded
		definitions = append(definitions, requested)
	}

	return definitions, nil
}
// waitForServices waits, concurrently for all started services, until each
// service's health check has finished. The timeout comes from the Docker
// configuration; zero selects common.DefaultWaitForServicesTimeout, and a
// negative value disables waiting. Results of the individual checks are not
// propagated.
func (s *executor) waitForServices() {
	timeoutSeconds := s.Config.Docker.WaitForServicesTimeout
	if timeoutSeconds == 0 {
		timeoutSeconds = common.DefaultWaitForServicesTimeout
	}

	if timeoutSeconds <= 0 || len(s.services) == 0 {
		return
	}

	s.Println("Waiting for services to be up and running...")

	var pending sync.WaitGroup
	pending.Add(len(s.services))
	for _, service := range s.services {
		go func(svc *types.Container) {
			defer pending.Done()
			s.waitForServiceContainer(svc, time.Duration(timeoutSeconds)*time.Second)
		}(service)
	}
	pending.Wait()
}
// buildServiceLinks turns linksMap (link name -> service container) into
// "containerID:linkName" entries. Only containers that can be inspected and
// are currently running are included.
func (s *executor) buildServiceLinks(linksMap map[string]*types.Container) (links []string) {
	for alias, svc := range linksMap {
		inspected, err := s.client.ContainerInspect(s.Context, svc.ID)
		if err == nil && inspected.State.Running {
			links = append(links, svc.ID+":"+alias)
		}
	}
	return links
}
// createFromServiceDefinition starts the service described by
// serviceDefinition and registers it in linksMap under each of its link
// names (derived from the image name, plus the alias when one is set).
//
// Link names already present in linksMap are skipped with a warning. The
// service container is created at most once, on the first link name that is
// still free, and appended to s.services.
func (s *executor) createFromServiceDefinition(serviceIndex int, serviceDefinition common.Image, linksMap map[string]*types.Container) (err error) {
	service, version, imageName, linkNames := s.splitServiceAndVersion(serviceDefinition.Name)
	if alias := serviceDefinition.Alias; alias != "" {
		linkNames = append(linkNames, alias)
	}

	var created *types.Container
	for _, linkName := range linkNames {
		if linksMap[linkName] != nil {
			s.Warningln("Service", serviceDefinition.Name, "is already created. Ignoring.")
			continue
		}

		// Lazily create the container the first time a free name is found.
		if created == nil {
			created, err = s.createService(serviceIndex, service, version, imageName, serviceDefinition)
			if err != nil {
				return err
			}

			s.Debugln("Created service", serviceDefinition.Name, "as", created.ID)
			s.services = append(s.services, created)
		}

		linksMap[linkName] = created
	}
	return nil
}
// createServices starts all configured and requested services, waits for
// them to come up, and stores the links to the running ones in s.links. The
// first service that fails to be created aborts the whole operation.
func (s *executor) createServices() (err error) {
	definitions, err := s.getServicesDefinitions()
	if err != nil {
		return err
	}

	linksMap := make(map[string]*types.Container)
	for index := range definitions {
		if err = s.createFromServiceDefinition(index, definitions[index], linksMap); err != nil {
			return err
		}
	}

	s.waitForServices()

	s.links = s.buildServiceLinks(linksMap)
	return nil
}
// createContainer creates (but does not start) a container of the given
// containerType running cmd.
//
// The image name from imageDefinition is expanded and verified, the image is
// fetched according to the pull policy, and the container is configured from
// the runner's Docker settings together with the binds, volumes, devices and
// service links collected by the executor. Any existing container with the
// same name ("<project unique name>-<containerType>") is removed first.
//
// On success the container ID is recorded in s.builds and the inspected
// container is returned. A container that was created but could not be
// inspected, or whose ID is reported despite a creation error, is recorded
// in s.failures.
func (s *executor) createContainer(containerType string, imageDefinition common.Image, cmd []string, allowedInternalImages []string) (*types.ContainerJSON, error) {
	imageName, err := s.expandImageName(imageDefinition.Name, allowedInternalImages)
	if err != nil {
		return nil, err
	}

	// Fetch image
	image, err := s.getDockerImage(imageName)
	if err != nil {
		return nil, err
	}
	s.printUsedDockerImageID(imageName, image.ID, "container", containerType)

	hostname := s.Config.Docker.Hostname
	if len(hostname) == 0 {
		hostname = s.Build.ProjectUniqueName()
	}
	containerName := s.Build.ProjectUniqueName() + "-" + containerType

	environment := append(s.Build.GetAllVariables().StringList(), s.BuildShell.Environment...)
	config := &container.Config{
		Image:    image.ID,
		Hostname: hostname,
		Cmd:      cmd,
		Labels:   s.getLabels(containerType),
		Env:      environment,
		// Stdin is attached and closed after the first attached client
		// disconnects; no TTY is allocated.
		AttachStdin:  true,
		AttachStdout: true,
		AttachStderr: true,
		OpenStdin:    true,
		StdinOnce:    true,
	}
	if len(imageDefinition.Entrypoint) > 0 {
		config.Entrypoint = imageDefinition.Entrypoint
	}

	nanoCPUs, err := s.Config.Docker.GetNanoCPUs()
	if err != nil {
		return nil, err
	}

	dockerConfig := s.Config.Docker
	resources := container.Resources{
		CpusetCpus: dockerConfig.CPUSetCPUs,
		NanoCPUs:   nanoCPUs,
		Devices:    s.devices,
	}
	hostConfig := &container.HostConfig{
		Resources:     resources,
		DNS:           dockerConfig.DNS,
		DNSSearch:     dockerConfig.DNSSearch,
		Privileged:    dockerConfig.Privileged,
		UsernsMode:    container.UsernsMode(dockerConfig.UsernsMode),
		CapAdd:        dockerConfig.CapAdd,
		CapDrop:       dockerConfig.CapDrop,
		SecurityOpt:   dockerConfig.SecurityOpt,
		RestartPolicy: neverRestartPolicy,
		ExtraHosts:    dockerConfig.ExtraHosts,
		NetworkMode:   container.NetworkMode(dockerConfig.NetworkMode),
		Links:         append(dockerConfig.Links, s.links...),
		Binds:         s.binds,
		ShmSize:       dockerConfig.ShmSize,
		VolumeDriver:  dockerConfig.VolumeDriver,
		VolumesFrom:   append(dockerConfig.VolumesFrom, s.volumesFrom...),
		LogConfig:     container.LogConfig{Type: "json-file"},
		Tmpfs:         dockerConfig.Tmpfs,
		Sysctls:       dockerConfig.SysCtls,
	}

	// this will fail potentially some builds if there's name collision
	s.removeContainer(s.Context, containerName)

	s.Debugln("Creating container", containerName, "...")
	created, err := s.client.ContainerCreate(s.Context, config, hostConfig, nil, containerName)
	if err != nil {
		if created.ID != "" {
			s.failures = append(s.failures, created.ID)
		}
		return nil, err
	}

	inspected, err := s.client.ContainerInspect(s.Context, created.ID)
	if err != nil {
		s.failures = append(s.failures, created.ID)
		return nil, err
	}

	s.builds = append(s.builds, created.ID)
	return &inspected, nil
}
// killContainer repeatedly disconnects the container from its networks and
// sends it SIGKILL, once per second, until a value arrives on waitCh. That
// value (the result of waiting for the container) is returned. Errors from
// the disconnect and kill calls themselves are ignored.
func (s *executor) killContainer(id string, waitCh chan error) (err error) {
	for {
		s.disconnectNetwork(s.Context, id)
		s.Debugln("Killing container", id, "...")
		s.client.ContainerKill(s.Context, id, "SIGKILL")

		// Either the waiter reports that the container is gone, or we try
		// again after a second.
		retry := time.After(time.Second)
		select {
		case waitErr := <-waitCh:
			return waitErr
		case <-retry:
		}
	}
}
// waitForContainer polls the container once per second until it is no
// longer running.
//
// It returns nil for exit code 0 and a common.BuildError carrying the exit
// code otherwise. A "not found" inspect error is returned immediately; other
// inspect errors are retried, and returned once more than three consecutive
// attempts have already failed.
func (s *executor) waitForContainer(id string) error {
	s.Debugln("Waiting for container", id, "...")

	failedInspects := 0

	// Use active wait
	for {
		inspected, err := s.client.ContainerInspect(s.Context, id)
		switch {
		case err != nil && (docker_helpers.IsErrNotFound(err) || failedInspects > 3):
			return err
		case err != nil:
			failedInspects++
			time.Sleep(time.Second)
		case inspected.State.Running:
			// A successful inspect resets the consecutive failure count.
			failedInspects = 0
			time.Sleep(time.Second)
		case inspected.State.ExitCode != 0:
			return &common.BuildError{
				Inner: fmt.Errorf("exit code %d", inspected.State.ExitCode),
			}
		default:
			return nil
		}
	}
}
// watchContainer attaches to the container id, starts it, streams its output
// into the build trace and feeds input to its stdin. It returns when one of
// the following happens:
//   - ctx is done: the container is killed and an "Aborted" error returned;
//   - copying output or input fails: the container is killed and the copy
//     error returned;
//   - the container stops: the result of waitForContainer is returned.
func (s *executor) watchContainer(ctx context.Context, id string, input io.Reader) (err error) {
options := types.ContainerAttachOptions{
Stream: true,
Stdin: true,
Stdout: true,
Stderr: true,
}
// The attach happens before the start; presumably so that no early output
// is missed (TODO confirm).
s.Debugln("Attaching to container", id, "...")
hijacked, err := s.client.ContainerAttach(ctx, id, options)
if err != nil {
return
}
defer hijacked.Close()
s.Debugln("Starting container", id, "...")
err = s.client.ContainerStart(ctx, id, types.ContainerStartOptions{})
if err != nil {
return
}
s.Debugln("Waiting for attach to finish", id, "...")
// Buffered for two values so that both copy goroutines below can report an
// error without blocking, even when nobody receives any more.
attachCh := make(chan error, 2)
// Copy any output to the build trace
go func() {
_, err := stdcopy.StdCopy(s.Trace, s.Trace, hijacked.Reader)
if err != nil {
attachCh <- err
}
}()
// Write the input to the container and close its STDIN to get it to finish
go func() {
_, err := io.Copy(hijacked.Conn, input)
hijacked.CloseWrite()
if err != nil {
attachCh <- err
}
}()
// Buffered for one value so that the waiting goroutine can always deliver
// its result and exit.
waitCh := make(chan error, 1)
go func() {
waitCh <- s.waitForContainer(id)
}()
select {
case <-ctx.Done():
// The value returned by killContainer is discarded; the caller only sees
// "Aborted".
s.killContainer(id, waitCh)
err = errors.New("Aborted")
case err = <-attachCh:
// The copy error is kept; the value returned by killContainer is discarded.
s.killContainer(id, waitCh)
s.Debugln("Container", id, "finished with", err)
case err = <-waitCh:
s.Debugln("Container", id, "finished with", err)
}
return
}
// removeContainer disconnects the container from its networks and then
// force-removes it together with its volumes. The result of the removal is
// logged at debug level and returned; a failed network disconnect is not
// reported to the caller.
func (s *executor) removeContainer(ctx context.Context, id string) error {
	s.disconnectNetwork(ctx, id)

	err := s.client.ContainerRemove(ctx, id, types.ContainerRemoveOptions{
		RemoveVolumes: true,
		Force:         true,
	})
	s.Debugln("Removed container", id, "with", err)
	return err
}
// disconnectNetwork force-disconnects the container from every network that
// lists an attached container whose name equals id. Each attempt is logged
// as a warning. It returns the error from listing networks, or else the
// result of the last disconnect attempt (nil when none was made).
func (s *executor) disconnectNetwork(ctx context.Context, id string) error {
	networks, err := s.client.NetworkList(ctx, types.NetworkListOptions{})
	if err != nil {
		s.Debugln("Can't get network list. ListNetworks exited with", err)
		return err
	}

	for _, network := range networks {
		for _, attached := range network.Containers {
			if attached.Name != id {
				continue
			}

			err = s.client.NetworkDisconnect(ctx, network.ID, id, true)
			if err == nil {
				s.Warningln("Possibly zombie container", attached.Name, "is disconnected from network", network.Name)
			} else {
				s.Warningln("Can't disconnect possibly zombie container", attached.Name, "from network", network.Name, "->", err)
			}
			// One match per network is enough.
			break
		}
	}
	return err
}
// verifyAllowedImage checks whether image may be used.
//
// The image is accepted when it matches one of the allowedImages glob
// patterns, when it equals one of the internalImages, or when no allowed
// patterns are configured at all. Otherwise the list of allowed patterns is
// printed and an "invalid image" error is returned. optionName only appears
// in the printed message.
func (s *executor) verifyAllowedImage(image, optionName string, allowedImages []string, internalImages []string) error {
	for _, pattern := range allowedImages {
		// A malformed pattern simply does not match.
		if matched, _ := filepath.Match(pattern, image); matched {
			return nil
		}
	}

	for _, internal := range internalImages {
		if image == internal {
			return nil
		}
	}

	// by default allow to override the image name
	if len(allowedImages) == 0 {
		return nil
	}

	s.Println()
	s.Errorln("The", image, "is not present on list of allowed", optionName)
	for _, pattern := range allowedImages {
		s.Println("-", pattern)
	}
	s.Println()

	s.Println("Please check runner's configuration: http://doc.gitlab.com/ci/docker/using_docker_images.html#overwrite-image-and-services")
	return errors.New("invalid image")
}
// expandImageName determines the image to run.
//
// An empty imageName selects the image from the runner's Docker
// configuration; it is an error if that is empty as well. A non-empty
// imageName has build variables expanded and must pass verifyAllowedImage,
// with the configured image always treated as an allowed internal image.
func (s *executor) expandImageName(imageName string, allowedInternalImages []string) (string, error) {
	if imageName == "" {
		if s.Config.Docker.Image == "" {
			return "", errors.New("No Docker image specified to run the build in")
		}
		return s.Config.Docker.Image, nil
	}

	expanded := s.Build.GetAllVariables().ExpandValue(imageName)
	allowedInternalImages = append(allowedInternalImages, s.Config.Docker.Image)
	if err := s.verifyAllowedImage(expanded, "images", s.Config.Docker.AllowedImages, allowedInternalImages); err != nil {
		return "", err
	}
	return expanded, nil
}
// connectDocker creates the Docker client from the configured credentials,
// stores it in s.client, and fetches the daemon information into s.info.
func (s *executor) connectDocker() (err error) {
	dockerClient, err := docker_helpers.New(s.Config.Docker.DockerCredentials, DockerAPIVersion)
	if err != nil {
		return err
	}
	s.client = dockerClient

	s.info, err = dockerClient.Info(s.Context)
	return err
}
// createDependencies prepares everything the build container depends on, in
// this order: device mappings, the build volume, the services, and the
// user-defined volumes. The current stage is updated before each of the last
// three steps. The first failing step aborts the sequence.
func (s *executor) createDependencies() (err error) {
	if err = s.bindDevices(); err != nil {
		return err
	}

	s.SetCurrentStage(DockerExecutorStageCreatingBuildVolumes)
	s.Debugln("Creating build volume...")
	if err = s.createBuildVolume(); err != nil {
		return err
	}

	s.SetCurrentStage(DockerExecutorStageCreatingServices)
	s.Debugln("Creating services...")
	if err = s.createServices(); err != nil {
		return err
	}

	s.SetCurrentStage(DockerExecutorStageCreatingUserVolumes)
	s.Debugln("Creating user-defined volumes...")
	return s.createUserVolumes()
}
// Prepare readies the executor for a build: it validates the configuration,
// runs the generic AbstractExecutor preparation, resolves the image name,
// connects to the Docker daemon, and creates volumes and services.
//
// It returns an error when the Docker configuration is missing, when the
// selected shell requires a script file, when the image is not allowed, or
// when any of the Docker operations fails.
func (s *executor) Prepare(options common.ExecutorPrepareOptions) error {
	// This check must come first: prepareBuildsDir reads
	// options.Config.Docker.Volumes and would otherwise dereference a nil
	// pointer instead of reporting the missing configuration.
	if options.Config == nil || options.Config.Docker == nil {
		return errors.New("Missing docker configuration")
	}

	if err := s.prepareBuildsDir(options.Config); err != nil {
		return err
	}

	if err := s.AbstractExecutor.Prepare(options); err != nil {
		return err
	}

	if s.BuildShell.PassFile {
		return errors.New("Docker doesn't support shells that require script file")
	}

	s.SetCurrentStage(DockerExecutorStagePrepare)
	imageName, err := s.expandImageName(s.Build.Image.Name, []string{})
	if err != nil {
		return err
	}

	s.Println("Using Docker executor with image", imageName, "...")

	if err := s.connectDocker(); err != nil {
		return err
	}

	return s.createDependencies()
}
// prepareBuildsDir marks the builds directory as shared when it lies at or
// below a host-mounted volume from the Docker configuration. The builds
// directory is config.BuildsDir, or s.DefaultBuildsDir when that is empty.
// It always returns nil.
func (s *executor) prepareBuildsDir(config *common.RunnerConfig) error {
	buildsDir := config.BuildsDir
	if len(buildsDir) == 0 {
		buildsDir = s.DefaultBuildsDir
	}

	hostMounted := s.isHostMountedVolume(buildsDir, config.Docker.Volumes...)
	if hostMounted {
		s.SharedBuildsDir = true
	}
	return nil
}
// Cleanup removes, concurrently, every container the executor has tracked
// (failed, service, cache and build containers), using a fresh context
// limited by dockerCleanupTimeout. It then closes the Docker client, if one
// was created, and runs the generic AbstractExecutor cleanup.
func (s *executor) Cleanup() {
	s.SetCurrentStage(DockerExecutorStageCleanup)

	ctx, cancel := context.WithTimeout(context.Background(), dockerCleanupTimeout)
	defer cancel()

	// Gather the IDs of everything that has to go.
	var ids []string
	ids = append(ids, s.failures...)
	for _, service := range s.services {
		ids = append(ids, service.ID)
	}
	ids = append(ids, s.caches...)
	ids = append(ids, s.builds...)

	var wg sync.WaitGroup
	wg.Add(len(ids))
	for _, id := range ids {
		go func(containerID string) {
			defer wg.Done()
			s.removeContainer(ctx, containerID)
		}(id)
	}
	wg.Wait()

	if s.client != nil {
		s.client.Close()
	}

	s.AbstractExecutor.Cleanup()
}
// runServiceHealthCheckContainer runs a helper container, based on the
// prebuilt image and linked to the given service, that executes
// "gitlab-runner-service". It returns the helper's outcome, or a timeout
// error when the helper has not finished within timeout. The helper
// container is removed when the function returns.
func (s *executor) runServiceHealthCheckContainer(service *types.Container, timeout time.Duration) error {
	waitImage, err := s.getPrebuiltImage()
	if err != nil {
		return err
	}

	serviceName := service.Names[0]
	containerName := serviceName + "-wait-for-service"

	config := &container.Config{
		Cmd:    []string{"gitlab-runner-service"},
		Image:  waitImage.ID,
		Labels: s.getLabels("wait", "wait="+service.ID),
	}
	hostConfig := &container.HostConfig{
		RestartPolicy: neverRestartPolicy,
		Links:         []string{serviceName + ":" + serviceName},
		NetworkMode:   container.NetworkMode(s.Config.Docker.NetworkMode),
		LogConfig:     container.LogConfig{Type: "json-file"},
	}

	s.Debugln("Waiting for service container", containerName, "to be up and running...")
	created, err := s.client.ContainerCreate(s.Context, config, hostConfig, nil, containerName)
	if err != nil {
		return err
	}
	defer s.removeContainer(s.Context, created.ID)

	if err = s.client.ContainerStart(s.Context, created.ID, types.ContainerStartOptions{}); err != nil {
		return err
	}

	// Buffered so the waiting goroutine can finish after a timeout.
	finished := make(chan error, 1)
	go func() {
		finished <- s.waitForContainer(created.ID)
	}()

	// these are warnings and they don't make the build fail
	select {
	case waitErr := <-finished:
		return waitErr
	case <-time.After(timeout):
		return fmt.Errorf("service %v did timeout", containerName)
	}
}
// waitForServiceContainer runs the health check for service and, when it
// fails, writes a warning block to the build trace containing the failure
// and the service container's logs (or the error from fetching them).
//
// It returns nil when the health check succeeds and the health check error
// otherwise. A failure to fetch the logs is only reported in the trace; it
// does not replace the returned error.
func (s *executor) waitForServiceContainer(service *types.Container, timeout time.Duration) error {
	err := s.runServiceHealthCheckContainer(service, timeout)
	if err == nil {
		return nil
	}

	var buffer bytes.Buffer
	buffer.WriteString("\n")
	buffer.WriteString(helpers.ANSI_YELLOW + "*** WARNING:" + helpers.ANSI_RESET + " Service " + service.Names[0] + " probably didn't start properly.\n")
	buffer.WriteString("\n")
	buffer.WriteString(strings.TrimSpace(err.Error()) + "\n")

	var containerBuffer bytes.Buffer

	options := types.ContainerLogsOptions{
		ShowStdout: true,
		ShowStderr: true,
		Timestamps: true,
	}

	// A separate variable is used for the log error so that the health check
	// error above is not overwritten and lost.
	logs, logsErr := s.client.ContainerLogs(s.Context, service.ID, options)
	if logsErr == nil {
		defer logs.Close()
		stdcopy.StdCopy(&containerBuffer, &containerBuffer, logs)
		if containerLog := containerBuffer.String(); containerLog != "" {
			buffer.WriteString("\n")
			buffer.WriteString(strings.TrimSpace(containerLog))
			buffer.WriteString("\n")
		}
	} else {
		buffer.WriteString(strings.TrimSpace(logsErr.Error()) + "\n")
	}

	buffer.WriteString("\n")
	buffer.WriteString(helpers.ANSI_YELLOW + "*********" + helpers.ANSI_RESET + "\n")
	buffer.WriteString("\n")
	io.Copy(s.Trace, &buffer)
	return err
}