// executors/kubernetes/util.go
package kubernetes
import (
	"errors"
	"fmt"
	"io"
	"net/http"
	"slices"
	"strings"
	"time"

	"golang.org/x/net/context"
	api "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"

	"gitlab.com/gitlab-org/gitlab-runner/common"
	"gitlab.com/gitlab-org/gitlab-runner/executors/kubernetes/internal/watchers"
)
type kubeConfigProvider func() (*restclient.Config, error)
type resourceQuantityError struct {
resource string
value string
inner error
}
func (r *resourceQuantityError) Error() string {
return fmt.Sprintf("parsing resource %q with value %q: %q", r.resource, r.value, r.inner)
}
func (r *resourceQuantityError) Is(err error) bool {
t, ok := err.(*resourceQuantityError)
return ok && r.resource == t.resource && r.value == t.value && r.inner == t.inner
}
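// A sketch of how this pairs with errors.Is. Note that inner errors are
// compared by identity, so the target must carry the same wrapped error value
// (the field values below are illustrative):
//
//	target := &resourceQuantityError{resource: "cpu", value: "bad", inner: parseErr}
//	if errors.Is(err, target) {
//		// err is a cpu parsing failure for the value "bad"
//	}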
var (
	// inClusterConfig reads the in-cluster Kubernetes configuration
	inClusterConfig kubeConfigProvider = restclient.InClusterConfig
	// defaultKubectlConfig parses the kubectl configuration and loads the default cluster
	defaultKubectlConfig kubeConfigProvider = loadDefaultKubectlConfig
)
func getKubeClientConfig(
config *common.KubernetesConfig,
overwrites *overwrites,
) (kubeConfig *restclient.Config, err error) {
if config.Host != "" {
kubeConfig, err = getOutClusterClientConfig(config)
} else {
kubeConfig, err = guessClientConfig()
}
if err != nil {
return nil, err
}
// apply overwrites
if overwrites.bearerToken != "" {
kubeConfig.BearerToken = overwrites.bearerToken
}
kubeConfig.UserAgent = common.AppVersion.UserAgent()
return kubeConfig, nil
}
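// A usage sketch; the host is illustrative, and the overwrites value is built
// elsewhere in this package:
//
//	cfg, err := getKubeClientConfig(&common.KubernetesConfig{Host: "https://k8s.example.com"}, &overwrites{})
//	if err != nil {
//		return err
//	}
//	client, err := kubernetes.NewForConfig(cfg)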
func getOutClusterClientConfig(config *common.KubernetesConfig) (*restclient.Config, error) {
kubeConfig := &restclient.Config{
Host: config.Host,
BearerToken: config.BearerToken,
TLSClientConfig: restclient.TLSClientConfig{
CAFile: config.CAFile,
},
}
	// certificate-based auth
if config.CertFile != "" {
if config.KeyFile == "" || config.CAFile == "" {
return nil, fmt.Errorf("ca file, cert file and key file must be specified when using file based auth")
}
kubeConfig.TLSClientConfig.CertFile = config.CertFile
kubeConfig.TLSClientConfig.KeyFile = config.KeyFile
}
return kubeConfig, nil
}
func guessClientConfig() (*restclient.Config, error) {
	// Try the in-cluster config first
if inClusterCfg, err := inClusterConfig(); err == nil {
return inClusterCfg, nil
}
	// In-cluster config failed; fall back to reading the default kubectl config
return defaultKubectlConfig()
}
func loadDefaultKubectlConfig() (*restclient.Config, error) {
config, err := clientcmd.NewDefaultClientConfigLoadingRules().Load()
if err != nil {
return nil, err
}
return clientcmd.NewDefaultClientConfig(*config, &clientcmd.ConfigOverrides{}).ClientConfig()
}
func getContainerStatus(containerStatuses []api.ContainerStatus, containerName string) (api.ContainerStatus, bool) {
for i := range containerStatuses {
if containerStatuses[i].Name == containerName {
return containerStatuses[i], true
}
}
return api.ContainerStatus{}, false
}
// waitForRunningContainer watches the given pod until the named container is
// observed running or terminated; a non-zero exit is reported as an error.
func waitForRunningContainer(
	ctx context.Context,
	client kubernetes.Interface,
	timeoutSeconds int,
	namespace, pod, container string,
) error {
// kubeAPI: pods, watch, FF_KUBERNETES_HONOR_ENTRYPOINT=true,FF_USE_LEGACY_KUBERNETES_EXECUTION_STRATEGY=false
watcher, err := client.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{
FieldSelector: "status.phase=Running,metadata.name=" + pod,
TimeoutSeconds: common.Int64Ptr(int64(timeoutSeconds)),
})
if err != nil {
return err
}
defer watcher.Stop()
for event := range watcher.ResultChan() {
pod, ok := event.Object.(*api.Pod)
if !ok {
return fmt.Errorf("event object is not a pod: %v", event.Object)
}
containerStatus, ok := getContainerStatus(pod.Status.ContainerStatuses, container)
if !ok {
return fmt.Errorf("container status for %q not found", container)
}
if terminated := containerStatus.State.Terminated; terminated != nil {
if terminated.ExitCode != 0 {
return fmt.Errorf("container %q terminated with non-zero status: %d", container, terminated.ExitCode)
}
return nil
}
if running := containerStatus.State.Running; running != nil {
break
}
}
	// Either the container was observed running, or the watch channel closed
	// (for example on timeout) before a terminal state was seen.
	return nil
}
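// An example call; the namespace, pod and container names are illustrative:
//
//	err := waitForRunningContainer(ctx, client, 300, "gitlab-runner", "runner-pod-abc", "build")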
func closeKubeClient(client kubernetes.Interface) bool {
if client == nil {
return false
}
// kubeAPI: ignore
rest, ok := client.CoreV1().RESTClient().(*restclient.RESTClient)
if !ok || rest.Client == nil || rest.Client.Transport == nil {
return false
}
if transport, ok := rest.Client.Transport.(*http.Transport); ok {
transport.CloseIdleConnections()
return true
}
return false
}
func isRunning(pod *api.Pod, containers ...string) (bool, error) {
switch pod.Status.Phase {
case api.PodRunning:
var readyCount int
for _, c := range containers {
for _, cs := range pod.Status.ContainerStatuses {
if cs.Name == c && cs.Ready {
readyCount++
}
}
}
return readyCount == len(containers), nil
case api.PodSucceeded:
		return false, fmt.Errorf("pod already succeeded before it began running")
case api.PodFailed:
return false, fmt.Errorf("pod status is failed")
default:
return false, nil
}
}
type podPhaseResponse struct {
done bool
phase api.PodPhase
err error
}
func getPodPhase(
	ctx context.Context,
	client kubernetes.Interface,
	pod *api.Pod,
	out io.Writer,
	containers ...string,
) podPhaseResponse {
// kubeAPI: pods, get
pod, err := client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
return podPhaseResponse{true, api.PodUnknown, err}
}
ready, err := isRunning(pod, containers...)
if err != nil || ready {
return podPhaseResponse{true, pod.Status.Phase, err}
}
containerStatuses := slices.Concat(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses)
if err := watchers.CheckTerminalContainerErrors(containerStatuses); err != nil {
return podPhaseResponse{true, api.PodUnknown, err}
}
_, _ = fmt.Fprintf(
out,
"Waiting for pod %s/%s to be running, status is %s\n",
pod.Namespace,
pod.Name,
pod.Status.Phase,
)
for _, condition := range pod.Status.Conditions {
		// skip conditions with no reason; these are typically expected pod conditions
if condition.Reason == "" {
continue
}
_, _ = fmt.Fprintf(
out,
"\t%s: %q\n",
condition.Reason,
condition.Message,
)
}
return podPhaseResponse{false, pod.Status.Phase, nil}
}
func triggerPodPhaseCheck(
	ctx context.Context,
	c kubernetes.Interface,
	pod *api.Pod,
	out io.Writer,
	containers ...string,
) <-chan podPhaseResponse {
errc := make(chan podPhaseResponse)
go func() {
defer close(errc)
errc <- getPodPhase(ctx, c, pod, out, containers...)
}()
return errc
}
// waitForPodRunning uses client c to detect when the pod reaches the PodRunning
// state. It returns the final PodPhase once PodRunning, PodSucceeded or
// PodFailed has been reached. In the case of PodRunning, it also waits until
// all containers within the pod are Ready.
// It returns an error if the call to retrieve pod details fails or the timeout
// is reached.
// The timeout and polling values are configurable through KubernetesConfig
// parameters.
// The containers parameter is optional and can be used to wait for specific
// containers to become ready.
func waitForPodRunning(
ctx context.Context,
c kubernetes.Interface,
pod *api.Pod,
out io.Writer,
config *common.KubernetesConfig,
containers ...string,
) (api.PodPhase, error) {
pollInterval := config.GetPollInterval()
pollAttempts := config.GetPollAttempts()
for i := 0; i <= pollAttempts; i++ {
select {
case r := <-triggerPodPhaseCheck(ctx, c, pod, out, containers...):
if !r.done {
time.Sleep(time.Duration(pollInterval) * time.Second)
continue
}
return r.phase, r.err
case <-ctx.Done():
return api.PodUnknown, ctx.Err()
}
}
return api.PodUnknown, errors.New("timed out waiting for pod to start")
}
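// A usage sketch, assuming client, pod and config are already in hand:
//
//	phase, err := waitForPodRunning(ctx, client, pod, os.Stdout, config, "build", "helper")
//	if err != nil {
//		return err
//	}
//	if phase != api.PodRunning {
//		return fmt.Errorf("pod is not running: %v", phase)
//	}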
func getPodLog(ctx context.Context, client kubernetes.Interface, pod *api.Pod) error {
count := int64(10)
podLogOptions := api.PodLogOptions{
Container: "helper",
Follow: false,
TailLines: &count,
}
//nolint:gocritic
// kubeAPI: pods/log, get, FF_WAIT_FOR_POD_TO_BE_REACHABLE=true
req := client.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOptions)
podLogs, err := req.Stream(ctx)
if err != nil {
		return fmt.Errorf("failed to open log stream for %s: %w", pod.Name, err)
}
defer podLogs.Close()
return nil
}
func triggerPodReachableCheck(ctx context.Context, c kubernetes.Interface, pod *api.Pod) <-chan error {
errc := make(chan error)
go func() {
defer close(errc)
errc <- getPodLog(ctx, c, pod)
}()
return errc
}
func WaitForPodReachable(
ctx context.Context,
c kubernetes.Interface,
pod *api.Pod,
config *common.KubernetesConfig,
) error {
pollInterval := config.GetPollInterval()
pollAttempts := config.GetPollAttempts()
for i := 0; i <= pollAttempts; i++ {
select {
case r := <-triggerPodReachableCheck(ctx, c, pod):
if r != nil {
time.Sleep(time.Duration(pollInterval) * time.Second)
continue
}
return nil
case <-ctx.Done():
return ctx.Err()
}
}
return errors.New("timed out waiting for pod to become attachable")
}
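// A usage sketch; in practice this check is gated behind
// FF_WAIT_FOR_POD_TO_BE_REACHABLE (see getPodLog above):
//
//	if err := WaitForPodReachable(ctx, client, pod, config); err != nil {
//		return fmt.Errorf("pod never became reachable: %w", err)
//	}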
// createResourceList takes strings representing CPU, memory and ephemeral
// storage limits, and returns a ResourceList with appropriately scaled
// Quantity values for Kubernetes. This allows users to write "500m" for CPU,
// "50Mi" for memory and "1Gi" for ephemeral storage (etc.)
func createResourceList(cpu, memory, ephemeralStorage string) (api.ResourceList, error) {
var rCPU, rMem, rStor resource.Quantity
var err error
	parse := func(s string) (resource.Quantity, error) {
		var q resource.Quantity
		if s == "" {
			return q, nil
		}
		return resource.ParseQuantity(s)
	}
if rCPU, err = parse(cpu); err != nil {
return api.ResourceList{}, &resourceQuantityError{resource: "cpu", value: cpu, inner: err}
}
if rMem, err = parse(memory); err != nil {
return api.ResourceList{}, &resourceQuantityError{resource: "memory", value: memory, inner: err}
}
if rStor, err = parse(ephemeralStorage); err != nil {
return api.ResourceList{}, &resourceQuantityError{
resource: "ephemeralStorage",
value: ephemeralStorage,
inner: err,
}
}
	l := make(api.ResourceList)
	// Only include quantities that were actually provided; an empty input
	// leaves the zero-value Quantity and is skipped below.
	q := resource.Quantity{}
if rCPU != q {
l[api.ResourceCPU] = rCPU
}
if rMem != q {
l[api.ResourceMemory] = rMem
}
if rStor != q {
l[api.ResourceEphemeralStorage] = rStor
}
return l, nil
}
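// A usage sketch with illustrative values using standard Kubernetes quantity
// suffixes:
//
//	limits, err := createResourceList("500m", "128Mi", "1Gi")
//	if err != nil {
//		return err
//	}
//	// limits now maps cpu, memory and ephemeral-storage to parsed Quantities.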
// buildVariables converts common.JobVariables into a list of Kubernetes
// EnvVar objects.
// The order of keys is preserved, but duplicate elements (with the same
// name/key) are deduplicated; the last one in the list wins.
func buildVariables(bv common.JobVariables) []api.EnvVar {
idx := map[string]int{}
envs := make([]api.EnvVar, 0, len(bv))
i := 0
for _, b := range bv {
// For file-type secrets, substitute the path to the secret
// for the secret value.
if b.File {
b.Value = bv.Get(b.Key)
}
e := api.EnvVar{
Name: b.Key,
Value: b.Value,
}
if j, ok := idx[e.Name]; ok {
envs[j] = e
continue
}
envs = append(envs, e)
idx[e.Name] = i
i++
}
return slices.Clip(envs)
}
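// A sketch of the dedup behavior: the later "A" overwrites the earlier one in
// place, keeping its original position:
//
//	envs := buildVariables(common.JobVariables{
//		{Key: "A", Value: "1"},
//		{Key: "B", Value: "2"},
//		{Key: "A", Value: "3"},
//	})
//	// envs == []api.EnvVar{{Name: "A", Value: "3"}, {Name: "B", Value: "2"}}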
// sanitizeLabel sanitizes a label value to match the Kubernetes restrictions from
// https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
//
//nolint:gocognit
func sanitizeLabel(value string) string {
mapFn := func(r rune) rune {
if r >= 'a' && r <= 'z' ||
r >= 'A' && r <= 'Z' ||
r >= '0' && r <= '9' ||
r == '-' || r == '_' || r == '.' {
return r
}
return '_'
}
// only alphanumerics, dashes (-), underscores (_), dots (.) are valid
value = strings.Map(mapFn, value)
// must start/end with alphanumerics only
value = strings.Trim(value, "-_.")
// length must be <= 63 characters
if len(value) > 63 {
value = value[:63]
}
// trim again if required after shortening
return strings.Trim(value, "-_.")
}