executors/kubernetes/executor_kubernetes.go (470 lines of code) (raw):
package kubernetes
import (
"encoding/json"
"fmt"
"regexp"
"strings"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/credentialprovider"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/common"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/executors"
)
var (
executorOptions = executors.ExecutorOptions{
SharedBuildsDir: false,
Shell: common.ShellScriptInfo{
Shell: "bash",
Type: common.NormalShell,
RunnerCommand: "/usr/bin/gitlab-runner-helper",
},
ShowHostname: true,
}
)
type kubernetesOptions struct {
Image common.Image
Services common.Services
}
type executor struct {
executors.AbstractExecutor
kubeClient *client.Client
pod *api.Pod
credentials *api.Secret
options *kubernetesOptions
namespaceOverwrite string
serviceAccountOverwrite string
buildLimits api.ResourceList
serviceLimits api.ResourceList
helperLimits api.ResourceList
buildRequests api.ResourceList
serviceRequests api.ResourceList
helperRequests api.ResourceList
pullPolicy common.KubernetesPullPolicy
}
func (s *executor) setupResources() error {
var err error
// Limit
if s.buildLimits, err = limits(s.Config.Kubernetes.CPULimit, s.Config.Kubernetes.MemoryLimit); err != nil {
return fmt.Errorf("invalid build limits specified: %s", err.Error())
}
if s.serviceLimits, err = limits(s.Config.Kubernetes.ServiceCPULimit, s.Config.Kubernetes.ServiceMemoryLimit); err != nil {
return fmt.Errorf("invalid service limits specified: %s", err.Error())
}
if s.helperLimits, err = limits(s.Config.Kubernetes.HelperCPULimit, s.Config.Kubernetes.HelperMemoryLimit); err != nil {
return fmt.Errorf("invalid helper limits specified: %s", err.Error())
}
// Requests
if s.buildRequests, err = limits(s.Config.Kubernetes.CPURequest, s.Config.Kubernetes.MemoryRequest); err != nil {
return fmt.Errorf("invalid build requests specified: %s", err.Error())
}
if s.serviceRequests, err = limits(s.Config.Kubernetes.ServiceCPURequest, s.Config.Kubernetes.ServiceMemoryRequest); err != nil {
return fmt.Errorf("invalid service requests specified: %s", err.Error())
}
if s.helperRequests, err = limits(s.Config.Kubernetes.HelperCPURequest, s.Config.Kubernetes.HelperMemoryRequest); err != nil {
return fmt.Errorf("invalid helper requests specified: %s", err.Error())
}
return nil
}
func (s *executor) Prepare(options common.ExecutorPrepareOptions) (err error) {
if err = s.AbstractExecutor.Prepare(options); err != nil {
return err
}
if s.BuildShell.PassFile {
return fmt.Errorf("kubernetes doesn't support shells that require script file")
}
if s.kubeClient, err = getKubeClient(options.Config.Kubernetes); err != nil {
return fmt.Errorf("error connecting to Kubernetes: %s", err.Error())
}
if err = s.setupResources(); err != nil {
return err
}
if s.pullPolicy, err = s.Config.Kubernetes.PullPolicy.Get(); err != nil {
return err
}
if err = s.overwriteNamespace(options.Build); err != nil {
return err
}
if err = s.overwriteServiceAccount(options.Build); err != nil {
return err
}
s.prepareOptions(options.Build)
if err = s.checkDefaults(); err != nil {
return err
}
s.Println("Using Kubernetes executor with image", s.options.Image.Name, "...")
return nil
}
func (s *executor) Run(cmd common.ExecutorCommand) error {
s.Debugln("Starting Kubernetes command...")
if s.pod == nil {
err := s.setupCredentials()
if err != nil {
return err
}
err = s.setupBuildPod()
if err != nil {
return err
}
}
containerName := "build"
if cmd.Predefined {
containerName = "helper"
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
select {
case err := <-s.runInContainer(ctx, containerName, cmd.Script):
if err != nil && strings.Contains(err.Error(), "executing in Docker Container") {
return &common.BuildError{Inner: err}
}
return err
case <-cmd.Context.Done():
return fmt.Errorf("build aborted")
}
}
func (s *executor) Cleanup() {
if s.pod != nil {
err := s.kubeClient.Pods(s.pod.Namespace).Delete(s.pod.Name, nil)
if err != nil {
s.Errorln(fmt.Sprintf("Error cleaning up pod: %s", err.Error()))
}
}
if s.credentials != nil {
err := s.kubeClient.Secrets(s.pod.Namespace).Delete(s.credentials.Name)
if err != nil {
s.Errorln(fmt.Sprintf("Error cleaning up secrets: %s", err.Error()))
}
}
closeKubeClient(s.kubeClient)
s.AbstractExecutor.Cleanup()
}
func (s *executor) buildContainer(name, image string, imageDefinition common.Image, requests, limits api.ResourceList, command ...string) api.Container {
privileged := false
if s.Config.Kubernetes != nil {
privileged = s.Config.Kubernetes.Privileged
}
if len(command) == 0 && len(imageDefinition.Command) > 0 {
command = imageDefinition.Command
}
var args []string
if len(imageDefinition.Entrypoint) > 0 {
args = command
command = imageDefinition.Entrypoint
}
return api.Container{
Name: name,
Image: image,
ImagePullPolicy: api.PullPolicy(s.pullPolicy),
Command: command,
Args: args,
Env: buildVariables(s.Build.GetAllVariables().PublicOrInternal()),
Resources: api.ResourceRequirements{
Limits: limits,
Requests: requests,
},
VolumeMounts: s.getVolumeMounts(),
SecurityContext: &api.SecurityContext{
Privileged: &privileged,
},
Stdin: true,
}
}
func (s *executor) getVolumeMounts() (mounts []api.VolumeMount) {
path := strings.Split(s.Build.BuildDir, "/")
path = path[:len(path)-1]
mounts = append(mounts, api.VolumeMount{
Name: "repo",
MountPath: strings.Join(path, "/"),
})
for _, mount := range s.Config.Kubernetes.Volumes.HostPaths {
mounts = append(mounts, api.VolumeMount{
Name: mount.Name,
MountPath: mount.MountPath,
ReadOnly: mount.ReadOnly,
})
}
for _, mount := range s.Config.Kubernetes.Volumes.Secrets {
mounts = append(mounts, api.VolumeMount{
Name: mount.Name,
MountPath: mount.MountPath,
ReadOnly: mount.ReadOnly,
})
}
for _, mount := range s.Config.Kubernetes.Volumes.PVCs {
mounts = append(mounts, api.VolumeMount{
Name: mount.Name,
MountPath: mount.MountPath,
ReadOnly: mount.ReadOnly,
})
}
for _, mount := range s.Config.Kubernetes.Volumes.ConfigMaps {
mounts = append(mounts, api.VolumeMount{
Name: mount.Name,
MountPath: mount.MountPath,
ReadOnly: mount.ReadOnly,
})
}
return
}
func (s *executor) getVolumes() (volumes []api.Volume) {
volumes = append(volumes, api.Volume{
Name: "repo",
VolumeSource: api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
},
})
for _, volume := range s.Config.Kubernetes.Volumes.HostPaths {
path := volume.HostPath
// Make backward compatible with syntax introduced in version 9.3.0
if path == "" {
path = volume.MountPath
}
volumes = append(volumes, api.Volume{
Name: volume.Name,
VolumeSource: api.VolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: path,
},
},
})
}
for _, volume := range s.Config.Kubernetes.Volumes.Secrets {
items := []api.KeyToPath{}
for key, path := range volume.Items {
items = append(items, api.KeyToPath{Key: key, Path: path})
}
volumes = append(volumes, api.Volume{
Name: volume.Name,
VolumeSource: api.VolumeSource{
Secret: &api.SecretVolumeSource{
SecretName: volume.Name,
Items: items,
},
},
})
}
for _, volume := range s.Config.Kubernetes.Volumes.PVCs {
volumes = append(volumes, api.Volume{
Name: volume.Name,
VolumeSource: api.VolumeSource{
PersistentVolumeClaim: &api.PersistentVolumeClaimVolumeSource{
ClaimName: volume.Name,
ReadOnly: volume.ReadOnly,
},
},
})
}
for _, volume := range s.Config.Kubernetes.Volumes.ConfigMaps {
items := []api.KeyToPath{}
for key, path := range volume.Items {
items = append(items, api.KeyToPath{Key: key, Path: path})
}
volumes = append(volumes, api.Volume{
Name: volume.Name,
VolumeSource: api.VolumeSource{
ConfigMap: &api.ConfigMapVolumeSource{
LocalObjectReference: api.LocalObjectReference{
Name: volume.Name,
},
Items: items,
},
},
})
}
return
}
func (s *executor) setupCredentials() error {
authConfigs := make(map[string]credentialprovider.DockerConfigEntry)
for _, credentials := range s.Build.Credentials {
if credentials.Type != "registry" {
continue
}
authConfigs[credentials.URL] = credentialprovider.DockerConfigEntry{
Username: credentials.Username,
Password: credentials.Password,
}
}
if len(authConfigs) == 0 {
return nil
}
dockerCfgContent, err := json.Marshal(authConfigs)
if err != nil {
return err
}
secret := api.Secret{}
secret.GenerateName = s.Build.ProjectUniqueName()
secret.Namespace = s.Config.Kubernetes.Namespace
secret.Type = api.SecretTypeDockercfg
secret.Data = map[string][]byte{}
secret.Data[api.DockerConfigKey] = dockerCfgContent
s.credentials, err = s.kubeClient.Secrets(s.Config.Kubernetes.Namespace).Create(&secret)
if err != nil {
return err
}
return nil
}
func (s *executor) setupBuildPod() error {
services := make([]api.Container, len(s.options.Services))
for i, service := range s.options.Services {
resolvedImage := s.Build.GetAllVariables().ExpandValue(service.Name)
services[i] = s.buildContainer(fmt.Sprintf("svc-%d", i), resolvedImage, service, s.serviceRequests, s.serviceLimits)
}
labels := make(map[string]string)
for k, v := range s.Build.Runner.Kubernetes.PodLabels {
labels[k] = s.Build.Variables.ExpandValue(v)
}
var imagePullSecrets []api.LocalObjectReference
for _, imagePullSecret := range s.Config.Kubernetes.ImagePullSecrets {
imagePullSecrets = append(imagePullSecrets, api.LocalObjectReference{Name: imagePullSecret})
}
if s.credentials != nil {
imagePullSecrets = append(imagePullSecrets, api.LocalObjectReference{Name: s.credentials.Name})
}
buildImage := s.Build.GetAllVariables().ExpandValue(s.options.Image.Name)
pod, err := s.kubeClient.Pods(s.Config.Kubernetes.Namespace).Create(&api.Pod{
ObjectMeta: api.ObjectMeta{
GenerateName: s.Build.ProjectUniqueName(),
Namespace: s.Config.Kubernetes.Namespace,
Labels: labels,
},
Spec: api.PodSpec{
Volumes: s.getVolumes(),
ServiceAccountName: s.Config.Kubernetes.ServiceAccount,
RestartPolicy: api.RestartPolicyNever,
NodeSelector: s.Config.Kubernetes.NodeSelector,
Containers: append([]api.Container{
// TODO use the build and helper template here
s.buildContainer("build", buildImage, s.options.Image, s.buildRequests, s.buildLimits, s.BuildShell.DockerCommand...),
s.buildContainer("helper", s.Config.Kubernetes.GetHelperImage(), common.Image{}, s.helperRequests, s.helperLimits, s.BuildShell.DockerCommand...),
}, services...),
TerminationGracePeriodSeconds: &s.Config.Kubernetes.TerminationGracePeriodSeconds,
ImagePullSecrets: imagePullSecrets,
},
})
if err != nil {
return err
}
s.pod = pod
return nil
}
func (s *executor) runInContainer(ctx context.Context, name, command string) <-chan error {
errc := make(chan error, 1)
go func() {
defer close(errc)
status, err := waitForPodRunning(ctx, s.kubeClient, s.pod, s.Trace, s.Config.Kubernetes)
if err != nil {
errc <- err
return
}
if status != api.PodRunning {
errc <- fmt.Errorf("pod failed to enter running state: %s", status)
return
}
config, err := getKubeClientConfig(s.Config.Kubernetes)
if err != nil {
errc <- err
return
}
exec := ExecOptions{
PodName: s.pod.Name,
Namespace: s.pod.Namespace,
ContainerName: name,
Command: s.BuildShell.DockerCommand,
In: strings.NewReader(command),
Out: s.Trace,
Err: s.Trace,
Stdin: true,
Config: config,
Client: s.kubeClient,
Executor: &DefaultRemoteExecutor{},
}
errc <- exec.Run()
}()
return errc
}
func (s *executor) prepareOptions(job *common.Build) {
s.options = &kubernetesOptions{}
s.options.Image = job.Image
for _, service := range job.Services {
if service.Name == "" {
continue
}
s.options.Services = append(s.options.Services, service)
}
}
// checkDefaults Defines the configuration for the Pod on Kubernetes
func (s *executor) checkDefaults() error {
if s.options.Image.Name == "" {
if s.Config.Kubernetes.Image == "" {
return fmt.Errorf("no image specified and no default set in config")
}
s.options.Image = common.Image{
Name: s.Config.Kubernetes.Image,
}
}
if s.Config.Kubernetes.Namespace == "" {
s.Warningln("Namespace is empty, therefore assuming 'default'.")
s.Config.Kubernetes.Namespace = "default"
}
s.Println("Using Kubernetes namespace:", s.Config.Kubernetes.Namespace)
return nil
}
// overwriteNamespace checks for variable in order to overwrite the configured
// namespace, as long as it complies to validation regular-expression, when
// expression is empty the overwrite is disabled.
func (s *executor) overwriteNamespace(job *common.Build) error {
if s.Config.Kubernetes.NamespaceOverwriteAllowed == "" {
s.Debugln("Configuration entry 'namespace_overwrite_allowed' is empty, using configured namespace.")
return nil
}
// looking for namespace overwrite variable, and expanding for interpolation
s.namespaceOverwrite = job.Variables.Expand().Get("KUBERNETES_NAMESPACE_OVERWRITE")
if s.namespaceOverwrite == "" {
return nil
}
if err := overwriteRegexCheck(s.Config.Kubernetes.NamespaceOverwriteAllowed, s.namespaceOverwrite); err != nil {
return err
}
s.Println("Overwritting configured namespace, from", s.Config.Kubernetes.Namespace, "to", s.namespaceOverwrite)
s.Config.Kubernetes.Namespace = s.namespaceOverwrite
return nil
}
// overwriteSercviceAccount checks for variable in order to overwrite the configured
// service account, as long as it complies to validation regular-expression, when
// expression is empty the overwrite is disabled.
func (s *executor) overwriteServiceAccount(job *common.Build) error {
if s.Config.Kubernetes.ServiceAccountOverwriteAllowed == "" {
s.Debugln("Configuration entry 'service_account_overwrite_allowed' is empty, disabling override.")
return nil
}
s.serviceAccountOverwrite = job.Variables.Expand().Get("KUBERNETES_SERVICE_ACCOUNT_OVERWRITE")
if s.serviceAccountOverwrite == "" {
return nil
}
if err := overwriteRegexCheck(s.Config.Kubernetes.ServiceAccountOverwriteAllowed, s.serviceAccountOverwrite); err != nil {
return err
}
s.Println("Overwritting configured ServiceAccount, from", s.Config.Kubernetes.ServiceAccount, "to", s.serviceAccountOverwrite)
s.Config.Kubernetes.ServiceAccount = s.serviceAccountOverwrite
return nil
}
//overwriteRegexCheck check if the regex provided for overwriting a config field matches the
//paramether provided, returns error if doesn't match
func overwriteRegexCheck(regex, value string) error {
var err error
var r *regexp.Regexp
if r, err = regexp.Compile(regex); err != nil {
return err
}
if match := r.MatchString(value); !match {
return fmt.Errorf("Provided value %s does not match regex %s", value, regex)
}
return nil
}
func createFn() common.Executor {
return &executor{
AbstractExecutor: executors.AbstractExecutor{
ExecutorOptions: executorOptions,
},
}
}
func featuresFn(features *common.FeaturesInfo) {
features.Variables = true
features.Image = true
features.Services = true
features.Artifacts = true
features.Cache = true
}
func init() {
common.RegisterExecutor("kubernetes", executors.DefaultExecutorProvider{
Creator: createFn,
FeaturesUpdater: featuresFn,
})
}