executors/kubernetes/internal/watchers/pod.go
package watchers
import (
"context"
"errors"
"fmt"
"slices"
"sync/atomic"
"time"
"gitlab.com/gitlab-org/gitlab-runner/common"
"gitlab.com/gitlab-org/gitlab-runner/executors/kubernetes/internal/pull"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
// emitErrorTimeout is the time we wait for a consumer of the error channel to receive a message. We must not block the
// informer, and if nobody is listening for an error, there is no need to block on that anyway. We still give consumers
// some time, though: it was observed that _sometimes_ it takes a bit for the channel receiver to actually be able to
// receive (looking at you, Windows).
const emitErrorTimeout = time.Millisecond
type logger interface {
Debugln(args ...any)
}
// PodWatcher uses an informer to receive pod updates and determines whether a pod has terminal errors.
type PodWatcher struct {
factory *selfManagedInformerFactory
logger logger
podName atomic.Value
// Don't send on this channel in a blocking manner, so that we don't block the informer. You can use the emitError()
// method.
errors chan error
}
// NewPodWatcher creates a pod watcher based on the kube client, namespace, and labels, and with a maximum duration
// allowed for the initial cache sync.
// Internally, it creates an informer factory which can manage itself, so that it can be used and shut down properly.
func NewPodWatcher(ctx context.Context, logger logger, kubeClient kubernetes.Interface, namespace string, labels map[string]string, maxSyncDuration time.Duration) *PodWatcher {
return &PodWatcher{
factory: newScopedInformerFactory(ctx, kubeClient, namespace, labels, maxSyncDuration),
logger: logger,
errors: make(chan error),
}
}
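// The sketch below is illustrative only and not part of this package's API: watchBuildPod, its name, its arguments,
// and the 30-second sync duration are assumptions for demonstration. It shows the intended lifecycle of a PodWatcher:
// construct it, Start it, point it at the pod of interest via UpdatePodName, consume Errors until the surrounding
// work is done, and Stop it when finished.
func watchBuildPod(ctx context.Context, log logger, kubeClient kubernetes.Interface, namespace, podName string, labels map[string]string) error {
	pw := NewPodWatcher(ctx, log, kubeClient, namespace, labels, 30*time.Second)
	if err := pw.Start(); err != nil {
		return fmt.Errorf("starting pod watcher: %w", err)
	}
	defer pw.Stop()
	// Only updates for this pod name pass the informer's filter.
	pw.UpdatePodName(podName)
	select {
	case err := <-pw.Errors():
		// A terminal pod error was observed; surface it to the caller.
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}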
// Start starts the watcher by creating an informer via the informer factory, starting it, waiting for the initial
// cache sync, and then forwarding (terminal) pod errors to the subscriber.
func (p *PodWatcher) Start() error {
gvr := v1.SchemeGroupVersion.WithResource("pods")
//nolint:gocritic
// kubeAPI: pods, list, watch, using Informers=https://docs.gitlab.com/runner/executors/kubernetes/#informers
informer, err := p.factory.ForResource(gvr)
if err != nil {
return fmt.Errorf("creating informer for pods: %w", err)
}
_, err = informer.Informer().AddEventHandler(p.resourceHandler())
if err != nil {
return fmt.Errorf("registering event handler: %w", err)
}
// kubeAPI: ignore
p.factory.Start()
// kubeAPI: ignore
for informer, isSynced := range p.factory.WaitForCacheSync() {
if isSynced {
continue
}
err = errors.Join(err, fmt.Errorf("not synced: %s", informer))
}
return err
}
// UpdatePodName sets the pod name we are interested in.
func (p *PodWatcher) UpdatePodName(podName string) {
p.podName.Store(podName)
}
// Stop shuts down the pod watcher by shutting down its dependents: the informer factory and thus the informers
// created from it.
func (p *PodWatcher) Stop() {
// kubeAPI: ignore
p.factory.Shutdown()
}
// Errors reports observed errors on the pod in question. This method MUST only ever be called by one consumer at a time.
func (p *PodWatcher) Errors() <-chan error {
return p.errors
}
func (p *PodWatcher) onPodChange(pod *v1.Pod) {
podErr := checkTerminalPodErrors(pod)
if podErr == nil {
return
}
p.emitError(podErr)
}
// emitError sends out an error in a non-blocking way, so that the informer is not blocked.
func (p *PodWatcher) emitError(err error) {
select {
case p.errors <- err:
// nothing to do, we've sent out the pod error
case <-time.After(emitErrorTimeout):
p.logger.Debugln(fmt.Sprintf("pod error not consumed in time (%s): %s", emitErrorTimeout, err))
}
}
func (p *PodWatcher) resourceHandler() cache.ResourceEventHandler {
return cache.FilteringResourceEventHandler{
FilterFunc: func(obj any) bool {
			// We need to filter on the pod name; when the executor retries on pull issues, it starts the machinery from
			// scratch. While this is happening, the old pod might still be terminating. We don't care about these old pods
			// anymore in this context, and thus don't want to receive updates for them.
pod := asPod(obj)
if pod == nil {
p.logger.Debugln("update for unsupported object observed", obj)
return false
}
return pod.GetName() == p.currentPodName()
},
Handler: cache.ResourceEventHandlerFuncs{
			// In FilterFunc we already checked that the obj is indeed a non-nil pod (which also filters out delete
			// tombstones), so we don't have to check again in the handlers and only have to do the type assertion.
AddFunc: func(obj any) {
p.onPodChange(asPod(obj))
},
UpdateFunc: func(_, newObj any) {
p.onPodChange(asPod(newObj))
},
DeleteFunc: func(obj any) {
pod := asPod(obj)
p.emitError(fmt.Errorf("pod %q is deleted", pod.GetNamespace()+"/"+pod.GetName()))
},
},
}
}
func (p *PodWatcher) currentPodName() string {
if podName, ok := p.podName.Load().(string); ok {
return podName
}
return ""
}
// NoopPodWatcher is an alternative implementation to [PodWatcher] which doesn't do anything.
// It can be used as a stand-in when we don't actually want to run informers, i.e. when the feature flag is not
// enabled.
type NoopPodWatcher struct{}
func (NoopPodWatcher) Start() error { return nil }
func (NoopPodWatcher) Stop() {}
func (NoopPodWatcher) Errors() <-chan error { return make(chan error) }
func (NoopPodWatcher) UpdatePodName(string) {}
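// A minimal sketch of the contract that both PodWatcher and NoopPodWatcher satisfy. The interface name podWatcher is
// an assumption for illustration; the actual consumer of these types may define its own.
type podWatcher interface {
	Start() error
	Stop()
	Errors() <-chan error
	UpdatePodName(string)
}
var (
	_ podWatcher = &PodWatcher{}
	_ podWatcher = NoopPodWatcher{}
)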
// asPod is a convenience helper to type-assert an untyped object to a pod.
func asPod(obj any) *v1.Pod {
pod, _ := obj.(*v1.Pod)
return pod
}
// checkTerminalPodErrors checks a pod for errors that are terminal, i.e. errors the system can't recover from.
func checkTerminalPodErrors(pod *v1.Pod) error {
fullPodName := fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName())
dt := getPodCondition(pod, v1.DisruptionTarget)
if dt != nil && dt.Status == v1.ConditionTrue {
return fmt.Errorf("pod %q is disrupted: reason %q, message %q", fullPodName, dt.Reason, dt.Message)
}
if pod.DeletionTimestamp != nil {
return fmt.Errorf("pod %q is being deleted", fullPodName)
}
// collect all containers' statuses, except those for ephemeral containers
allContainerStatuses := slices.Concat(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses)
if err := CheckTerminalContainerErrors(allContainerStatuses); err != nil {
return fmt.Errorf("pod %q failed: %w", fullPodName, err)
}
return nil
}
// CheckTerminalContainerErrors checks individual container statuses for errors we can't recover from.
func CheckTerminalContainerErrors(containerStatuses []v1.ContainerStatus) error {
for _, containerStatus := range containerStatuses {
if containerStatus.Ready {
continue
}
waiting := containerStatus.State.Waiting
if waiting == nil {
continue
}
switch waiting.Reason {
case "InvalidImageName":
return &common.BuildError{Inner: fmt.Errorf("image pull failed: %s", waiting.Message), FailureReason: common.ConfigurationError}
case "ErrImagePull", "ImagePullBackOff":
msg := fmt.Sprintf("image pull failed: %s", waiting.Message)
imagePullErr := &pull.ImagePullError{Message: msg, Container: containerStatus.Name, Image: containerStatus.Image}
return &common.BuildError{Inner: imagePullErr, FailureReason: common.ImagePullFailure}
}
}
return nil
}
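// describeTerminalContainerError is an illustrative sketch, not part of this package's API: the helper name and its
// use are assumptions. It shows how a caller might unpack the error returned by CheckTerminalContainerErrors, which
// for pull failures is a *common.BuildError whose Inner error is a *pull.ImagePullError.
func describeTerminalContainerError(containerStatuses []v1.ContainerStatus) string {
	err := CheckTerminalContainerErrors(containerStatuses)
	if err == nil {
		return ""
	}
	var buildErr *common.BuildError
	if errors.As(err, &buildErr) && buildErr.FailureReason == common.ImagePullFailure {
		var pullErr *pull.ImagePullError
		if errors.As(buildErr.Inner, &pullErr) {
			// The pull error carries the container and image that could not be pulled.
			return fmt.Sprintf("container %q could not pull image %q: %s", pullErr.Container, pullErr.Image, pullErr.Message)
		}
	}
	return err.Error()
}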
func getPodCondition(pod *v1.Pod, condition v1.PodConditionType) *v1.PodCondition {
conditions := pod.Status.Conditions
for i := range conditions {
if conditions[i].Type == condition {
return &conditions[i]
}
}
return nil
}