executors/kubernetes/kubernetes.go
package kubernetes
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"maps"
"math/rand"
"net/http"
"path"
"path/filepath"
"regexp"
"slices"
"strconv"
"strings"
"sync"
"syscall"
"text/tabwriter"
"time"
"github.com/docker/cli/cli/config/types"
jsonpatch "github.com/evanphx/json-patch"
"github.com/jpillora/backoff"
api "k8s.io/api/core/v1"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth" // Register all available authentication methods
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/util/exec"
"gitlab.com/gitlab-org/gitlab-runner/common"
"gitlab.com/gitlab-org/gitlab-runner/common/buildlogger"
"gitlab.com/gitlab-org/gitlab-runner/executors"
"gitlab.com/gitlab-org/gitlab-runner/executors/kubernetes/internal/pull"
"gitlab.com/gitlab-org/gitlab-runner/executors/kubernetes/internal/watchers"
"gitlab.com/gitlab-org/gitlab-runner/helpers/container/helperimage"
"gitlab.com/gitlab-org/gitlab-runner/helpers/dns"
"gitlab.com/gitlab-org/gitlab-runner/helpers/docker/auth"
"gitlab.com/gitlab-org/gitlab-runner/helpers/featureflags"
os_helpers "gitlab.com/gitlab-org/gitlab-runner/helpers/os"
"gitlab.com/gitlab-org/gitlab-runner/helpers/pull_policies"
"gitlab.com/gitlab-org/gitlab-runner/helpers/retry"
service_helpers "gitlab.com/gitlab-org/gitlab-runner/helpers/service"
"gitlab.com/gitlab-org/gitlab-runner/session/proxy"
"gitlab.com/gitlab-org/gitlab-runner/shells"
)
const (
buildContainerName = "build"
helperContainerName = "helper"
initPermissionContainerName = "init-permissions"
detectShellScriptName = "detect_shell_script"
pwshJSONTerminationScriptName = "terminate_with_json_script"
waitLogFileTimeout = time.Minute
outputLogFileNotExistsExitCode = 100
unknownLogProcessorExitCode = 1000
// nodeSelectorWindowsBuildLabel is the label used to reference a specific Windows Version.
// https://kubernetes.io/docs/reference/labels-annotations-taints/#nodekubernetesiowindows-build
nodeSelectorWindowsBuildLabel = "node.kubernetes.io/windows-build"
apiVersion = "v1"
ownerReferenceKind = "Pod"
// Polling interval between each attempt to check serviceAccount and imagePullSecret availability
resourceAvailabilityCheckMaxPollInterval = 5 * time.Second
serviceContainerPrefix = "svc-"
// runnerLabelNamespace is used to build the k8s objects' labels and annotations, and for checking if user-defined
// labels and label overwrites are allowed; i.e. labels within this namespace cannot be set or overwritten by
// users, though other labels can.
runnerLabelNamespace = "runner.gitlab.com"
// The suffix is built from lowercase alphanumeric characters (see chars below),
// which gives 36^8 possible names for resources following the same pattern.
// Considering that the k8s resources are deleted once the job finishes, the probability of a name collision is very low.
k8sResourcesNameSuffixLength = 8
k8sResourcesNameMaxLength = 63
k8sEventWarningType = "Warning"
// errorAlreadyExistsMessage is the error message encountered when
// we fail to create a resource because it already exists.
// Due to a connectivity issue, an attempt to create a resource can appear to fail even though the request itself
// was executed successfully. We then watch for this conflict error message to retrieve the already created resource.
errorAlreadyExistsMessage = "the server was not able to generate a unique name for the object"
)
var (
runnerLabelNamespacePattern = regexp.MustCompile(`(?i)(^|.*\.)` + regexp.QuoteMeta(runnerLabelNamespace) + `(\/.*|$)`)
PropagationPolicy = metav1.DeletePropagationBackground
executorOptions = executors.ExecutorOptions{
DefaultCustomBuildsDirEnabled: true,
DefaultSafeDirectoryCheckout: true,
DefaultBuildsDir: "/builds",
DefaultCacheDir: "/cache",
Shell: common.ShellScriptInfo{
Shell: "bash",
Type: common.NormalShell,
RunnerCommand: "/usr/bin/gitlab-runner-helper",
},
ShowHostname: true,
}
errIncorrectShellType = fmt.Errorf("kubernetes executor incorrect shell type")
DefaultResourceIdentifier = "default"
resourceTypeServiceAccount = "ServiceAccount"
resourceTypePullSecret = "ImagePullSecret"
defaultLogsBaseDir = "/logs"
defaultScriptsBaseDir = "/scripts"
chars = []rune("abcdefghijklmnopqrstuvwxyz0123456789")
// network errors to retry on
// make sure to update the documentation in docs/executors/kubernetes/_index.md to keep it in sync
retryNetworkErrorsGroup = []string{
"error dialing backend",
"TLS handshake timeout",
"read: connection timed out",
"connect: connection timed out",
"Timeout occurred",
"http2: client connection lost",
"connection refused",
"tls: internal error",
io.ErrUnexpectedEOF.Error(),
syscall.ECONNRESET.Error(),
syscall.ECONNREFUSED.Error(),
syscall.ECONNABORTED.Error(),
syscall.EPIPE.Error(),
}
)
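// The sketch below is illustrative only and is not part of the original executor flow:
// it shows how a random suffix of k8sResourcesNameSuffixLength characters can be drawn
// from chars to build unique resource names; the package's own name-generation helper
// may differ in detail.
func exampleResourceNameSuffix() string {
	suffix := make([]rune, k8sResourcesNameSuffixLength)
	for i := range suffix {
		// pick a random lowercase alphanumeric character
		suffix[i] = chars[rand.Intn(len(chars))]
	}
	return string(suffix)
}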
type commandTerminatedError struct {
exitCode int
}
func (c *commandTerminatedError) Error() string {
return fmt.Sprintf("command terminated with exit code %d", c.exitCode)
}
func (c *commandTerminatedError) Is(err error) bool {
_, ok := err.(*commandTerminatedError)
return ok
}
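// Illustrative sketch (not part of the original file): runWithAttach turns a
// *commandTerminatedError into a build exit code with errors.As, roughly as shown here.
func exampleExitCodeFromTermination(err error) (int, bool) {
	var terminated *commandTerminatedError
	if errors.As(err, &terminated) {
		return terminated.exitCode, true
	}
	return 0, false
}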
func (s *executor) NewRetry() *retry.Retry {
retryLimits := s.Config.Kubernetes.RequestRetryLimits
retryBackoffConfig := s.getRetryBackoffConfig()
return retry.New().
WithCheck(func(_ int, err error) bool {
for key := range retryLimits {
if err != nil && strings.Contains(err.Error(), key) {
return true
}
}
return slices.ContainsFunc(retryNetworkErrorsGroup, func(v string) bool {
return err != nil && strings.Contains(err.Error(), v)
})
}).
WithMaxTriesFunc(func(err error) int {
for key, limit := range retryLimits {
if err != nil && strings.Contains(err.Error(), key) {
return limit
}
}
return s.Config.Kubernetes.RequestRetryLimit.Get()
}).
WithBackoff(retryBackoffConfig.min, retryBackoffConfig.max)
}
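// Illustrative sketch (not part of the original file): an error is considered retryable
// when its message contains one of the retryNetworkErrorsGroup entries, which is the same
// substring matching that NewRetry's WithCheck callback performs (user-configured
// RequestRetryLimits keys are matched the same way).
func exampleIsRetryableNetworkError(err error) bool {
	if err == nil {
		return false
	}
	return slices.ContainsFunc(retryNetworkErrorsGroup, func(v string) bool {
		return strings.Contains(err.Error(), v)
	})
}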
type retryBackoffConfig struct {
min time.Duration
max time.Duration
}
func (s *executor) getRetryBackoffConfig() retryBackoffConfig {
return retryBackoffConfig{min: common.RequestRetryBackoffMin, max: s.Config.Kubernetes.RequestRetryBackoffMax.Get()}
}
type podPhaseError struct {
name string
phase api.PodPhase
}
func (p *podPhaseError) Error() string {
return fmt.Sprintf("pod %q status is %q", p.name, p.phase)
}
type resourceCheckError struct {
resourceType string
resourceName string
}
func (r *resourceCheckError) Error() string {
return fmt.Sprintf(
"Timed out while waiting for %s/%s to be present in the cluster",
r.resourceType,
r.resourceName,
)
}
func (r *resourceCheckError) Is(err error) bool {
_, ok := err.(*resourceCheckError)
return ok
}
type podContainerError struct {
containerName string
exitCode int
reason string
}
func (p *podContainerError) Error() string {
return fmt.Sprintf("Error in container %s: exit code: %d, reason: '%s'", p.containerName, p.exitCode, p.reason)
}
type kubernetesOptions struct {
Image common.Image
Services map[string]*common.Image
}
func (kOpts kubernetesOptions) servicesList() common.Services {
services := make(common.Services, 0, len(kOpts.Services))
for _, name := range slices.Sorted(maps.Keys(kOpts.Services)) {
services = append(services, *kOpts.Services[name])
}
return services
}
func (kOpts kubernetesOptions) getSortedServiceNames() []string {
return slices.Sorted(maps.Keys(kOpts.Services))
}
type containerBuildOpts struct {
name string
image string
imageDefinition common.Image
isServiceContainer bool
requests api.ResourceList
limits api.ResourceList
securityContext *api.SecurityContext
command []string
}
type podConfigPrepareOpts struct {
labels map[string]string
annotations map[string]string
services []api.Container
initContainers []api.Container
imagePullSecrets []api.LocalObjectReference
hostAliases []api.HostAlias
}
type executor struct {
executors.AbstractExecutor
newKubeClient func(config *restclient.Config) (kubernetes.Interface, error)
kubeClient kubernetes.Interface
getKubeConfig func(conf *common.KubernetesConfig, overwrites *overwrites) (*restclient.Config, error)
kubeConfig *restclient.Config
windowsKernelVersion func() string
pod *api.Pod
credentials *api.Secret
options *kubernetesOptions
services []api.Service
configurationOverwrites *overwrites
pullManager pull.Manager
helperImageInfo helperimage.Info
featureChecker featureChecker
newLogProcessor func() logProcessor
remoteProcessTerminated chan shells.StageCommandStatus
requireSharedBuildsDir *bool
// Flag if a repo mount and emptyDir volume are needed
requireDefaultBuildsDirVolume *bool
remoteStageStatusMutex sync.Mutex
remoteStageStatus shells.StageCommandStatus
eventsStream watch.Interface
podWatcher podWatcher
newPodWatcher func(podWatcherConfig) podWatcher
}
type podWatcher interface {
Start() error
UpdatePodName(string)
Stop()
Errors() <-chan error
}
// podWatcherConfig is configuration for setup of a new pod watcher
type podWatcherConfig struct {
ctx context.Context
logger *buildlogger.Logger
kubeClient kubernetes.Interface
featureChecker featureChecker
namespace string
labels map[string]string
maxSyncDuration time.Duration
retryProvider retry.Provider
}
type serviceCreateResponse struct {
service *api.Service
err error
}
func (s *executor) Prepare(options common.ExecutorPrepareOptions) (err error) {
s.AbstractExecutor.PrepareConfiguration(options)
if err = s.prepareOverwrites(options.Build.GetAllVariables()); err != nil {
return fmt.Errorf("couldn't prepare overwrites: %w", err)
}
s.prepareOptions(options.Build)
if err = s.prepareServiceOverwrites(s.options.Services); err != nil {
return fmt.Errorf("couldn't prepare explicit service overwrites: %w", err)
}
// Dynamically configure use of shared build dir allowing
// for static build dir when isolated volume is in use.
s.SharedBuildsDir = s.isSharedBuildsDirRequired()
if err = s.checkDefaults(); err != nil {
return fmt.Errorf("check defaults error: %w", err)
}
s.kubeConfig, err = s.getKubeConfig(s.Config.Kubernetes, s.configurationOverwrites)
if err != nil {
return fmt.Errorf("getting Kubernetes config: %w", err)
}
s.kubeClient, err = s.newKubeClient(s.kubeConfig)
if err != nil {
return fmt.Errorf("connecting to Kubernetes: %w", err)
}
s.helperImageInfo, err = s.prepareHelperImage()
if err != nil {
return fmt.Errorf("prepare helper image: %w", err)
}
// set up default executor options based on OS type
s.setupDefaultExecutorOptions(s.helperImageInfo.OSType)
s.featureChecker = &kubeClientFeatureChecker{s.kubeClient}
imageName := s.options.Image.Name
s.BuildLogger.Println("Using Kubernetes executor with image", imageName, "...")
if !s.Build.IsFeatureFlagOn(featureflags.UseLegacyKubernetesExecutionStrategy) {
s.BuildLogger.Println("Using attach strategy to execute scripts...")
}
// the pull manager can be prepared once s.options.Image and s.options.Services are set up
s.pullManager, err = s.preparePullManager()
if err != nil {
return err
}
s.BuildLogger.Debugln(fmt.Sprintf("Using helper image: %s:%s", s.helperImageInfo.Name, s.helperImageInfo.Tag))
if err = s.AbstractExecutor.PrepareBuildAndShell(); err != nil {
return fmt.Errorf("prepare build and shell: %w", err)
}
if s.BuildShell.PassFile {
return fmt.Errorf("kubernetes doesn't support shells that require script file")
}
s.podWatcher = s.newPodWatcher(podWatcherConfig{
ctx: options.Context,
logger: &s.BuildLogger,
kubeClient: s.kubeClient,
featureChecker: s.featureChecker,
namespace: s.configurationOverwrites.namespace,
labels: s.buildLabels(),
maxSyncDuration: s.Config.Kubernetes.RequestRetryBackoffMax.Get(),
retryProvider: s,
})
if err := s.podWatcher.Start(); err != nil {
return fmt.Errorf("starting pod watcher: %w", err)
}
return s.waitForServices(options.Context)
}
func (s *executor) preparePullManager() (pull.Manager, error) {
allowedPullPolicies, err := s.Config.Kubernetes.GetAllowedPullPolicies()
if err != nil {
return nil, err
}
dockerPullPoliciesPerContainer := map[string][]common.DockerPullPolicy{
buildContainerName: s.options.Image.PullPolicies,
helperContainerName: s.options.Image.PullPolicies,
initPermissionContainerName: s.options.Image.PullPolicies,
}
for containerName, service := range s.options.Services {
dockerPullPoliciesPerContainer[containerName] = service.PullPolicies
}
k8sPullPoliciesPerContainer := map[string][]api.PullPolicy{}
for containerName, pullPolicies := range dockerPullPoliciesPerContainer {
k8sPullPolicies, err := s.getPullPolicies(pullPolicies)
if err != nil {
return nil, fmt.Errorf("converting pull policy for container %q: %w", containerName, err)
}
k8sPullPolicies, err = pull_policies.ComputeEffectivePullPolicies(
k8sPullPolicies, allowedPullPolicies, pullPolicies, s.Config.Kubernetes.PullPolicy)
if err != nil {
return nil, &common.BuildError{
Inner: fmt.Errorf("invalid pull policy for container %q: %w", containerName, err),
FailureReason: common.ConfigurationError,
}
}
s.BuildLogger.Println(fmt.Sprintf("Using effective pull policy of %s for container %s", k8sPullPolicies, containerName))
k8sPullPoliciesPerContainer[containerName] = k8sPullPolicies
}
return pull.NewPullManager(k8sPullPoliciesPerContainer, &s.BuildLogger), nil
}
// getPullPolicies selects the pull_policy configurations originating from
// either gitlab-ci.yaml or config.toml. If present, the pull_policies in
// gitlab-ci.yaml take precedence over those in config.toml.
func (s *executor) getPullPolicies(imagePullPolicies []common.DockerPullPolicy) ([]api.PullPolicy, error) {
k8sImagePullPolicies, err := s.Config.Kubernetes.ConvertFromDockerPullPolicy(imagePullPolicies)
if err != nil {
return nil, fmt.Errorf("conversion to Kubernetes policy: %w", err)
}
if len(k8sImagePullPolicies) != 0 {
return k8sImagePullPolicies, nil
}
return s.Config.Kubernetes.GetPullPolicies()
}
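// Illustrative sketch (not part of the original file) of the precedence getPullPolicies
// implements: pull policies coming from the job definition (.gitlab-ci.yml) win; the
// config.toml policies are only used when the job defines none.
func examplePullPolicyPrecedence(fromJob, fromConfig []api.PullPolicy) []api.PullPolicy {
	if len(fromJob) != 0 {
		return fromJob
	}
	return fromConfig
}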
func (s *executor) setupDefaultExecutorOptions(os string) {
if os == helperimage.OSTypeWindows {
s.DefaultBuildsDir = `C:\builds`
s.DefaultCacheDir = `C:\cache`
s.ExecutorOptions.Shell.Shell = shells.SNPowershell
s.ExecutorOptions.Shell.RunnerCommand = "gitlab-runner-helper"
}
}
func (s *executor) prepareHelperImage() (helperimage.Info, error) {
config := s.retrieveHelperImageConfig()
// use node selector labels to better select the correct image
if s.Config.Kubernetes.NodeSelector != nil {
for label, option := range map[string]*string{
api.LabelArchStable: &config.Architecture,
api.LabelOSStable: &config.OSType,
nodeSelectorWindowsBuildLabel: &config.KernelVersion,
} {
value := s.Config.Kubernetes.NodeSelector[label]
if value != "" {
*option = value
}
}
}
// Also consider node selector overwrites as they may change arch or os
if s.configurationOverwrites.nodeSelector != nil {
for label, option := range map[string]*string{
api.LabelArchStable: &config.Architecture,
api.LabelOSStable: &config.OSType,
nodeSelectorWindowsBuildLabel: &config.KernelVersion,
} {
value := s.configurationOverwrites.nodeSelector[label]
if value != "" {
*option = value
}
}
}
return helperimage.Get(common.AppVersion.Version, config)
}
func (s *executor) retrieveHelperImageConfig() helperimage.Config {
cfg := helperimage.Config{
OSType: helperimage.OSTypeLinux,
Architecture: "amd64",
Shell: s.Config.Shell,
Flavor: s.ExpandValue(s.Config.Kubernetes.HelperImageFlavor),
ProxyExec: s.Config.IsProxyExec(),
DisableUmask: s.Build.IsFeatureFlagOn(featureflags.DisableUmaskForKubernetesExecutor),
}
if !s.Config.Kubernetes.HelperImageAutosetArchAndOS {
return cfg
}
cfg.Architecture = common.AppVersion.Architecture
if helperimage.OSTypeWindows == common.AppVersion.OS {
cfg.OSType = helperimage.OSTypeWindows
cfg.KernelVersion = s.windowsKernelVersion()
}
return cfg
}
func (s *executor) Run(cmd common.ExecutorCommand) error {
for attempt := 1; ; attempt++ {
var err error
if s.Build.IsFeatureFlagOn(featureflags.UseLegacyKubernetesExecutionStrategy) {
s.BuildLogger.Debugln("Starting Kubernetes command...")
err = s.runWithExecLegacy(cmd)
} else {
s.BuildLogger.Debugln("Starting Kubernetes command with attach...")
err = s.runWithAttach(cmd)
}
if err != nil && s.Config.Kubernetes.GetPrintPodWarningEvents() {
s.logPodWarningEvents(cmd.Context, k8sEventWarningType)
}
var imagePullErr *pull.ImagePullError
if errors.As(err, &imagePullErr) {
if s.pullManager.UpdatePolicyForContainer(attempt, imagePullErr) {
s.cleanupResources()
s.pod = nil
continue
}
}
return err
}
}
func (s *executor) handlePodEvents() error {
// This will run the watcher only for the first call
// which is when the pod is being initialized
if s.eventsStream != nil {
return nil
}
if err := s.watchPodEvents(); err != nil {
return err
}
go s.printPodEvents()
return nil
}
func (s *executor) watchPodEvents() error {
s.BuildLogger.Println("Subscribing to Kubernetes Pod events...")
// Continue polling for the status of the pod as that feels more straightforward than
// checking for each individual container's status in the events.
// It also makes it less likely to break something existing since we get the status of the Pod
// when it's already failed.
// This strategy can be revised in the future if needed.
var err error
s.eventsStream, err = retry.WithValueFn(s, func() (watch.Interface, error) {
// TODO: handle the context properly with https://gitlab.com/gitlab-org/gitlab-runner/-/issues/27932
// kubeAPI: events, watch, FF_PRINT_POD_EVENTS=true
return s.kubeClient.CoreV1().Events(s.pod.Namespace).Watch(context.Background(), metav1.ListOptions{
FieldSelector: fmt.Sprintf("involvedObject.name=%s", s.pod.Name),
})
}).Run()
return err
}
func (s *executor) printPodEvents() {
wc := s.BuildLogger.Stream(buildlogger.StreamExecutorLevel, buildlogger.Stderr)
defer wc.Close()
w := tabwriter.NewWriter(wc, 3, 1, 3, ' ', 0)
_, _ = fmt.Fprintln(w, "Type\tReason\tMessage")
// The s.eventsStream.Stop method will be called by the caller;
// that's how we exit from this loop
for result := range s.eventsStream.ResultChan() {
ev, ok := result.Object.(*api.Event)
if !ok {
continue
}
_, _ = fmt.Fprintf(w, "%s\t%s\t%s\n", ev.Type, ev.Reason, ev.Message)
_ = w.Flush()
}
}
func (s *executor) logPodWarningEvents(ctx context.Context, eventType string) {
if s.pod == nil {
return
}
events, err := retry.WithValueFn(s, func() (*api.EventList, error) {
//nolint:gocritic
// kubeAPI: events, list, print_pod_warning_events=true
return s.kubeClient.CoreV1().Events(s.pod.Namespace).
List(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("involvedObject.name=%s,type=%s", s.pod.Name, eventType),
})
}).Run()
if err != nil {
s.BuildLogger.Debugln(fmt.Sprintf("Error retrieving events list: %s", err.Error()))
return
}
for _, event := range events.Items {
s.BuildLogger.Warningln(fmt.Sprintf("Event retrieved from the cluster: %s", event.Message))
}
}
func (s *executor) runWithExecLegacy(cmd common.ExecutorCommand) error {
ctx := cmd.Context
if err := s.setupPodLegacy(ctx); err != nil {
return err
}
containerName := buildContainerName
containerCommand := s.BuildShell.DockerCommand
if cmd.Predefined {
containerName = helperContainerName
containerCommand = s.helperImageInfo.Cmd
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
s.BuildLogger.Debugln(fmt.Sprintf(
"Starting in container %q the command %q with script: %s",
containerName,
containerCommand,
cmd.Script,
))
stdout, stderr := s.getExecutorIoWriters()
defer stdout.Close()
defer stderr.Close()
select {
case err := <-s.runInContainerWithExec(ctx, containerName, containerCommand, cmd.Script, stdout, stderr):
s.BuildLogger.Debugln(fmt.Sprintf("Container %q exited with error: %v", containerName, err))
var exitError exec.CodeExitError
if err != nil && errors.As(err, &exitError) {
return &common.BuildError{Inner: err, ExitCode: exitError.ExitStatus()}
}
return err
case err := <-s.podWatcher.Errors():
// if we observe terminal pod errors via the pod watcher, we can exit immediately
return err
case <-ctx.Done():
return fmt.Errorf("build aborted")
}
}
//nolint:gocognit
func (s *executor) setupPodLegacy(ctx context.Context) error {
if s.pod != nil {
return nil
}
err := s.setupBuildNamespace(ctx)
if err != nil {
return err
}
err = s.setupCredentials(ctx)
if err != nil {
return err
}
initContainers, err := s.buildInitContainers()
if err != nil {
return err
}
err = s.setupBuildPod(ctx, initContainers)
if err != nil {
return err
}
if s.Build.IsFeatureFlagOn(featureflags.PrintPodEvents) {
if err := s.handlePodEvents(); err != nil {
return err
}
}
if s.Build.IsFeatureFlagOn(featureflags.KubernetesHonorEntrypoint) {
err := s.captureContainerLogs(ctx, buildContainerName, &entrypointLogForwarder{
Sink: s.BuildLogger.Stream(buildlogger.StreamExecutorLevel, buildlogger.Stdout),
})
if err != nil {
return err
}
}
var out io.WriteCloser = buildlogger.NewNopCloser(io.Discard)
if !s.Build.IsFeatureFlagOn(featureflags.PrintPodEvents) {
out = s.BuildLogger.Stream(buildlogger.StreamExecutorLevel, buildlogger.Stderr)
defer out.Close()
}
if err := s.waitForPod(ctx, out); err != nil {
return err
}
return nil
}
func (s *executor) runWithAttach(cmd common.ExecutorCommand) error {
ctx := cmd.Context
err := s.ensurePodsConfigured(ctx)
if err != nil {
return err
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
containerName, containerCommand := s.getContainerInfo(cmd)
err = s.saveScriptOnEmptyDir(ctx, s.scriptName(string(cmd.Stage)), containerName, cmd.Script)
if err != nil {
return err
}
s.BuildLogger.Debugln(fmt.Sprintf(
"Starting in container %q the command %q with script: %s",
containerName,
containerCommand,
cmd.Script,
))
podStatusCh := s.watchPodStatus(ctx, &podContainerStatusChecker{shouldCheckContainerFilter: isNotServiceContainer})
select {
case err := <-s.runInContainer(ctx, cmd.Stage, containerName, containerCommand):
s.BuildLogger.Debugln(fmt.Sprintf("Container %q exited with error: %v", containerName, err))
var terminatedError *commandTerminatedError
if err != nil && errors.As(err, &terminatedError) {
return &common.BuildError{Inner: err, ExitCode: terminatedError.exitCode}
}
return err
case err := <-podStatusCh:
if IsKubernetesPodNotFoundError(err) || IsKubernetesPodFailedError(err) || IsKubernetesPodContainerError(err) {
return err
}
return &common.BuildError{Inner: err}
case err := <-s.podWatcher.Errors():
// if we observe terminal pod errors via the pod watcher, we can exit immediately
return err
case <-ctx.Done():
s.remoteStageStatusMutex.Lock()
defer s.remoteStageStatusMutex.Unlock()
script := s.stageCancellationScript(string(cmd.Stage))
s.BuildLogger.Debugln("Running job cancellation script:", script)
if !s.remoteStageStatus.IsExited() {
err := <-s.runInContainerWithExec(
s.Context,
containerName,
s.BuildShell.DockerCommand,
script,
nil, nil,
)
s.BuildLogger.Debugln("Job cancellation script exited with error:", err)
}
return fmt.Errorf("build aborted")
}
}
func (s *executor) stageCancellationScript(stage string) string {
switch s.Shell().Shell {
case shells.SNPwsh, shells.SNPowershell:
processIdRetrievalCmd := fmt.Sprintf(
"(Get-CIMInstance Win32_Process -Filter \"CommandLine LIKE '%%%s%%'\").ProcessId",
s.scriptName(stage),
)
return shells.PowershellStageProcessesKillerScript(processIdRetrievalCmd)
default:
// The ps command is not available on all unix-like OSes.
// To bypass this limitation, we search the /proc directory for existing PIDs with the following command.
// Some post-processing is then done so that only the process PID and the executed command line are displayed.
searchPIDs := "for prc in /proc/[0-9]*/cmdline; do (printf \"$prc \"; cat -A \"$prc\") | " +
"sed 's/\\^@/ /g;s|/proc/||;s|/cmdline||'; echo; done"
// The results are filtered to keep only the processes related to the ongoing stage script name.
// The subprocesses of each PID, if any, are also retrieved.
return "kill -TERM $(for item in $(" + searchPIDs + " | grep -e '" +
s.scriptName(stage) +
" $' | awk '{print $1}'); do test -f /proc/${item}/task/${item}/children" +
" && cat /proc/${item}/task/${item}/children && echo; done) 2> /dev/null"
}
}
func (s *executor) ensurePodsConfigured(ctx context.Context) error {
if s.pod != nil {
return nil
}
if err := s.setupBuildNamespace(ctx); err != nil {
return fmt.Errorf("setting up build namespace: %w", err)
}
if err := s.setupCredentials(ctx); err != nil {
return fmt.Errorf("setting up credentials: %w", err)
}
initContainers, err := s.buildInitContainers()
if err != nil {
return err
}
if err := s.setupBuildPod(ctx, initContainers); err != nil {
return fmt.Errorf("setting up build pod: %w", err)
}
if s.Build.IsFeatureFlagOn(featureflags.PrintPodEvents) {
if err := s.handlePodEvents(); err != nil {
return err
}
}
var out io.WriteCloser = buildlogger.NewNopCloser(io.Discard)
if !s.Build.IsFeatureFlagOn(featureflags.PrintPodEvents) {
out = s.BuildLogger.Stream(buildlogger.StreamExecutorLevel, buildlogger.Stderr)
}
defer out.Close()
if err := s.waitForPod(ctx, out); err != nil {
return err
}
out.Close()
if err := s.setupTrappingScripts(ctx); err != nil {
return fmt.Errorf("setting up trapping scripts on emptyDir: %w", err)
}
// start pulling in logs from the build container, to capture entrypoint logs
if s.Build.IsFeatureFlagOn(featureflags.KubernetesHonorEntrypoint) {
err := s.captureContainerLogs(ctx, buildContainerName, &entrypointLogForwarder{
Sink: s.BuildLogger.Stream(buildlogger.StreamExecutorLevel, buildlogger.Stdout),
})
if err != nil {
return err
}
}
// This starts the log processing, where we run the helper bin (in the helper container) to pull logs from the
// logfile.
go s.processLogs(ctx)
// This pulls the services containers logs directly from the kubeapi and pushes them into the buildlogger.
s.captureServiceContainersLogs(ctx, s.pod.Spec.Containers)
return nil
}
func (s *executor) buildInitContainers() ([]api.Container, error) {
var initContainers []api.Container
if s.Build.IsFeatureFlagOn(featureflags.DisableUmaskForKubernetesExecutor) {
uidGidCollectorInitContainer, err := s.buildUiGidCollector(s.helperImageInfo.OSType)
if err != nil {
return nil, fmt.Errorf("building umask init container: %w", err)
}
initContainers = append(initContainers, uidGidCollectorInitContainer)
}
if !s.Build.IsFeatureFlagOn(featureflags.UseLegacyKubernetesExecutionStrategy) ||
(s.Build.IsFeatureFlagOn(featureflags.UseDumbInitWithKubernetesExecutor) &&
!s.isWindowsJob()) {
permissionsInitContainer, err := s.buildPermissionsInitContainer(s.helperImageInfo.OSType)
if err != nil {
return nil, fmt.Errorf("building permissions init container: %w", err)
}
initContainers = append(initContainers, permissionsInitContainer)
}
return initContainers, nil
}
func (s *executor) waitForPod(ctx context.Context, writer io.WriteCloser) error {
status, err := waitForPodRunning(ctx, s.kubeClient, s.pod, writer, s.Config.Kubernetes, buildContainerName)
if err != nil {
return fmt.Errorf("waiting for pod running: %w", err)
}
if status != api.PodRunning {
return fmt.Errorf("pod failed to enter running state: %s", status)
}
if !s.Build.IsFeatureFlagOn(featureflags.WaitForPodReachable) {
return nil
}
if err := WaitForPodReachable(ctx, s.kubeClient, s.pod, s.Config.Kubernetes); err != nil {
return fmt.Errorf("pod failed to become attachable %v", err)
}
return nil
}
func (s *executor) getContainerInfo(cmd common.ExecutorCommand) (string, []string) {
var containerCommand []string
containerName := buildContainerName
if cmd.Predefined {
containerName = helperContainerName
}
shell := s.Shell().Shell
switch shell {
case shells.SNPwsh, shells.SNPowershell:
// Translates to roughly "/path/to/parse_pwsh_script.ps1 /path/to/stage_script"
containerCommand = []string{
s.scriptPath(pwshJSONTerminationScriptName),
s.scriptPath(cmd.Stage),
s.buildRedirectionCmd(shell),
}
default:
// Translates to roughly "sh -c '(/detect/shell/path.sh /stage/script/path.sh 2>&1 | tee) &'"
// which when the detect shell exits becomes something like "bash /stage/script/path.sh".
// This works unlike "gitlab-runner-build" since the detect shell passes arguments with "$@"
containerCommand = []string{
"sh",
// We have to run the command in a background subshell. Unfortunately,
// explaining why in a comment fails the code quality check of
// function length not exceeding 60 lines, so `git blame` this instead.
"-c",
fmt.Sprintf("'(%s %s %s) &'",
s.scriptPath(detectShellScriptName),
s.scriptPath(cmd.Stage),
s.buildRedirectionCmd(shell),
),
}
if cmd.Predefined {
// We use redirection here since the "gitlab-runner-build" helper doesn't pass input args
// to the shell it executes, so we technically pass the script to the stdin of the underlying shell.
// This translates roughly to "gitlab-runner-build <<< /stage/script/path.sh"
containerCommand = append( //nolint:gocritic
s.helperImageInfo.Cmd,
"<<<",
s.scriptPath(cmd.Stage),
s.buildRedirectionCmd(shell),
)
}
}
return containerName, containerCommand
}
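// Illustrative sketch (not part of the original file): for a bash build container in
// attach mode, getContainerInfo produces a command slice of this shape; the
// detectShellPath, stageScriptPath and logFile values are hypothetical placeholders for
// the paths computed by scriptPath() and logFile().
func exampleAttachCommand(detectShellPath, stageScriptPath, logFile string) []string {
	return []string{
		"sh",
		"-c",
		fmt.Sprintf("'(%s %s 2>&1 | tee -a %s) &'", detectShellPath, stageScriptPath, logFile),
	}
}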
func (s *executor) initContainerResources() api.ResourceRequirements {
resources := api.ResourceRequirements{}
if s.configurationOverwrites != nil {
resources.Limits = s.configurationOverwrites.helperLimits
resources.Requests = s.configurationOverwrites.helperRequests
}
return resources
}
func (s *executor) buildPermissionsInitContainer(os string) (api.Container, error) {
pullPolicy, err := s.pullManager.GetPullPolicyFor(helperContainerName)
if err != nil {
return api.Container{}, fmt.Errorf("getting pull policy for permissions init container: %w", err)
}
container := api.Container{
Name: initPermissionContainerName,
Image: s.getHelperImage(),
VolumeMounts: s.getVolumeMounts(),
ImagePullPolicy: pullPolicy,
// the init container reuses the helper container resources
Resources: s.initContainerResources(),
SecurityContext: s.Config.Kubernetes.GetContainerSecurityContext(
s.Config.Kubernetes.InitPermissionsContainerSecurityContext,
s.defaultCapDrop()...,
),
}
// The kubernetes executor uses both a helper container (for predefined stages) and a build
// container (for user defined steps). When accessing files on a shared volume, permissions
// are resolved within the context of the individual container.
//
// For Linux, the helper container and build container can occasionally have the same user IDs
// and access is not a problem. This can occur when:
// - the image defines a user ID that is identical across both images
// - PodSecurityContext is used and the UID is set manually
// - Openshift is used and each pod is assigned a different user ID
// Due to UIDs being different in other scenarios, we explicitly open the permissions on the
// log shared volume so both containers have access.
//
// For Windows, the security identifiers are larger. Unlike Linux, it's not likely that
// containers share the same identifier. The Windows Security Account Manager is not shared
// between containers, so we need to open up permissions across more than just the logging
// shared volume. Fortunately, Windows allows us to set permissions that recursively affect
// future folders and files.
switch os {
case helperimage.OSTypeWindows:
chmod := "icacls $ExecutionContext.SessionState.Path.GetUnresolvedProviderPathFromPSPath(%q) /grant 'Everyone:(OI)(CI)F' /q | out-null"
commands := []string{
fmt.Sprintf(chmod, s.logsDir()),
fmt.Sprintf(chmod, s.Build.RootDir),
}
container.Command = []string{s.Shell().Shell, "-c", strings.Join(commands, ";\n")}
default:
var initCommand []string
if !s.Build.IsFeatureFlagOn(featureflags.UseLegacyKubernetesExecutionStrategy) {
initCommand = append(initCommand, fmt.Sprintf("touch %[1]s && (chmod 777 %[1]s || exit 0)", s.logFile()))
}
if s.Build.IsFeatureFlagOn(featureflags.UseDumbInitWithKubernetesExecutor) {
initCommand = append(initCommand, fmt.Sprintf("cp /usr/bin/dumb-init %s", s.scriptsDir()))
}
container.Command = []string{"sh", "-c", strings.Join(initCommand, ";\n")}
}
return container, nil
}
func (s *executor) buildUiGidCollector(os string) (api.Container, error) {
opts := containerBuildOpts{
name: "init-build-uid-gid-collector",
image: s.options.Image.Name,
imageDefinition: s.options.Image,
requests: s.configurationOverwrites.buildRequests,
limits: s.configurationOverwrites.buildLimits,
securityContext: s.Config.Kubernetes.GetContainerSecurityContext(
s.Config.Kubernetes.BuildContainerSecurityContext,
s.defaultCapDrop()...,
),
}
err := s.verifyAllowedImages(opts)
if err != nil {
return api.Container{}, err
}
pullPolicy, err := s.pullManager.GetPullPolicyFor(opts.name)
if err != nil {
return api.Container{}, err
}
container := api.Container{
Name: opts.name,
Image: opts.image,
VolumeMounts: s.getVolumeMounts(),
ImagePullPolicy: pullPolicy,
// let's use build container resources
Resources: api.ResourceRequirements{Limits: opts.limits, Requests: opts.requests},
SecurityContext: opts.securityContext,
}
if os == helperimage.OSTypeLinux {
container.Command = []string{"sh", "-c", fmt.Sprintf("> %s/%s", s.RootDir(), shells.BuildUidGidFile)}
}
return container, nil
}
func (s *executor) buildRedirectionCmd(shell string) string {
if shell == shells.SNPowershell {
// powershell outputs utf16, so we re-encode the output to utf8
// this is important because our json decoder that detects the exit status
// of a job requires utf8. Converting command output to strings with %{"$_"}
// prevents a powershell complaint about native command output on stderr.
return fmt.Sprintf("2>&1 | %%{ \"$_\" } | Out-File -Append -Encoding UTF8 %s", s.logFile())
}
return fmt.Sprintf("2>&1 | tee -a %s", s.logFile())
}
func (s *executor) processLogs(ctx context.Context) {
processor := s.newLogProcessor()
logsCh, errCh := processor.Process(ctx)
// todo: update kubernetes log processor to support separate stdout/stderr streams
logger := s.BuildLogger.Stream(buildlogger.StreamWorkLevel, buildlogger.Stdout)
defer logger.Close()
for {
select {
case line, ok := <-logsCh:
if !ok {
return
}
s.forwardLogLine(logger, line)
case err, ok := <-errCh:
if !ok {
continue
}
if err != nil {
s.BuildLogger.Warningln(fmt.Sprintf("Error processing the log file: %v", err))
}
exitCode := getExitCode(err)
// Script can be left nil as it is not used after the exit status is received
s.remoteProcessTerminated <- shells.StageCommandStatus{CommandExitCode: &exitCode}
}
}
}
func (s *executor) forwardLogLine(w io.Writer, line string) {
var status shells.StageCommandStatus
if !status.TryUnmarshal(line) {
if _, err := w.Write([]byte(line)); err != nil {
s.BuildLogger.Warningln(fmt.Sprintf("Error writing log line to trace: %v", err))
}
return
}
s.BuildLogger.Debugln(fmt.Sprintf("Setting remote stage status: %s", status))
s.remoteStageStatusMutex.Lock()
s.remoteStageStatus = status
s.remoteStageStatusMutex.Unlock()
if status.IsExited() {
s.remoteProcessTerminated <- status
}
}
// getExitCode tries to extract the exit code from an inner exec.CodeExitError.
// This error may be returned by the underlying kubernetes connection stream,
// however it's not guaranteed to be.
// getExitCode returns unknownLogProcessorExitCode if err isn't of type exec.CodeExitError
// or if it's nil.
func getExitCode(err error) int {
var exitErr exec.CodeExitError
if errors.As(err, &exitErr) {
return exitErr.Code
}
return unknownLogProcessorExitCode
}
func (s *executor) setupTrappingScripts(ctx context.Context) error {
s.BuildLogger.Debugln("Setting up trapping scripts on emptyDir ...")
scriptName, script := "", ""
shellName := s.Shell().Shell
switch shellName {
case shells.SNPwsh, shells.SNPowershell:
scriptName, script = s.scriptName(pwshJSONTerminationScriptName), shells.PwshJSONTerminationScript(shellName)
default:
scriptName, script = s.scriptName(detectShellScriptName), shells.BashDetectShellScript
}
return s.saveScriptOnEmptyDir(ctx, scriptName, buildContainerName, script)
}
func (s *executor) saveScriptOnEmptyDir(ctx context.Context, scriptName, containerName, script string) error {
shell, err := s.retrieveShell()
if err != nil {
return err
}
scriptPath := fmt.Sprintf("%s/%s", s.scriptsDir(), scriptName)
saveScript, err := shell.GenerateSaveScript(*s.Shell(), scriptPath, script)
if err != nil {
return err
}
s.BuildLogger.Debugln(fmt.Sprintf("Saving stage script %s on Container %q", saveScript, containerName))
stdout, stderr := s.getExecutorIoWriters()
defer stdout.Close()
defer stderr.Close()
select {
case err := <-s.runInContainerWithExec(
ctx,
containerName,
s.BuildShell.DockerCommand,
saveScript,
stdout,
stderr,
):
s.BuildLogger.Debugln(fmt.Sprintf("Container %q exited with error: %v", containerName, err))
var exitError exec.CodeExitError
if err != nil && errors.As(err, &exitError) {
return &common.BuildError{Inner: err, ExitCode: exitError.ExitStatus()}
}
return err
case <-ctx.Done():
return fmt.Errorf("build aborted")
}
}
func (s *executor) retrieveShell() (common.Shell, error) {
bashShell, ok := common.GetShell(s.Shell().Shell).(*shells.BashShell)
if ok {
return bashShell, nil
}
shell := common.GetShell(s.Shell().Shell)
if shell == nil {
return nil, errIncorrectShellType
}
return shell, nil
}
func (s *executor) Finish(err error) {
s.podWatcher.Stop()
if IsKubernetesPodNotFoundError(err) {
// Avoid an additional error message when trying to
// cleanup a pod that we know no longer exists
s.pod = nil
}
s.AbstractExecutor.Finish(err)
}
func (s *executor) Cleanup() {
if s.eventsStream != nil {
s.eventsStream.Stop()
}
s.cleanupResources()
closeKubeClient(s.kubeClient)
s.AbstractExecutor.Cleanup()
}
// cleanupResources deletes the resources used during the runner job.
// Having a pod does not mean that the owner-dependent relationship exists, as an error may occur while setting the ownerReference.
// We therefore explicitly delete resources on which no ownerReference is found.
// This does not apply to services, as they are created with the owner from the start;
// deleting the pod therefore automatically deletes the services, if any.
func (s *executor) cleanupResources() {
// Here we don't use the build context as its timeout will prevent a successful cleanup of the resources.
// The solution used here is inspired from the one used for the docker executor.
// We give a configurable timeout to complete the resources cleanup.
ctx, cancel := context.WithTimeout(context.Background(), s.Config.Kubernetes.GetCleanupResourcesTimeout())
defer cancel()
if s.pod != nil {
kubeRequest := retry.WithFn(s, func() error {
// kubeAPI: pods, delete
return s.kubeClient.CoreV1().
Pods(s.pod.Namespace).
Delete(ctx, s.pod.Name, metav1.DeleteOptions{
GracePeriodSeconds: s.Config.Kubernetes.CleanupGracePeriodSeconds,
PropagationPolicy: &PropagationPolicy,
})
})
if err := kubeRequest.Run(); err != nil {
s.BuildLogger.Errorln(fmt.Sprintf("Error cleaning up pod: %s", err.Error()))
}
}
if s.credentials != nil && len(s.credentials.OwnerReferences) == 0 {
kubeRequest := retry.WithFn(s, func() error {
// kubeAPI: secrets, delete
return s.kubeClient.CoreV1().
Secrets(s.configurationOverwrites.namespace).
Delete(ctx, s.credentials.Name, metav1.DeleteOptions{
GracePeriodSeconds: s.Config.Kubernetes.CleanupGracePeriodSeconds,
})
})
if err := kubeRequest.Run(); err != nil {
s.BuildLogger.Errorln(fmt.Sprintf("Error cleaning up secrets: %s", err.Error()))
}
}
err := s.teardownBuildNamespace(ctx)
if err != nil {
s.BuildLogger.Errorln(fmt.Sprintf("Error tearing down namespace: %s", err.Error()))
}
}
func (s *executor) buildContainer(opts containerBuildOpts) (api.Container, error) {
var envVars []common.JobVariable
if opts.isServiceContainer {
envVars = s.getServiceVariables(opts.imageDefinition)
} else if opts.name == buildContainerName {
envVars = s.Build.GetAllVariables().PublicOrInternal()
}
err := s.verifyAllowedImages(opts)
if err != nil {
return api.Container{}, err
}
containerPorts := make([]api.ContainerPort, len(opts.imageDefinition.Ports))
proxyPorts := make([]proxy.Port, len(opts.imageDefinition.Ports))
for i, port := range opts.imageDefinition.Ports {
proxyPorts[i] = proxy.Port{Name: port.Name, Number: port.Number, Protocol: port.Protocol}
containerPorts[i] = api.ContainerPort{ContainerPort: int32(port.Number)}
}
if len(proxyPorts) > 0 {
aliases := opts.imageDefinition.Aliases()
if len(aliases) == 0 {
if opts.name != buildContainerName {
aliases = []string{fmt.Sprintf("proxy-%s", opts.name)}
} else {
aliases = []string{opts.name}
}
}
for _, serviceName := range aliases {
s.ProxyPool[serviceName] = s.newProxy(serviceName, proxyPorts)
}
}
pullPolicy, err := s.pullManager.GetPullPolicyFor(opts.name)
if err != nil {
return api.Container{}, err
}
command, args := s.getCommandAndArgs(opts.imageDefinition, opts.command...)
container := api.Container{
Name: opts.name,
Image: opts.image,
ImagePullPolicy: pullPolicy,
Command: command,
Args: args,
Env: buildVariables(envVars),
Resources: api.ResourceRequirements{Limits: opts.limits, Requests: opts.requests},
Ports: containerPorts,
VolumeMounts: s.getVolumeMounts(),
SecurityContext: opts.securityContext,
Lifecycle: s.prepareLifecycleHooks(),
Stdin: true,
}
return container, nil
}
func (s *executor) verifyAllowedImages(opts containerBuildOpts) error {
// check if the image/service is allowed
internalImages := []string{
s.ExpandValue(s.Config.Kubernetes.Image),
s.ExpandValue(s.helperImageInfo.Name),
}
var (
optionName string
allowedImages []string
)
if opts.isServiceContainer {
optionName = "services"
allowedImages = s.Config.Kubernetes.AllowedServices
} else if opts.name == buildContainerName {
optionName = "images"
allowedImages = s.Config.Kubernetes.AllowedImages
}
verifyAllowedImageOptions := common.VerifyAllowedImageOptions{
Image: opts.image,
OptionName: optionName,
AllowedImages: allowedImages,
InternalImages: internalImages,
}
return common.VerifyAllowedImage(verifyAllowedImageOptions, s.BuildLogger)
}
func (s *executor) shouldUseStartupProbe() bool {
honorEntrypoint := s.Build.IsFeatureFlagOn(featureflags.KubernetesHonorEntrypoint)
legacyExecMode := s.Build.IsFeatureFlagOn(featureflags.UseLegacyKubernetesExecutionStrategy)
// For attach mode, the system waits until the entrypoint has run and the shell is spawned anyway, regardless of whether
// something is running as part of the entrypoint or not. For legacy exec mode, we don't wait for the entrypoint (we run
// a separate process as opposed to attaching to the "main" process), thus we need a signal when the entrypoint is
// done and the shell is spawned. However, when in exec mode and ignoring the image's entrypoint (thus only starting
// the shell), we don't really have to wait, either.
// Thus, in summary: We only need a startupProbe if we run in exec mode *and* honor the image's entrypoint.
return honorEntrypoint && legacyExecMode
}
func (s *executor) getCommandAndArgs(imageDefinition common.Image, command ...string) (retCommand []string, retArgs []string) {
if s.Build.IsFeatureFlagOn(featureflags.KubernetesHonorEntrypoint) {
return []string{}, command
}
if len(command) == 0 && len(imageDefinition.Entrypoint) > 0 {
command = imageDefinition.Entrypoint
}
var args []string
if len(imageDefinition.Command) > 0 {
args = imageDefinition.Command
}
return command, args
}
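// Illustrative sketch (not part of the original file) of the combinations
// getCommandAndArgs produces: with FF_KUBERNETES_HONOR_ENTRYPOINT on, the image
// entrypoint is kept (empty Command) and the runner command becomes the Args; otherwise
// an entrypoint override from the job definition, if present, becomes the Command and the
// image's Command becomes the Args.
func exampleEntrypointResolution(honorEntrypoint bool, img common.Image, runnerCommand []string) (command, args []string) {
	if honorEntrypoint {
		return []string{}, runnerCommand
	}
	if len(runnerCommand) == 0 && len(img.Entrypoint) > 0 {
		runnerCommand = img.Entrypoint
	}
	if len(img.Command) > 0 {
		args = img.Command
	}
	return runnerCommand, args
}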
func (s *executor) logFile() string {
return path.Join(s.logsDir(), "output.log")
}
func (s *executor) logsDir() string {
return s.baseDir(defaultLogsBaseDir,
s.Config.Kubernetes.LogsBaseDir, s.Build.JobInfo.ProjectID, s.Build.JobResponse.ID)
}
func (s *executor) scriptsDir() string {
return s.baseDir(defaultScriptsBaseDir,
s.Config.Kubernetes.ScriptsBaseDir, s.Build.JobInfo.ProjectID, s.Build.JobResponse.ID)
}
func (s *executor) baseDir(defaultBaseDir, configDir string, projectId, jobId int64) string {
baseDir := defaultBaseDir
if configDir != "" {
// if the path ends with one or more / or \ characters, drop them
configDir = strings.TrimRight(configDir, "/\\")
baseDir = configDir + defaultBaseDir
}
return fmt.Sprintf("%s-%d-%d", baseDir, projectId, jobId)
}
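// Illustrative sketch (not part of the original file) of the directory naming baseDir
// produces: with no configured base dir the logs dir becomes "/logs-<projectID>-<jobID>";
// with a configured base dir such as "/mnt/custom/" (trailing separators dropped) it
// becomes "/mnt/custom/logs-<projectID>-<jobID>". The "/mnt/custom/" value is a
// hypothetical example.
func exampleLogsDirLayout(configDir string, projectID, jobID int64) string {
	base := defaultLogsBaseDir
	if configDir != "" {
		base = strings.TrimRight(configDir, "/\\") + defaultLogsBaseDir
	}
	return fmt.Sprintf("%s-%d-%d", base, projectID, jobID)
}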
func (s *executor) scriptPath(stage common.BuildStage) string {
return path.Join(s.scriptsDir(), s.scriptName(string(stage)))
}
func (s *executor) scriptName(name string) string {
shell := s.Shell()
conf, err := common.GetShell(shell.Shell).GetConfiguration(*shell)
if err != nil || conf.Extension == "" {
return name
}
return name + "." + conf.Extension
}
func (s *executor) getVolumeMounts() []api.VolumeMount {
var mounts []api.VolumeMount
// scripts volumes are needed when using the Kubernetes executor in attach mode
// (FF_USE_LEGACY_KUBERNETES_EXECUTION_STRATEGY = false)
// or when dumb-init is used, as it is copied from the helper image to this volume
if s.Build.IsFeatureFlagOn(featureflags.UseDumbInitWithKubernetesExecutor) ||
!s.Build.IsFeatureFlagOn(featureflags.UseLegacyKubernetesExecutionStrategy) {
mounts = append(
mounts,
api.VolumeMount{
Name: "scripts",
MountPath: s.scriptsDir(),
})
}
if !s.Build.IsFeatureFlagOn(featureflags.UseLegacyKubernetesExecutionStrategy) {
// These volume mounts **MUST NOT** be mounted inside another volume mount.
// E.g. mounting them inside the "repo" volume mount will cause the whole volume
// to be owned by root instead of the current user of the image. Something similar
// is explained here https://github.com/kubernetes/kubernetes/issues/2630#issuecomment-64679120
// where the first container determines the ownership of a volume. However, it seems like
// when mounting a volume inside another volume the first container or the first point of contact
// becomes root, regardless of SecurityContext or Image settings changing the user ID of the container.
// This causes builds to stop working in environments such as OpenShift where there's no root access
// resulting in an inability to modify anything inside the parent volume.
mounts = append(
mounts,
api.VolumeMount{
Name: "logs",
MountPath: s.logsDir(),
})
}
mounts = append(mounts, s.getVolumeMountsForConfig()...)
if s.isDefaultBuildsDirVolumeRequired() {
mounts = append(mounts, api.VolumeMount{
Name: "repo",
MountPath: s.AbstractExecutor.RootDir(),
})
}
return mounts
}
func (s *executor) getVolumeMountsForConfig() []api.VolumeMount {
var mounts []api.VolumeMount
for _, mount := range s.Config.Kubernetes.Volumes.HostPaths {
mounts = append(mounts, api.VolumeMount{
Name: mount.Name,
MountPath: s.Build.GetAllVariables().ExpandValue(mount.MountPath),
SubPath: s.Build.GetAllVariables().ExpandValue(mount.SubPath),
ReadOnly: mount.ReadOnly,
MountPropagation: (*api.MountPropagationMode)(mount.MountPropagation),
})
}
for _, mount := range s.Config.Kubernetes.Volumes.Secrets {
mounts = append(mounts, api.VolumeMount{
Name: mount.Name,
MountPath: s.Build.GetAllVariables().ExpandValue(mount.MountPath),
SubPath: s.Build.GetAllVariables().ExpandValue(mount.SubPath),
ReadOnly: mount.ReadOnly,
})
}
for _, mount := range s.Config.Kubernetes.Volumes.PVCs {
mounts = append(mounts, api.VolumeMount{
Name: s.Build.GetAllVariables().ExpandValue(mount.Name),
MountPath: s.Build.GetAllVariables().ExpandValue(mount.MountPath),
SubPath: s.Build.GetAllVariables().ExpandValue(mount.SubPath),
ReadOnly: mount.ReadOnly,
MountPropagation: (*api.MountPropagationMode)(mount.MountPropagation),
})
}
for _, mount := range s.Config.Kubernetes.Volumes.ConfigMaps {
mounts = append(mounts, api.VolumeMount{
Name: mount.Name,
MountPath: s.Build.GetAllVariables().ExpandValue(mount.MountPath),
SubPath: s.Build.GetAllVariables().ExpandValue(mount.SubPath),
ReadOnly: mount.ReadOnly,
})
}
for _, mount := range s.Config.Kubernetes.Volumes.EmptyDirs {
mounts = append(mounts, api.VolumeMount{
Name: mount.Name,
MountPath: s.Build.GetAllVariables().ExpandValue(mount.MountPath),
SubPath: s.Build.GetAllVariables().ExpandValue(mount.SubPath),
})
}
for _, mount := range s.Config.Kubernetes.Volumes.CSIs {
mounts = append(mounts, api.VolumeMount{
Name: mount.Name,
MountPath: s.Build.GetAllVariables().ExpandValue(mount.MountPath),
SubPath: s.Build.GetAllVariables().ExpandValue(mount.SubPath),
ReadOnly: mount.ReadOnly,
})
}
return mounts
}
func (s *executor) getVolumes() []api.Volume {
volumes := s.getVolumesForConfig()
if s.isDefaultBuildsDirVolumeRequired() {
volumes = append(volumes, api.Volume{
Name: "repo",
VolumeSource: api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
},
})
}
// scripts volumes are needed when using the Kubernetes executor in attach mode
// (FF_USE_LEGACY_KUBERNETES_EXECUTION_STRATEGY = false)
// or when dumb-init is used, as it is copied from the helper image to this volume
if s.Build.IsFeatureFlagOn(featureflags.UseDumbInitWithKubernetesExecutor) ||
!s.Build.IsFeatureFlagOn(featureflags.UseLegacyKubernetesExecutionStrategy) {
volumes = append(volumes, api.Volume{
Name: "scripts",
VolumeSource: api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
},
})
}
if s.Build.IsFeatureFlagOn(featureflags.UseLegacyKubernetesExecutionStrategy) {
return volumes
}
volumes = append(
volumes,
api.Volume{
Name: "logs",
VolumeSource: api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
},
})
return volumes
}
func (s *executor) getVolumesForConfig() []api.Volume {
var volumes []api.Volume
volumes = append(volumes, s.getVolumesForHostPaths()...)
volumes = append(volumes, s.getVolumesForSecrets()...)
volumes = append(volumes, s.getVolumesForPVCs()...)
volumes = append(volumes, s.getVolumesForConfigMaps()...)
volumes = append(volumes, s.getVolumesForEmptyDirs()...)
volumes = append(volumes, s.getVolumesForCSIs()...)
return volumes
}
func (s *executor) getVolumesForHostPaths() []api.Volume {
var volumes []api.Volume
for _, volume := range s.Config.Kubernetes.Volumes.HostPaths {
path := volume.HostPath
// Make backward compatible with syntax introduced in version 9.3.0
if path == "" {
path = volume.MountPath
}
volumes = append(volumes, api.Volume{
Name: volume.Name,
VolumeSource: api.VolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: path,
},
},
})
}
return volumes
}
func (s *executor) getVolumesForSecrets() []api.Volume {
var volumes []api.Volume
for _, volume := range s.Config.Kubernetes.Volumes.Secrets {
var items []api.KeyToPath
for key, path := range volume.Items {
items = append(items, api.KeyToPath{Key: key, Path: path})
}
volumes = append(volumes, api.Volume{
Name: volume.Name,
VolumeSource: api.VolumeSource{
Secret: &api.SecretVolumeSource{
SecretName: volume.Name,
Items: items,
},
},
})
}
return volumes
}
func (s *executor) getVolumesForPVCs() []api.Volume {
var volumes []api.Volume
store := make(map[string]api.Volume)
for _, volume := range s.Config.Kubernetes.Volumes.PVCs {
if _, found := store[volume.Name]; found {
continue
}
// Resolve the runtime name by expanding variable references.
resolvedName := s.Build.GetAllVariables().ExpandValue(volume.Name)
apiVolume := api.Volume{
Name: resolvedName,
VolumeSource: api.VolumeSource{
PersistentVolumeClaim: &api.PersistentVolumeClaimVolumeSource{
ClaimName: resolvedName,
},
},
}
volumes = append(volumes, apiVolume)
store[volume.Name] = apiVolume
}
return volumes
}
func (s *executor) getVolumesForConfigMaps() []api.Volume {
var volumes []api.Volume
for _, volume := range s.Config.Kubernetes.Volumes.ConfigMaps {
var items []api.KeyToPath
for key, path := range volume.Items {
items = append(items, api.KeyToPath{Key: key, Path: path})
}
volumes = append(volumes, api.Volume{
Name: volume.Name,
VolumeSource: api.VolumeSource{
ConfigMap: &api.ConfigMapVolumeSource{
LocalObjectReference: api.LocalObjectReference{
Name: volume.Name,
},
Items: items,
},
},
})
}
return volumes
}
func (s *executor) getVolumesForEmptyDirs() []api.Volume {
var volumes []api.Volume
for _, volume := range s.Config.Kubernetes.Volumes.EmptyDirs {
volumes = append(volumes, api.Volume{
Name: volume.Name,
VolumeSource: api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{
Medium: api.StorageMedium(volume.Medium),
SizeLimit: s.parseVolumeSizeLimit(volume),
},
},
})
}
return volumes
}
func (s *executor) parseVolumeSizeLimit(volume common.KubernetesEmptyDir) *resource.Quantity {
if strings.Trim(volume.SizeLimit, " ") == "" {
return nil
}
quantity, err := resource.ParseQuantity(volume.SizeLimit)
if err != nil {
s.BuildLogger.Warningln(fmt.Sprintf("invalid limit quantity %q for empty volume %q: %v", volume.SizeLimit, volume.Name, err))
return nil
}
return &quantity
}
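// Illustrative sketch (not part of the original file) of the values parseVolumeSizeLimit
// accepts: standard Kubernetes resource quantities such as "500Mi" or "1Gi"; anything
// unparsable is logged as a warning and treated as "no limit".
func exampleEmptyDirSizeLimits() []*resource.Quantity {
	var limits []*resource.Quantity
	for _, raw := range []string{"500Mi", "1Gi"} {
		quantity, err := resource.ParseQuantity(raw)
		if err != nil {
			continue
		}
		limits = append(limits, &quantity)
	}
	return limits
}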
func (s *executor) getVolumesForCSIs() []api.Volume {
var volumes []api.Volume
for _, volume := range s.Config.Kubernetes.Volumes.CSIs {
volumes = append(volumes, api.Volume{
Name: volume.Name,
VolumeSource: api.VolumeSource{
CSI: &api.CSIVolumeSource{
Driver: volume.Driver,
FSType: &volume.FSType,
ReadOnly: &volume.ReadOnly,
VolumeAttributes: volume.VolumeAttributes,
},
},
})
}
return volumes
}
func (s *executor) isDefaultBuildsDirVolumeRequired() bool {
if s.requireDefaultBuildsDirVolume != nil {
return *s.requireDefaultBuildsDirVolume
}
required := true
for _, mount := range s.getVolumeMountsForConfig() {
if mount.MountPath == s.AbstractExecutor.RootDir() {
required = false
break
}
}
s.requireDefaultBuildsDirVolume = &required
return required
}
func (s *executor) isSharedBuildsDirRequired() bool {
// Return quickly when the default builds dir is used: the job is then
// isolated to the pod, so there is no need for the SharedBuildsDir behavior
if s.isDefaultBuildsDirVolumeRequired() {
return false
}
required := true
if s.requireSharedBuildsDir != nil {
return *s.requireSharedBuildsDir
}
// Fetch name of the volume backing the builds volume mount
buildVolumeName := "repo"
for _, mount := range s.getVolumeMountsForConfig() {
if mount.MountPath == s.AbstractExecutor.RootDir() {
buildVolumeName = mount.Name
break
}
}
// Require shared builds dir when builds dir volume is anything except an emptyDir
for _, volume := range s.getVolumes() {
if volume.Name == buildVolumeName && volume.VolumeSource.EmptyDir != nil {
required = false
break
}
}
s.requireSharedBuildsDir = &required
return required
}
func (s *executor) setupCredentials(ctx context.Context) error {
s.BuildLogger.Debugln("Setting up secrets")
authConfigs, err := auth.ResolveConfigs(s.Build.GetDockerAuthConfig(), s.Shell().User, s.Build.Credentials, &s.BuildLogger)
if err != nil {
return err
}
if len(authConfigs) == 0 {
return nil
}
dockerCfgs := make(map[string]types.AuthConfig)
for registry, registryInfo := range authConfigs {
dockerCfgs[registry] = registryInfo.AuthConfig
}
dockerCfgContent, err := json.Marshal(dockerCfgs)
if err != nil {
return err
}
secret := api.Secret{}
secret.Name = generateNameForK8sResources(s.Build.ProjectUniqueName())
secret.Namespace = s.configurationOverwrites.namespace
secret.Type = api.SecretTypeDockercfg
secret.Data = map[string][]byte{}
secret.Data[api.DockerConfigKey] = dockerCfgContent
s.credentials, err = retry.WithValueFn(s, func() (*api.Secret, error) {
return s.requestSecretCreation(ctx, &secret, s.configurationOverwrites.namespace)
}).Run()
return err
}
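// Illustrative sketch (not part of the original file) of the secret payload
// setupCredentials builds: a registry-keyed map of docker auth configs marshalled to JSON
// and stored under api.DockerConfigKey in a kubernetes.io/dockercfg secret. The registry
// and credentials below are hypothetical placeholders.
func exampleDockerCfgPayload() ([]byte, error) {
	dockerCfgs := map[string]types.AuthConfig{
		"registry.example.com": {Username: "example-user", Password: "example-token"},
	}
	return json.Marshal(dockerCfgs)
}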
func (s *executor) requestSecretCreation(
ctx context.Context,
secret *api.Secret,
namespace string,
) (*api.Secret, error) {
// kubeAPI: secrets, create
creds, err := s.kubeClient.CoreV1().
Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{})
if isConflict(err) {
s.BuildLogger.Debugln(
fmt.Sprintf(
"Conflict while trying to create the secret %s ... Retrieving the existing resource",
secret.Name,
),
)
// kubeAPI: secrets, get
creds, err = s.kubeClient.CoreV1().
Secrets(namespace).Get(ctx, secret.Name, metav1.GetOptions{})
}
return creds, err
}
func (s *executor) getHostAliases() ([]api.HostAlias, error) {
supportsHostAliases, err := s.featureChecker.IsHostAliasSupported()
switch {
case errors.Is(err, &badVersionError{}):
s.BuildLogger.Warningln("Checking for host alias support. Host aliases will be disabled.", err)
return nil, nil
case err != nil:
return nil, err
case !supportsHostAliases:
return nil, nil
}
return createHostAliases(s.options.servicesList(), s.Config.Kubernetes.GetHostAliases())
}
func (s *executor) setupBuildNamespace(ctx context.Context) error {
if !s.Config.Kubernetes.NamespacePerJob {
return nil
}
s.BuildLogger.Debugln("Setting up build namespace")
nsconfig := api.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: s.configurationOverwrites.namespace,
},
}
//nolint:gocritic
// kubeAPI: namespaces, create, kubernetes.NamespacePerJob=true
_, err := s.kubeClient.CoreV1().Namespaces().Create(ctx, &nsconfig, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create namespace: %w", err)
}
return nil
}
func (s *executor) teardownBuildNamespace(ctx context.Context) error {
if !s.Config.Kubernetes.NamespacePerJob {
return nil
}
s.BuildLogger.Debugln("Tearing down build namespace")
//nolint:gocritic
// kubeAPI: namespaces, delete, kubernetes.NamespacePerJob=true
err := s.kubeClient.CoreV1().Namespaces().Delete(ctx, s.configurationOverwrites.namespace, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("failed to delete namespace: %w", err)
}
return nil
}
func (s *executor) setupBuildPod(ctx context.Context, initContainers []api.Container) error {
s.BuildLogger.Debugln("Setting up build pod")
prepareOpts, err := s.createPodConfigPrepareOpts(initContainers)
if err != nil {
return err
}
podConfig, err := s.preparePodConfig(prepareOpts)
if err != nil {
return err
}
s.BuildLogger.Debugln("Checking for ImagePullSecrets or ServiceAccount existence")
err = s.checkDependantResources(ctx)
if err != nil {
return err
}
if s.Build.IsFeatureFlagOn(featureflags.UseAdvancedPodSpecConfiguration) {
s.BuildLogger.Warningln("Advanced Pod Spec configuration enabled, merging the provided PodSpec to the generated one. " +
"This is a beta feature and is subject to change. Feedback is collected in this issue: " +
"https://gitlab.com/gitlab-org/gitlab-runner/-/issues/29659 ...")
podConfig.Spec, err = s.applyPodSpecMerge(&podConfig.Spec)
if err != nil {
return err
}
}
// If we need to retry on pull issues, we must update the pod name on the pod watcher so that we don't track terminating pods.
s.podWatcher.UpdatePodName(podConfig.GetName())
s.BuildLogger.Debugln("Creating build pod")
s.pod, err = retry.WithValueFn(s, func() (*api.Pod, error) {
return s.requestPodCreation(ctx, &podConfig, s.configurationOverwrites.namespace)
}).Run()
if err != nil {
return err
}
ownerReferences := s.buildPodReferences()
err = s.setOwnerReferencesForResources(ctx, ownerReferences)
if err != nil {
return fmt.Errorf("error setting ownerReferences: %w", err)
}
s.services, err = s.makePodProxyServices(ctx, ownerReferences)
return err
}
func (s *executor) requestPodCreation(ctx context.Context, pod *api.Pod, namespace string) (*api.Pod, error) {
// kubeAPI: pods, create
p, err := s.kubeClient.CoreV1().
Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
if isConflict(err) {
s.BuildLogger.Debugln(
fmt.Sprintf(
"Conflict while trying to create the pod %s ... Retrieving the existing resource",
pod.Name,
),
)
// kubeAPI: pods, get
p, err = s.kubeClient.CoreV1().
Pods(namespace).Get(ctx, pod.Name, metav1.GetOptions{})
}
return p, err
}
func (s *executor) checkDependantResources(ctx context.Context) error {
if s.Config.Kubernetes.GetResourceAvailabilityCheckMaxAttempts() == 0 {
s.BuildLogger.Debugln("Resources check has been disabled")
return nil
}
err := s.waitForResource(
ctx,
resourceTypeServiceAccount,
s.Config.Kubernetes.ServiceAccount,
s.serviceAccountExists(),
)
if err != nil {
return err
}
for _, secretName := range s.Config.Kubernetes.ImagePullSecrets {
err = s.waitForResource(
ctx,
resourceTypePullSecret,
secretName,
s.secretExists(),
)
if err != nil {
return err
}
}
return nil
}
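// buildLabels returns the default pod labels merged with user-configured labels and label
// overwrites; labels within the runner.gitlab.com namespace cannot be set or overwritten by users.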
func (s *executor) buildLabels() map[string]string {
// We set default pod labels. These are not allowed to be overwritten.
labels := map[string]string{
// Retained for backwards compatibility, may be removed in future release!
"pod": sanitizeLabel(s.Build.ProjectUniqueName()),
"project." + runnerLabelNamespace + "/id": strconv.FormatInt(s.Build.JobInfo.ProjectID, 10),
"project." + runnerLabelNamespace + "/namespace-id": sanitizeLabel(s.Build.Variables.Value("CI_PROJECT_NAMESPACE_ID")),
"project." + runnerLabelNamespace + "/name": sanitizeLabel(s.Build.JobInfo.ProjectName),
"project." + runnerLabelNamespace + "/namespace": sanitizeLabel(s.Build.Variables.Value("CI_PROJECT_NAMESPACE")),
"project." + runnerLabelNamespace + "/root-namespace": sanitizeLabel(s.Build.Variables.Value("CI_PROJECT_ROOT_NAMESPACE")),
// Used for setting up services for the build pod
"job." + runnerLabelNamespace + "/pod": sanitizeLabel(s.Build.ProjectUniqueName()),
"manager." + runnerLabelNamespace + "/name": sanitizeLabel(s.Config.Name),
"manager." + runnerLabelNamespace + "/id-short": sanitizeLabel(s.Config.ShortDescription()),
}
safeLabelSetter := func(key, val string) {
if runnerLabelNamespacePattern.MatchString(key) {
s.BuildLogger.Debugln(fmt.Sprintf("not setting pod label %q, overwrite of labels in the %q namespace is not allowed", key, runnerLabelNamespace))
return
}
labels[key] = sanitizeLabel(s.Build.Variables.ExpandValue(val))
}
for key, val := range s.Build.Runner.Kubernetes.PodLabels {
safeLabelSetter(key, val)
}
for key, val := range s.configurationOverwrites.podLabels {
safeLabelSetter(key, val)
}
return labels
}
func (s *executor) createPodConfigPrepareOpts(initContainers []api.Container) (podConfigPrepareOpts, error) {
podServices, err := s.preparePodServices()
if err != nil {
return podConfigPrepareOpts{}, err
}
labels := s.buildLabels()
annotations := map[string]string{
"job." + runnerLabelNamespace + "/id": strconv.FormatInt(s.Build.ID, 10),
"job." + runnerLabelNamespace + "/url": s.Build.JobURL(),
"job." + runnerLabelNamespace + "/sha": s.Build.GitInfo.Sha,
"job." + runnerLabelNamespace + "/before_sha": s.Build.GitInfo.BeforeSha,
"job." + runnerLabelNamespace + "/ref": s.Build.GitInfo.Ref,
"job." + runnerLabelNamespace + "/name": s.Build.JobInfo.Name,
"job." + runnerLabelNamespace + "/timeout": s.Build.GetBuildTimeout().String(),
"project." + runnerLabelNamespace + "/id": strconv.FormatInt(s.Build.JobInfo.ProjectID, 10),
}
for key, val := range s.configurationOverwrites.podAnnotations {
annotations[key] = s.Build.Variables.ExpandValue(val)
}
imagePullSecrets := s.prepareImagePullSecrets()
hostAliases, err := s.getHostAliases()
if err != nil {
return podConfigPrepareOpts{}, err
}
return podConfigPrepareOpts{
labels: labels,
annotations: annotations,
services: podServices,
imagePullSecrets: imagePullSecrets,
hostAliases: hostAliases,
initContainers: initContainers,
}, nil
}
func (s *executor) isWindowsJob() bool {
return s.helperImageInfo.OSType == helperimage.OSTypeWindows
}
func (s *executor) defaultCapDrop() []string {
if s.isWindowsJob() {
return nil
}
return []string{
// Reasons for disabling NET_RAW by default were
// discussed in https://gitlab.com/gitlab-org/gitlab-runner/-/issues/26833
"NET_RAW",
}
}
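// prepareImagePullSecrets returns the configured image pull secrets plus the job's credentials
// secret, or nil when the service account's image pull secrets are used instead.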
func (s *executor) prepareImagePullSecrets() []api.LocalObjectReference {
if s.Config.Kubernetes.UseServiceAccountImagePullSecrets {
return nil
}
var imagePullSecrets []api.LocalObjectReference
for _, imagePullSecret := range s.Config.Kubernetes.ImagePullSecrets {
imagePullSecrets = append(imagePullSecrets, api.LocalObjectReference{Name: imagePullSecret})
}
if s.credentials != nil {
imagePullSecrets = append(imagePullSecrets, api.LocalObjectReference{Name: s.credentials.Name})
}
return imagePullSecrets
}
func (s *executor) preparePodServices() ([]api.Container, error) {
var err error
podServices := make([]api.Container, len(s.options.Services))
for i, name := range s.options.getSortedServiceNames() {
service := s.options.Services[name]
podServices[i], err = s.buildContainer(containerBuildOpts{
name: name,
image: service.Name,
imageDefinition: *service,
isServiceContainer: true,
requests: s.configurationOverwrites.getServiceResourceRequests(name),
limits: s.configurationOverwrites.getServiceResourceLimits(name),
securityContext: s.setSecurityContextUser(
service.ExecutorOptions.Kubernetes,
s.Config.Kubernetes.GetContainerSecurityContext(
s.Config.Kubernetes.ServiceContainerSecurityContext,
s.defaultCapDrop()...,
),
),
})
if err != nil {
return nil, err
}
}
return podServices, nil
}
func (s *executor) preparePodConfig(opts podConfigPrepareOpts) (api.Pod, error) {
buildContainer, helperContainer, err := s.createBuildAndHelperContainers()
if err != nil {
return api.Pod{}, err
}
pod := api.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: generateNameForK8sResources(s.Build.ProjectUniqueName()),
Namespace: s.configurationOverwrites.namespace,
Labels: opts.labels,
Annotations: opts.annotations,
},
Spec: api.PodSpec{
Volumes: s.getVolumes(),
SchedulerName: s.Config.Kubernetes.SchedulerName,
ServiceAccountName: s.configurationOverwrites.serviceAccount,
AutomountServiceAccountToken: s.Config.Kubernetes.AutomountServiceAccountToken,
RestartPolicy: api.RestartPolicyNever,
NodeSelector: s.configurationOverwrites.nodeSelector,
Tolerations: s.getPodTolerations(),
InitContainers: opts.initContainers,
Containers: append([]api.Container{
buildContainer,
helperContainer,
}, opts.services...),
TerminationGracePeriodSeconds: s.Config.Kubernetes.PodTerminationGracePeriodSeconds,
ActiveDeadlineSeconds: s.getPodActiveDeadlineSeconds(),
ImagePullSecrets: opts.imagePullSecrets,
SecurityContext: s.Config.Kubernetes.GetPodSecurityContext(),
HostAliases: opts.hostAliases,
Affinity: s.Config.Kubernetes.GetAffinity(),
DNSPolicy: s.getDNSPolicy(),
DNSConfig: s.Config.Kubernetes.GetDNSConfig(),
RuntimeClassName: s.Config.Kubernetes.RuntimeClassName,
PriorityClassName: s.Config.Kubernetes.PriorityClassName,
},
}
return pod, nil
}
// getPodTolerations returns a list of pod tolerations converted from map
// entries in the kubernetes configuration, and possibly from map entries
// generated from job variables, if overwrite is allowed.
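// For example (illustrative values), the map entry "dedicated=ci": "NoSchedule" becomes a
// toleration with Key "dedicated", Value "ci", Operator Equal and Effect NoSchedule, while an
// entry without "=" keeps the whole key, uses Operator Exists and leaves Value empty.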
func (s *executor) getPodTolerations() []api.Toleration {
var tolerations []api.Toleration
for keyvalue, effect := range s.configurationOverwrites.nodeTolerations {
newToleration := api.Toleration{
Key: keyvalue,
Operator: api.TolerationOpExists,
Effect: api.TaintEffect(effect),
}
if strings.Contains(keyvalue, "=") {
parts := strings.SplitN(keyvalue, "=", 2)
newToleration.Key = parts[0]
if len(parts) > 1 {
newToleration.Value = parts[1]
}
newToleration.Operator = api.TolerationOpEqual
}
tolerations = append(tolerations, newToleration)
}
return tolerations
}
// getPodActiveDeadlineSeconds returns the effective build/job timeout.
// The feature is behind a feature flag and a nil pointer is returned when
// FF_USE_POD_ACTIVE_DEADLINE_SECONDS is disabled.
// https://gitlab.com/gitlab-org/gitlab-runner/-/issues/29279.
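// For example (illustrative), a 1h job timeout results in activeDeadlineSeconds of 3601.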
func (s *executor) getPodActiveDeadlineSeconds() *int64 {
if !s.Build.IsFeatureFlagOn(featureflags.UsePodActiveDeadlineSeconds) {
return nil
}
s.BuildLogger.Println(fmt.Sprintf(
"Using FF_USE_POD_ACTIVE_DEADLINE_SECONDS, the Pod activeDeadlineSeconds will be set to the job timeout: %v...",
time.Duration(s.Build.RunnerInfo.Timeout)*time.Second,
))
// We do not set the exact timeout as activeDeadlineSeconds
// 1 second is added to allow the job to timeout on GitLab side
// before the pod can be marked as failed and the container killed
timeout := int64(s.Build.RunnerInfo.Timeout + 1)
return &timeout
}
func (s *executor) createBuildAndHelperContainers() (api.Container, api.Container, error) {
buildCmd, err := s.getContainerCommand(buildContainerName)
if err != nil {
return api.Container{}, api.Container{}, err
}
buildContainer, err := s.buildContainer(containerBuildOpts{
name: buildContainerName,
image: s.options.Image.Name,
imageDefinition: s.options.Image,
requests: s.configurationOverwrites.buildRequests,
limits: s.configurationOverwrites.buildLimits,
securityContext: s.setSecurityContextUser(
s.options.Image.ExecutorOptions.Kubernetes,
s.Config.Kubernetes.GetContainerSecurityContext(
s.Config.Kubernetes.BuildContainerSecurityContext,
s.defaultCapDrop()...,
),
),
command: buildCmd,
})
if err != nil {
return api.Container{}, api.Container{}, fmt.Errorf("building build container: %w", err)
}
helperCmd, err := s.getContainerCommand(helperContainerName)
if err != nil {
return api.Container{}, api.Container{}, err
}
helperContainer, err := s.buildContainer(containerBuildOpts{
name: helperContainerName,
image: s.getHelperImage(),
requests: s.configurationOverwrites.helperRequests,
limits: s.configurationOverwrites.helperLimits,
securityContext: s.Config.Kubernetes.GetContainerSecurityContext(
s.Config.Kubernetes.HelperContainerSecurityContext,
s.defaultCapDrop()...,
),
command: helperCmd,
})
if err != nil {
return api.Container{}, api.Container{}, fmt.Errorf("building helper container: %w", err)
}
if s.shouldUseStartupProbe() {
buildContainer.StartupProbe = s.buildContainerStartupProbe()
}
return buildContainer, helperContainer, nil
}
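// setSecurityContextUser overrides RunAsUser and RunAsGroup on the given security context with
// the uid/gid from the image's executor options, when they can be parsed and are greater than zero.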
func (s *executor) setSecurityContextUser(opts common.ImageKubernetesOptions, context *api.SecurityContext) *api.SecurityContext {
uid, gid, err := opts.GetUIDGID()
if err != nil {
s.BuildLogger.Warningln(
fmt.Sprintf(
"Error parsing 'uid' or 'gid' from image options, using the configured security context: %v",
err,
),
)
return context
}
if uid > 0 {
context.RunAsUser = &uid
}
if gid > 0 {
context.RunAsGroup = &gid
}
return context
}
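// buildContainerStartupProbe builds an exec probe that succeeds once the shell entrypoint has
// created the startup probe file, polling with the configured poll interval and attempts.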
func (s *executor) buildContainerStartupProbe() *api.Probe {
notUpLog := "gitlab-runner shell not up yet"
startupProbeFile := s.getStartupProbeFile()
var probeCommand []string
switch shell := s.Shell().Shell; shell {
case shells.SNPwsh, shells.SNPowershell:
probeCommand = []string{
shell, "-CommandWithArgs", "if (-Not (Test-Path $args[0] -PathType Leaf)) { $args[1] ; exit 1 }", startupProbeFile, notUpLog,
}
default:
probeCommand = []string{
shell, "-c", `test -e "$1" || { echo -n "$2"; exit 1; }`, "--", startupProbeFile, notUpLog,
}
}
pollInterval := s.Config.Kubernetes.GetPollInterval()
pollAttempts := s.Config.Kubernetes.GetPollAttempts()
return &api.Probe{
ProbeHandler: api.ProbeHandler{
Exec: &api.ExecAction{Command: probeCommand},
},
SuccessThreshold: 1,
InitialDelaySeconds: 1,
PeriodSeconds: int32(pollInterval),
FailureThreshold: int32(pollAttempts),
}
}
func (s *executor) getStartupProbeFile() string {
return filepath.Join(s.RootDir(), shells.StartupProbeFile)
}
func (s *executor) getContainerCommand(containerName string) ([]string, error) {
command := s.BuildShell.DockerCommand
if s.shouldUseStartupProbe() && containerName == buildContainerName {
shell, err := s.retrieveShell()
if err != nil {
return nil, fmt.Errorf("retrieving shell: %w", err)
}
return shell.GetEntrypointCommand(*s.Shell(), s.getStartupProbeFile()), nil
}
if !s.Build.IsFeatureFlagOn(featureflags.UseDumbInitWithKubernetesExecutor) {
return command, nil
}
switch s.Shell().Shell {
case shells.SNPowershell:
return command, nil
default:
return append([]string{fmt.Sprintf("%s/dumb-init", s.scriptsDir()), "--"}, command...), nil
}
}
// Inspired by
// https://github.com/kubernetes/kubernetes/blob/cde45fb161c5a4bfa7cfe45dfd814f6cc95433f7/cmd/kubeadm/app/util/patches/patches.go#L171
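// applyPodSpecMerge serializes the generated PodSpec and applies each configured PodSpec patch
// in order, returning the resulting spec.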
func (s *executor) applyPodSpecMerge(podSpec *api.PodSpec) (api.PodSpec, error) {
patchedData, err := json.Marshal(podSpec)
if err != nil {
return api.PodSpec{}, err
}
for _, spec := range s.Config.Kubernetes.PodSpec {
patchedData, err = doPodSpecMerge(patchedData, spec)
if err != nil {
return api.PodSpec{}, err
}
}
var patchedPodSpec api.PodSpec
err = json.Unmarshal(patchedData, &patchedPodSpec)
return patchedPodSpec, err
}
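// doPodSpecMerge applies a single patch to the JSON-encoded pod spec, supporting JSON patch,
// JSON merge patch and strategic merge patch. For example (illustrative), the merge patch
// {"nodeSelector":{"disktype":"ssd"}} would add that node selector to the spec.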
func doPodSpecMerge(original []byte, spec common.KubernetesPodSpec) ([]byte, error) {
var data []byte
patchBytes, patchType, err := spec.PodSpecPatch()
if err != nil {
return nil, err
}
switch patchType {
case common.PatchTypeJSONPatchType:
var patchObj jsonpatch.Patch
patchObj, err = jsonpatch.DecodePatch(patchBytes)
if err == nil {
data, err = patchObj.Apply(original)
}
if err != nil {
return nil, err
}
case common.PatchTypeMergePatchType:
data, err = jsonpatch.MergePatch(original, patchBytes)
if err != nil {
return nil, err
}
case common.PatchTypeStrategicMergePatchType:
data, err = strategicpatch.StrategicMergePatch(
original,
patchBytes,
api.PodSpec{},
)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unsupported patch type %v", patchType)
}
return data, nil
}
func (s *executor) setOwnerReferencesForResources(ctx context.Context, ownerReferences []metav1.OwnerReference) error {
if s.credentials == nil {
return nil
}
var err error
s.credentials, err = retry.WithValueFn(s, func() (*api.Secret, error) {
credentials := s.credentials.DeepCopy()
credentials.SetOwnerReferences(ownerReferences)
// kubeAPI: secrets, update
return s.kubeClient.CoreV1().
Secrets(s.configurationOverwrites.namespace).
Update(ctx, credentials, metav1.UpdateOptions{})
}).Run()
return err
}
func (s *executor) buildPodReferences() []metav1.OwnerReference {
return []metav1.OwnerReference{
{
APIVersion: apiVersion,
Kind: ownerReferenceKind,
Name: s.pod.GetName(),
UID: s.pod.GetUID(),
},
}
}
func (s *executor) waitForResource(
ctx context.Context,
resourceType string,
resourceName string,
checkExists func(context.Context, string) bool,
) error {
attempt := -1
s.BuildLogger.Debugln(fmt.Sprintf("Checking for %s existence", resourceType))
for attempt < s.Config.Kubernetes.GetResourceAvailabilityCheckMaxAttempts() {
if checkExists(ctx, resourceName) {
return nil
}
attempt++
if attempt > 0 {
s.BuildLogger.Debugln(fmt.Sprintf(
"Pausing check of the %s availability for %d (attempt %d)",
resourceType,
resourceAvailabilityCheckMaxPollInterval,
attempt,
))
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(resourceAvailabilityCheckMaxPollInterval):
}
}
return &resourceCheckError{
resourceType: resourceType,
resourceName: resourceName,
}
}
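// serviceAccountExists returns a check function that reports whether the given service account
// exists in the build namespace; an empty name is treated as existing.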
func (s *executor) serviceAccountExists() func(context.Context, string) bool {
return func(ctx context.Context, saName string) bool {
if saName == "" {
return true
}
return retry.WithFn(s, func() error {
// NOTE: Casing is important here
// kubeAPI: serviceaccounts, get
_, err := s.kubeClient.CoreV1().
ServiceAccounts(s.configurationOverwrites.namespace).Get(ctx, saName, metav1.GetOptions{})
return err
}).Run() == nil
}
}
func (s *executor) secretExists() func(context.Context, string) bool {
return func(ctx context.Context, secretName string) bool {
return retry.WithFn(s, func() error {
// kubeAPI: secrets, get
_, err := s.kubeClient.CoreV1().
Secrets(s.configurationOverwrites.namespace).Get(ctx, secretName, metav1.GetOptions{})
return err
}).Run() == nil
}
}
func (s *executor) getDNSPolicy() api.DNSPolicy {
dnsPolicy, err := s.Config.Kubernetes.DNSPolicy.Get()
if err != nil {
s.BuildLogger.Warningln(fmt.Sprintf("falling back to cluster's default policy: %v", err))
}
return dnsPolicy
}
func (s *executor) getHelperImage() string {
if s.Config.Kubernetes.HelperImage != "" {
return s.ExpandValue(s.Config.Kubernetes.HelperImage)
}
return s.helperImageInfo.String()
}
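// makePodProxyServices creates one ClusterIP service per session proxy, concurrently, and
// returns the created services.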
func (s *executor) makePodProxyServices(
ctx context.Context,
ownerReferences []metav1.OwnerReference,
) ([]api.Service, error) {
s.BuildLogger.Debugln("Creating pod proxy services")
ch := make(chan serviceCreateResponse)
var wg sync.WaitGroup
wg.Add(len(s.ProxyPool))
for serviceName, serviceProxy := range s.ProxyPool {
serviceName = dns.MakeRFC1123Compatible(serviceName)
servicePorts := make([]api.ServicePort, len(serviceProxy.Settings.Ports))
for i, port := range serviceProxy.Settings.Ports {
// When there is more than one port, Kubernetes requires a port name
portName := fmt.Sprintf("%s-%d", serviceName, port.Number)
servicePorts[i] = api.ServicePort{
Port: int32(port.Number),
TargetPort: intstr.FromInt(port.Number),
Name: portName,
}
}
serviceConfig := s.prepareServiceConfig(serviceName, servicePorts, ownerReferences)
go s.createKubernetesService(ctx, &serviceConfig, serviceProxy.Settings, ch, &wg)
}
go func() {
wg.Wait()
close(ch)
}()
var proxyServices []api.Service
for res := range ch {
if res.err != nil {
err := fmt.Errorf("error creating the proxy service %q: %w", res.service.Name, res.err)
s.BuildLogger.Errorln(err)
return []api.Service{}, err
}
proxyServices = append(proxyServices, *res.service)
}
return proxyServices, nil
}
func (s *executor) prepareServiceConfig(
name string,
ports []api.ServicePort,
ownerReferences []metav1.OwnerReference,
) api.Service {
return api.Service{
ObjectMeta: metav1.ObjectMeta{
Name: generateNameForK8sResources(name),
Namespace: s.configurationOverwrites.namespace,
OwnerReferences: ownerReferences,
},
Spec: api.ServiceSpec{
Ports: ports,
Selector: map[string]string{"job." + runnerLabelNamespace + "/pod": sanitizeLabel(s.Build.ProjectUniqueName())},
Type: api.ServiceTypeClusterIP,
},
}
}
func (s *executor) createKubernetesService(
ctx context.Context,
service *api.Service,
proxySettings *proxy.Settings,
ch chan<- serviceCreateResponse,
wg *sync.WaitGroup,
) {
defer wg.Done()
var err error
service, err = retry.WithValueFn(s, func() (*api.Service, error) {
return s.requestServiceCreation(ctx, service, s.pod.Namespace)
}).Run()
if err == nil {
// Updating the internal service name reference and activating the proxy
proxySettings.ServiceName = service.Name
}
ch <- serviceCreateResponse{service: service, err: err}
}
func (s *executor) requestServiceCreation(
ctx context.Context,
service *api.Service,
namespace string,
) (*api.Service, error) {
// kubeAPI: services, create
srv, err := s.kubeClient.CoreV1().
Services(namespace).Create(ctx, service, metav1.CreateOptions{})
if isConflict(err) {
s.BuildLogger.Debugln(
fmt.Sprintf(
"Conflict while trying to create the service %s ... Retrieving the existing resource",
service.Name,
),
)
// kubeAPI: services, get
srv, err = s.kubeClient.CoreV1().
Services(namespace).Get(ctx, service.Name, metav1.GetOptions{})
}
return srv, err
}
func (s *executor) watchPodStatus(ctx context.Context, extendedStatusFunc podStatusChecker) <-chan error {
// Buffer of 1 in case the context is cancelled while the timer tick case is being executed
// and the consumer is no longer reading from the channel while we try to write to it
ch := make(chan error, 1)
go func() {
defer close(ch)
t := time.NewTicker(time.Duration(s.Config.Kubernetes.GetPollInterval()) * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
err := s.checkPodStatus(ctx, extendedStatusFunc)
if err != nil {
ch <- err
return
}
}
}
}()
return ch
}
// Interface to check if a job pod is unhealthy
type podStatusChecker interface {
// Checks if a job pod is unhealthy
check(context.Context, *api.Pod) error
}
// Checks if a pod is unhealthy based on the statuses of its containers
type podContainerStatusChecker struct {
// Filter to determine which containers to check
shouldCheckContainerFilter func(api.ContainerStatus) bool
}
func (c *podContainerStatusChecker) check(ctx context.Context, pod *api.Pod) error {
for _, containerStatus := range pod.Status.ContainerStatuses {
if c.shouldCheckContainerFilter != nil && !c.shouldCheckContainerFilter(containerStatus) {
continue
}
if containerStatus.State.Terminated != nil &&
containerStatus.State.Terminated.ExitCode >= 0 {
return &podContainerError{
containerName: containerStatus.Name,
exitCode: int(containerStatus.State.Terminated.ExitCode),
reason: containerStatus.State.Terminated.Reason,
}
}
}
return nil
}
func isNotServiceContainer(containerStatus api.ContainerStatus) bool {
return containerStatus.Name == buildContainerName || containerStatus.Name == helperContainerName
}
func (s *executor) checkPodStatus(ctx context.Context, extendedStatusCheck podStatusChecker) error {
pod, err := retry.WithValueFn(s, func() (*api.Pod, error) {
// kubeAPI: pods, get
return s.kubeClient.CoreV1().
Pods(s.pod.Namespace).Get(ctx, s.pod.Name, metav1.GetOptions{})
}).Run()
if IsKubernetesPodNotFoundError(err) {
return err
}
if err != nil {
// General request failure
s.BuildLogger.Warningln("Getting job pod status", err)
return nil
}
if pod.Status.Phase != api.PodRunning {
return &podPhaseError{
name: s.pod.Name,
phase: pod.Status.Phase,
}
}
return extendedStatusCheck.check(ctx, pod)
}
func (s *executor) runInContainer(
ctx context.Context,
stage common.BuildStage,
name string,
command []string,
) <-chan error {
errCh := make(chan error, 1)
go func() {
defer close(errCh)
attach := AttachOptions{
PodName: s.pod.Name,
Namespace: s.pod.Namespace,
ContainerName: name,
Command: command,
Config: s.kubeConfig,
KubeClient: s.kubeClient,
Executor: &DefaultRemoteExecutor{},
Context: ctx,
}
kubeRequest := retry.WithFn(s, func() error {
err := attach.Run()
s.BuildLogger.Debugln(fmt.Sprintf("Trying to execute stage %v, got error %v", stage, err))
return s.checkScriptExecution(stage, err)
})
if err := kubeRequest.Run(); err != nil {
errCh <- err
}
exitStatus := <-s.remoteProcessTerminated
s.BuildLogger.Debugln("Remote process exited with the status:", exitStatus)
// CommandExitCode is guaranteed to be non-nil when sent over the remoteProcessTerminated channel
if *exitStatus.CommandExitCode == 0 {
errCh <- nil
return
}
errCh <- &commandTerminatedError{exitCode: *exitStatus.CommandExitCode}
}()
return errCh
}
func (s *executor) runInContainerWithExec(
ctx context.Context,
name string,
command []string,
script string,
stdout, stderr io.WriteCloser,
) <-chan error {
errCh := make(chan error, 1)
go func() {
defer close(errCh)
exec := ExecOptions{
PodName: s.pod.Name,
Namespace: s.pod.Namespace,
ContainerName: name,
Command: command,
In: strings.NewReader(script),
Out: stdout,
Err: stderr,
Stdin: true,
Config: s.kubeConfig,
KubeClient: s.kubeClient,
Executor: &DefaultRemoteExecutor{},
Context: ctx,
}
errCh <- retry.WithFn(s, exec.Run).Run()
}()
return errCh
}
func (s *executor) checkScriptExecution(stage common.BuildStage, err error) error {
// Retrying the attach command is a bit different from regular Kubernetes requests.
// Since attach commands are executed by opening an HTTP stream to the Kubernetes server,
// piping the command into that stream, and then expecting a response, there's no good place to check
// whether the whole command execution was successful.
// If we only check whether the Stdin stream was read, the connection might have broken during transit,
// meaning that the command was never executed.
// It could also have broken while reading the response stream, meaning the command was executed
// but we can't know that.
// The only reliable solution is to check whether the process is already running.
// For attach that is easy, since the process runs entirely in the background: we receive its status
// through the log file and the log processor moves things forward.
// Non-network errors don't concern this function.
if !slices.ContainsFunc(retryNetworkErrorsGroup, func(v string) bool {
return err != nil && strings.Contains(err.Error(), v)
}) {
return err
}
s.remoteStageStatusMutex.Lock()
defer s.remoteStageStatusMutex.Unlock()
s.BuildLogger.Debugln(fmt.Sprintf("Checking remote stage status after trying attach with err %v. Remote stage status: %v", err, s.remoteStageStatus))
// If the remote stage is the one we are trying to retry it means that it was already executed.
s.BuildLogger.Debugln(fmt.Sprintf("Remote stage: %v, trying to execute stage %v", s.remoteStageStatus.BuildStage(), stage))
if s.remoteStageStatus.BuildStage() == stage {
return nil
}
// If the remote stage is not the same, then we can retry
return err
}
func (s *executor) prepareOverwrites(variables common.JobVariables) error {
values, err := createOverwrites(s.Config.Kubernetes, variables, s.BuildLogger)
if err != nil {
return err
}
s.configurationOverwrites = values
return nil
}
func (s *executor) prepareServiceOverwrites(services map[string]*common.Image) error {
for name, service := range services {
if err := s.configurationOverwrites.evaluateExplicitServiceResourceOverwrite(
s.Config.Kubernetes,
name,
service.Variables,
s.BuildLogger,
); err != nil {
return err
}
}
return nil
}
func (s *executor) prepareOptions(build *common.Build) {
index := 0
usedAliases := make(map[string]struct{})
s.options = &kubernetesOptions{
Image: build.Image,
Services: make(map[string]*common.Image),
}
for _, svc := range s.Config.Kubernetes.GetExpandedServices(s.Build.GetAllVariables()) {
if svc.Name == "" {
continue
}
serviceName, service := "", svc.ToImageDefinition()
index, serviceName = s.getServiceDefinition(&service, usedAliases, index)
s.options.Services[serviceName] = &service
}
for _, service := range build.Services {
if service.Name == "" {
continue
}
serviceName := ""
index, serviceName = s.getServiceDefinition(&service, usedAliases, index)
s.options.Services[serviceName] = &service
}
}
func (s *executor) getServiceDefinition(
service *common.Image,
usedAliases map[string]struct{},
serviceIndex int,
) (int, string) {
name := getServiceName(service, usedAliases)
if name == "" {
name = fmt.Sprintf("%s%d", serviceContainerPrefix, serviceIndex)
serviceIndex++
}
return serviceIndex, name
}
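// getServiceName returns the first unused alias that is a valid DNS-1123 label, marking aliases
// as used along the way; it returns an empty string when no usable alias is found.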
func getServiceName(svc *common.Image, usedAliases map[string]struct{}) string {
for _, alias := range svc.Aliases() {
if _, ok := usedAliases[alias]; ok {
continue
}
if len(validation.IsDNS1123Label(alias)) != 0 {
usedAliases[alias] = struct{}{}
continue
}
usedAliases[alias] = struct{}{}
return alias
}
return ""
}
func (s *executor) prepareLifecycleHooks() *api.Lifecycle {
lifecycleCfg := s.Config.Kubernetes.GetContainerLifecycle()
if lifecycleCfg.PostStart == nil && lifecycleCfg.PreStop == nil {
return nil
}
lifecycle := &api.Lifecycle{}
if lifecycleCfg.PostStart != nil {
lifecycle.PostStart = lifecycleCfg.PostStart.ToKubernetesLifecycleHandler()
}
if lifecycleCfg.PreStop != nil {
lifecycle.PreStop = lifecycleCfg.PreStop.ToKubernetesLifecycleHandler()
}
return lifecycle
}
func (s *executor) getServiceVariables(serviceDefinition common.Image) common.JobVariables {
variables := s.Build.GetAllVariables().PublicOrInternal()
variables = append(variables, serviceDefinition.Variables...)
return variables.Expand()
}
// checkDefaults fills in the default image and namespace for the build pod when they are not explicitly configured.
func (s *executor) checkDefaults() error {
if s.options.Image.Name == "" {
k8sConfigImageName := s.ExpandValue(s.Config.Kubernetes.Image)
if k8sConfigImageName == "" {
return fmt.Errorf("no image specified and no default set in config")
}
s.options.Image.Name = k8sConfigImageName
}
if s.Config.Kubernetes.NamespacePerJob {
s.configurationOverwrites.namespace = fmt.Sprintf("ci-job-%d", s.Build.ID)
}
if s.configurationOverwrites.namespace == "" {
s.BuildLogger.Warningln(
fmt.Sprintf("Namespace is empty, therefore assuming '%s'.", DefaultResourceIdentifier),
)
s.configurationOverwrites.namespace = DefaultResourceIdentifier
}
s.BuildLogger.Println("Using Kubernetes namespace:", s.configurationOverwrites.namespace)
return nil
}
// captureServiceContainersLogs initiates capturing of the service containers' logs to an additional sink. The
// sink can be any io.Writer. Currently the sink is the job's main trace, which is wrapped in an inlineServiceLogWriter
// instance to add additional context to the logs. In the future this could be a separate file.
func (s *executor) captureServiceContainersLogs(ctx context.Context, containers []api.Container) {
if !s.Build.IsCIDebugServiceEnabled() {
return
}
for _, name := range s.options.getSortedServiceNames() {
service := s.options.Services[name]
for _, container := range containers {
if service.Name != container.Image {
continue
}
logger := s.BuildLogger.Stream(buildlogger.StreamStartingServiceLevel, buildlogger.Stdout)
defer logger.Close()
aliases := append([]string{strings.Split(container.Image, ":")[0]}, service.Aliases()...)
sink := service_helpers.NewInlineServiceLogWriter(strings.Join(aliases, "-"), logger)
if err := s.captureContainerLogs(ctx, container.Name, sink); err != nil {
s.BuildLogger.Warningln(err.Error())
}
logger.Close()
}
}
}
// captureContainerLogs tails (i.e. reads) logs emitted to stdout or stderr from
// processes in the specified kubernetes managed container, and redirects them
// to the specified sink, which can be any io.Writer (e.g. this process's
// stdout, a file, a log aggregator). The logs are streamed as they are emitted,
// rather than batched and written when we disconnect from the container (or it
// is stopped). The specified sink is closed when the source is completely
// drained.
func (s *executor) captureContainerLogs(ctx context.Context, containerName string, sink io.WriteCloser) error {
podLogOpts := api.PodLogOptions{
Container: containerName,
Follow: true,
Timestamps: true,
}
podLogs, err := retry.WithValueFn(s, func() (io.ReadCloser, error) {
err := waitForRunningContainer(ctx, s.kubeClient, s.Config.Kubernetes.GetPollTimeout(), s.pod.Namespace, s.pod.Name, containerName)
if err != nil {
return nil, err
}
// kubeAPI: pods/log, get, list, FF_KUBERNETES_HONOR_ENTRYPOINT=true,FF_USE_LEGACY_KUBERNETES_EXECUTION_STRATEGY=false
return s.kubeClient.CoreV1().Pods(s.pod.Namespace).GetLogs(s.pod.Name, &podLogOpts).Stream(ctx)
}).Run()
if err != nil {
return fmt.Errorf("failed to open log stream for container %s: %w", containerName, err)
}
s.BuildLogger.Debugln("streaming logs for container " + containerName)
go func() {
defer podLogs.Close()
defer sink.Close()
if _, err = io.Copy(sink, podLogs); err != nil {
if err != io.EOF && !errors.Is(err, context.Canceled) {
s.BuildLogger.Warningln(fmt.Sprintf(
"error streaming logs for container %s: %s",
containerName,
err.Error(),
))
}
}
s.BuildLogger.Debugln("stopped streaming logs for container " + containerName)
}()
return nil
}
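// generateNameForK8sResources appends a random suffix of k8sResourcesNameSuffixLength alphanumeric
// characters to pattern, truncating pattern so the result stays within the 63-character Kubernetes
// name limit. For example (illustrative), "runner-abc123-project-1-concurrent-0" could become
// "runner-abc123-project-1-concurrent-0-x7k2p9ab".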
func generateNameForK8sResources(pattern string) string {
suffix := make([]rune, k8sResourcesNameSuffixLength)
for i := range suffix {
suffix[i] = chars[rand.Intn(len(chars))]
}
if len(pattern) > (k8sResourcesNameMaxLength - k8sResourcesNameSuffixLength - 1) {
pattern = pattern[:k8sResourcesNameMaxLength-k8sResourcesNameSuffixLength-1]
}
return fmt.Sprintf("%s-%s", pattern, string(suffix))
}
// When calling the k8s API, it can happen that, despite the request failing, the resource was
// actually created. For POST requests, the subsequent retries then get a 409 status code
// (a conflict, because the resource name must be unique).
// When such a status code is received, we stop the retries and retrieve the existing resource.
func isConflict(err error) bool {
var statusError *kubeerrors.StatusError
return errors.As(err, &statusError) &&
statusError.ErrStatus.Code == http.StatusConflict &&
strings.Contains(statusError.ErrStatus.Message, errorAlreadyExistsMessage)
}
func IsKubernetesPodNotFoundError(err error) bool {
var statusErr *kubeerrors.StatusError
return errors.As(err, &statusErr) &&
statusErr.ErrStatus.Code == http.StatusNotFound &&
statusErr.ErrStatus.Details != nil &&
statusErr.ErrStatus.Details.Kind == "pods"
}
func IsKubernetesPodFailedError(err error) bool {
var podPhaseErr *podPhaseError
return errors.As(err, &podPhaseErr) &&
podPhaseErr.phase == api.PodFailed
}
func IsKubernetesPodContainerError(err error) bool {
var podServiceError *podContainerError
return errors.As(err, &podServiceError)
}
// waitForServices uses 'gitlab-runner-helper health-check' to wait until all configured services are healthy.
func (s *executor) waitForServices(ctx context.Context) error {
portArgs := ""
for _, name := range s.options.getSortedServiceNames() {
service := s.options.Services[name]
port := service.Variables.Get("HEALTHCHECK_TCP_PORT")
if port == "" {
continue
}
portArgs += fmt.Sprintf("--port '%s' ", port)
}
if portArgs == "" {
return nil
}
command := "gitlab-runner-helper health-check " + portArgs
var err error
if s.Build.IsFeatureFlagOn(featureflags.UseLegacyKubernetesExecutionStrategy) {
err = s.setupPodLegacy(ctx)
} else {
err = s.ensurePodsConfigured(ctx)
}
if err != nil {
return err
}
podStatusCh := s.watchPodStatus(ctx, &podContainerStatusChecker{})
stdout, stderr := s.getExecutorIoWriters()
defer stdout.Close()
defer stderr.Close()
select {
case err := <-s.runInContainerWithExec(ctx, helperContainerName, s.BuildShell.DockerCommand, command, stdout, stderr):
s.BuildLogger.Debugln(fmt.Sprintf("Container helper exited with error: %v", err))
var exitError exec.CodeExitError
if err != nil && errors.As(err, &exitError) {
return &common.BuildError{Inner: err, ExitCode: exitError.ExitStatus()}
}
case err := <-podStatusCh:
s.BuildLogger.Println("Health check aborted due to error: ", err.Error())
return err
case <-ctx.Done():
return fmt.Errorf("health check aborted")
}
return nil
}
func (s *executor) getExecutorIoWriters() (io.WriteCloser, io.WriteCloser) {
return s.BuildLogger.Stream(buildlogger.StreamWorkLevel, buildlogger.Stdout),
s.BuildLogger.Stream(buildlogger.StreamWorkLevel, buildlogger.Stderr)
}
func newExecutor() *executor {
e := &executor{
AbstractExecutor: executors.AbstractExecutor{
ExecutorOptions: executorOptions,
Config: common.RunnerConfig{
RunnerSettings: common.RunnerSettings{
Kubernetes: &common.KubernetesConfig{},
},
},
},
remoteProcessTerminated: make(chan shells.StageCommandStatus),
newKubeClient: func(config *restclient.Config) (kubernetes.Interface, error) {
return kubernetes.NewForConfig(config)
},
getKubeConfig: getKubeClientConfig,
windowsKernelVersion: os_helpers.LocalKernelVersion,
}
type resourceCheckResult struct {
allowed bool
reason string
}
e.newPodWatcher = func(c podWatcherConfig) podWatcher {
gvr := metav1.GroupVersionResource{Version: "v1", Resource: "pods"}
docLink := "https://docs.gitlab.com/runner/executors/kubernetes/#informers"
for _, verb := range []string{"list", "watch"} {
res, err := retry.WithValueFn(c.retryProvider, func() (resourceCheckResult, error) {
allowed, reason, err := c.featureChecker.IsResourceVerbAllowed(c.ctx, gvr, c.namespace, verb)
return resourceCheckResult{allowed, reason}, err
}).Run()
if res.allowed && err == nil {
continue
}
reason := res.reason
if err != nil {
reason = err.Error()
}
c.logger.Warningln(fmt.Sprintf("won't use informers: %q, see: %s", reason, docLink))
return watchers.NoopPodWatcher{}
}
return watchers.NewPodWatcher(c.ctx, c.logger, c.kubeClient, c.namespace, c.labels, c.maxSyncDuration)
}
e.newLogProcessor = func() logProcessor {
return newKubernetesLogProcessor(
e.kubeClient,
e.kubeConfig,
&backoff.Backoff{Min: time.Second, Max: 30 * time.Second},
e.Build.Log(),
kubernetesLogProcessorPodConfig{
namespace: e.pod.Namespace,
pod: e.pod.Name,
container: helperContainerName,
logPath: e.logFile(),
waitLogFileTimeout: waitLogFileTimeout,
},
)
}
return e
}
func featuresFn(features *common.FeaturesInfo) {
features.Variables = true
features.Image = true
features.Services = true
features.Artifacts = true
features.Cache = true
features.FallbackCacheKeys = true
features.Session = true
features.Terminal = true
features.Proxy = true
features.ServiceVariables = true
features.ServiceMultipleAliases = true
}
func init() {
common.RegisterExecutorProvider(common.ExecutorKubernetes, executors.DefaultExecutorProvider{
Creator: func() common.Executor {
return newExecutor()
},
FeaturesUpdater: featuresFn,
DefaultShellName: executorOptions.Shell.Shell,
})
}