executors/kubernetes/util.go (176 lines of code) (raw):
package kubernetes
import (
"errors"
"fmt"
"io"
"net/http"
"time"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
clientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/common"
)
func init() {
clientcmd.DefaultCluster = clientcmdapi.Cluster{}
}
func getKubeClientConfig(config *common.KubernetesConfig) (*restclient.Config, error) {
switch {
case len(config.CertFile) > 0:
if len(config.KeyFile) == 0 || len(config.CAFile) == 0 {
return nil, fmt.Errorf("ca file, cert file and key file must be specified when using file based auth")
}
return &restclient.Config{
Host: config.Host,
TLSClientConfig: restclient.TLSClientConfig{
CertFile: config.CertFile,
KeyFile: config.KeyFile,
CAFile: config.CAFile,
},
}, nil
case len(config.Host) > 0:
return &restclient.Config{
Host: config.Host,
}, nil
default:
// Try in cluster config first
if inClusterCfg, err := restclient.InClusterConfig(); err == nil {
return inClusterCfg, nil
}
config, err := clientcmd.NewDefaultClientConfigLoadingRules().Load()
if err != nil {
return nil, err
}
clientConfig := clientcmd.NewDefaultClientConfig(*config, &clientcmd.ConfigOverrides{})
return clientConfig.ClientConfig()
}
}
func getKubeClient(config *common.KubernetesConfig) (*client.Client, error) {
restConfig, err := getKubeClientConfig(config)
if err != nil {
return nil, err
}
return client.New(restConfig)
}
func closeKubeClient(client *client.Client) bool {
if client == nil || client.Client == nil || client.Client.Transport == nil {
return false
}
if transport, _ := client.Client.Transport.(*http.Transport); transport != nil {
transport.CloseIdleConnections()
return true
}
return false
}
func isRunning(pod *api.Pod) (bool, error) {
switch pod.Status.Phase {
case api.PodRunning:
return true, nil
case api.PodSucceeded:
return false, fmt.Errorf("pod already succeeded before it begins running")
case api.PodFailed:
return false, fmt.Errorf("pod status is failed")
default:
return false, nil
}
}
type podPhaseResponse struct {
done bool
phase api.PodPhase
err error
}
func getPodPhase(c *client.Client, pod *api.Pod, out io.Writer) podPhaseResponse {
pod, err := c.Pods(pod.Namespace).Get(pod.Name)
if err != nil {
return podPhaseResponse{true, api.PodUnknown, err}
}
ready, err := isRunning(pod)
if err != nil {
return podPhaseResponse{true, pod.Status.Phase, err}
}
if ready {
return podPhaseResponse{true, pod.Status.Phase, nil}
}
// check status of containers
for _, container := range pod.Status.ContainerStatuses {
if container.Ready {
continue
}
if container.State.Waiting == nil {
continue
}
switch container.State.Waiting.Reason {
case "ErrImagePull", "ImagePullBackOff":
err = fmt.Errorf("image pull failed: %s", container.State.Waiting.Message)
err = &common.BuildError{Inner: err}
return podPhaseResponse{true, api.PodUnknown, err}
}
}
fmt.Fprintf(out, "Waiting for pod %s/%s to be running, status is %s\n", pod.Namespace, pod.Name, pod.Status.Phase)
return podPhaseResponse{false, pod.Status.Phase, nil}
}
func triggerPodPhaseCheck(c *client.Client, pod *api.Pod, out io.Writer) <-chan podPhaseResponse {
errc := make(chan podPhaseResponse)
go func() {
defer close(errc)
errc <- getPodPhase(c, pod, out)
}()
return errc
}
// waitForPodRunning will use client c to detect when pod reaches the PodRunning
// state. It returns the final PodPhase once either PodRunning, PodSucceeded or
// PodFailed has been reached. In the case of PodRunning, it will also wait until
// all containers within the pod are also Ready.
// It returns error if the call to retrieve pod details fails or the timeout is
// reached.
// The timeout and polling values are configurable through KubernetesConfig
// parameters.
func waitForPodRunning(ctx context.Context, c *client.Client, pod *api.Pod, out io.Writer, config *common.KubernetesConfig) (api.PodPhase, error) {
pollInterval := config.GetPollInterval()
pollAttempts := config.GetPollAttempts()
for i := 0; i <= pollAttempts; i++ {
select {
case r := <-triggerPodPhaseCheck(c, pod, out):
if !r.done {
time.Sleep(time.Duration(pollInterval) * time.Second)
continue
}
return r.phase, r.err
case <-ctx.Done():
return api.PodUnknown, ctx.Err()
}
}
return api.PodUnknown, errors.New("timedout waiting for pod to start")
}
// limits takes a string representing CPU & memory limits,
// and returns a ResourceList with appropriately scaled Quantity
// values for Kubernetes. This allows users to write "500m" for CPU,
// and "50Mi" for memory (etc.)
func limits(cpu, memory string) (api.ResourceList, error) {
var rCPU, rMem resource.Quantity
var err error
parse := func(s string) (resource.Quantity, error) {
var q resource.Quantity
if len(s) == 0 {
return q, nil
}
if q, err = resource.ParseQuantity(s); err != nil {
return q, fmt.Errorf("error parsing resource limit: %s", err.Error())
}
return q, nil
}
if rCPU, err = parse(cpu); err != nil {
return api.ResourceList{}, nil
}
if rMem, err = parse(memory); err != nil {
return api.ResourceList{}, nil
}
l := make(api.ResourceList)
q := resource.Quantity{}
if rCPU != q {
l[api.ResourceCPU] = rCPU
}
if rMem != q {
l[api.ResourceMemory] = rMem
}
return l, nil
}
// buildVariables converts a common.BuildVariables into a list of
// kubernetes EnvVar objects
func buildVariables(bv common.JobVariables) []api.EnvVar {
e := make([]api.EnvVar, len(bv))
for i, b := range bv {
e[i] = api.EnvVar{
Name: b.Key,
Value: b.Value,
}
}
return e
}