pkg/client/kubeclient.go (199 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"fmt"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apis "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/log"
)
// SchedulerKubeClient bundles a typed Kubernetes clientset with the REST
// config it was created from. Its methods perform the pod and configmap
// API calls made by the shim.
type SchedulerKubeClient struct {
	// clientSet is the typed client used for every API call made by the methods below.
	clientSet *kubernetes.Clientset
	// configs is the REST config that clientSet was constructed from.
	configs *rest.Config
}
// newBootstrapSchedulerKubeClient creates a SchedulerKubeClient from the REST
// config resolved for kc. Unlike newSchedulerKubeClient it leaves the REST
// config's QPS/Burst untouched. A clientset construction failure is logged at
// Fatal level.
func newBootstrapSchedulerKubeClient(kc string) SchedulerKubeClient {
	restCfg := CreateRestConfigOrDie(kc)
	cs, csErr := kubernetes.NewForConfig(restCfg)
	if csErr != nil {
		log.Log(log.ShimClient).Fatal("failed to get Clientset", zap.Error(csErr))
	}
	return SchedulerKubeClient{configs: restCfg, clientSet: cs}
}
// newSchedulerKubeClient creates a SchedulerKubeClient whose REST config is
// rate-limited using the scheduler configuration's KubeQPS and KubeBurst
// values. A clientset construction failure is logged at Fatal level.
func newSchedulerKubeClient(kc string) SchedulerKubeClient {
	settings := conf.GetSchedulerConf()
	restCfg := CreateRestConfigOrDie(kc)
	restCfg.QPS, restCfg.Burst = float32(settings.KubeQPS), settings.KubeBurst

	cs, csErr := kubernetes.NewForConfig(restCfg)
	if csErr != nil {
		log.Log(log.ShimClient).Fatal("failed to get Clientset", zap.Error(csErr))
	}
	return SchedulerKubeClient{configs: restCfg, clientSet: cs}
}
// CreateRestConfigOrDie wraps CreateRestConfig and logs at Fatal level when
// no REST config can be built for kc.
func CreateRestConfigOrDie(kc string) *rest.Config {
	restCfg, cfgErr := CreateRestConfig(kc)
	if cfgErr == nil {
		return restCfg
	}
	log.Log(log.ShimClient).Fatal("unable to create REST config, aborting", zap.Error(cfgErr))
	return restCfg
}
// CreateRestConfig resolves the REST config used to reach the API server.
//
// The in-cluster config is tried first. Only when that reports
// rest.ErrNotInCluster does it fall back to loading the kubeconfig file at kc.
// If kc is empty, conf.GetDefaultKubeConfigPath() is used instead. Any other
// in-cluster error, or a kubeconfig load failure, is logged and returned.
func CreateRestConfig(kc string) (*rest.Config, error) {
	inCluster, err := rest.InClusterConfig()
	switch {
	case err != nil && err != rest.ErrNotInCluster:
		log.Log(log.ShimClient).Error("failed to create REST config", zap.Error(err))
		return nil, err
	case inCluster != nil:
		return inCluster, nil
	}

	kubeConfigPath := kc
	if kubeConfigPath == "" {
		kubeConfigPath = conf.GetDefaultKubeConfigPath()
	}
	log.Log(log.ShimClient).Info(fmt.Sprintf("Not running inside Kubernetes; using KUBECONFIG at %s", kubeConfigPath))

	fromFile, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
	if err != nil {
		log.Log(log.ShimClient).Error("failed to create kubeClient configs", zap.Error(err))
	}
	return fromFile, err
}
// GetClientSet returns the stored clientset behind the kubernetes.Interface
// abstraction.
func (nc SchedulerKubeClient) GetClientSet() kubernetes.Interface {
	var iface kubernetes.Interface = nc.clientSet
	return iface
}
// GetConfigs returns the REST config this client was built from. The stored
// pointer is returned as-is, not a copy.
func (nc SchedulerKubeClient) GetConfigs() *rest.Config {
	cfg := nc.configs
	return cfg
}
// Bind posts a v1.Binding that assigns pod to the node named hostID.
// The binding carries the pod's namespace, name and UID. A failed call is
// logged and its error returned.
func (nc SchedulerKubeClient) Bind(pod *v1.Pod, hostID string) error {
	log.Log(log.ShimClient).Info("bind pod to node",
		zap.String("podName", pod.Name),
		zap.String("podUID", string(pod.UID)),
		zap.String("nodeID", hostID))

	binding := &v1.Binding{
		ObjectMeta: apis.ObjectMeta{
			Namespace: pod.Namespace,
			Name:      pod.Name,
			UID:       pod.UID,
		},
		Target: v1.ObjectReference{
			Kind: "Node",
			Name: hostID,
		},
	}

	podsAPI := nc.clientSet.CoreV1().Pods(pod.Namespace)
	err := podsAPI.Bind(context.Background(), binding, apis.CreateOptions{})
	if err == nil {
		return nil
	}
	log.Log(log.ShimClient).Error("failed to bind pod",
		zap.String("namespace", pod.Namespace),
		zap.String("podName", pod.Name),
		zap.Error(err))
	return err
}
// Create submits pod to the API server in pod's namespace. It returns the
// server's response object and error unchanged; nothing is logged here.
func (nc SchedulerKubeClient) Create(pod *v1.Pod) (*v1.Pod, error) {
	podsAPI := nc.clientSet.CoreV1().Pods(pod.Namespace)
	created, err := podsAPI.Create(context.Background(), pod, apis.CreateOptions{})
	return created, err
}
// Delete removes the pod identified by pod's namespace and name, using
// default delete options. A failure is logged at Warn level and returned.
func (nc SchedulerKubeClient) Delete(pod *v1.Pod) error {
	podsAPI := nc.clientSet.CoreV1().Pods(pod.Namespace)
	err := podsAPI.Delete(context.Background(), pod.Name, apis.DeleteOptions{})
	if err == nil {
		return nil
	}
	log.Log(log.ShimClient).Warn("failed to delete pod",
		zap.String("namespace", pod.Namespace),
		zap.String("podName", pod.Name),
		zap.Error(err))
	return err
}
// GetConfigMap fetches the configmap namespace/name.
//
// A NotFound error is intentionally swallowed. In that case the error result
// is nil, and the configmap result is whatever client-go returned for the
// failed Get. NOTE(review): that may be an empty, non-nil object rather than
// nil, so callers should not treat a nil check as the only "missing" signal.
// Any other error is logged at Warn level and returned with a nil configmap.
func (nc SchedulerKubeClient) GetConfigMap(namespace string, name string) (*v1.ConfigMap, error) {
	cm, err := nc.clientSet.CoreV1().ConfigMaps(namespace).Get(context.Background(), name, apis.GetOptions{})
	switch {
	case err == nil, errors.IsNotFound(err):
		return cm, nil
	default:
		log.Log(log.ShimClient).Warn("failed to get configmap",
			zap.String("namespace", namespace),
			zap.String("name", name),
			zap.Error(err))
		return nil, err
	}
}
// Get fetches the pod podNamespace/podName from the API server.
// On failure the error is logged at Warn level and returned with a nil pod.
func (nc SchedulerKubeClient) Get(podNamespace string, podName string) (*v1.Pod, error) {
	pod, err := nc.clientSet.CoreV1().Pods(podNamespace).Get(context.Background(), podName, apis.GetOptions{})
	if err != nil {
		// Log the requested identity, not fields of the returned pod: on error
		// that object may be nil (dereferencing it would panic) or empty.
		log.Log(log.ShimClient).Warn("failed to get pod",
			zap.String("namespace", podNamespace),
			zap.String("podName", podName),
			zap.Error(err))
		return nil, err
	}
	return pod, nil
}
// UpdatePod applies podMutator to the latest version of pod and writes the
// result back. Each attempt re-reads the pod from the API server, mutates that
// copy and calls Update. The whole cycle runs under retry.RetryOnConflict with
// retry.DefaultRetry, so an update conflict triggers a fresh attempt.
//
// podMutator may therefore be invoked more than once, each time on a freshly
// fetched object.
//
// On success the pod returned by the API server is returned. On failure,
// including failure to fetch the latest version, the error is logged and
// returned together with the original pod argument.
func (nc SchedulerKubeClient) UpdatePod(pod *v1.Pod, podMutator func(pod *v1.Pod)) (*v1.Pod, error) {
	var updatedPod *v1.Pod
	podsAPI := nc.clientSet.CoreV1().Pods(pod.Namespace)
	retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		latestPod, getErr := podsAPI.Get(context.Background(), pod.Name, apis.GetOptions{})
		if getErr != nil {
			// Without the current object there is nothing valid to mutate.
			// Continuing would hand a nil or empty pod to podMutator and Update.
			log.Log(log.ShimClient).Warn("failed to get latest version of Pod",
				zap.String("namespace", pod.Namespace),
				zap.String("podName", pod.Name),
				zap.Error(getErr))
			return getErr
		}
		// allow mutator to update pod
		podMutator(latestPod)
		result, updateErr := podsAPI.Update(context.Background(), latestPod, apis.UpdateOptions{})
		if updateErr != nil {
			log.Log(log.ShimClient).Warn("failed to update pod",
				zap.String("namespace", pod.Namespace),
				zap.String("podName", pod.Name),
				zap.Error(updateErr))
			return updateErr
		}
		updatedPod = result
		return nil
	})
	if retryErr != nil {
		log.Log(log.ShimClient).Error("Update pod failed",
			zap.String("namespace", pod.Namespace),
			zap.String("podName", pod.Name),
			zap.Error(retryErr))
		return pod, retryErr
	}
	log.Log(log.ShimClient).Info("Successfully updated pod",
		zap.String("namespace", pod.Namespace),
		zap.String("podName", pod.Name))
	return updatedPod, nil
}
// UpdateStatus writes pod.Status onto the latest version of the pod through the
// status subresource. Each attempt re-reads the pod, overwrites its Status with
// the status captured from the pod argument and calls UpdateStatus. Attempts
// are retried under retry.RetryOnConflict with retry.DefaultRetry.
//
// On success the pod returned by the API server is returned. On failure,
// including failure to fetch the latest version, the error is logged and
// returned together with the original pod argument.
func (nc SchedulerKubeClient) UpdateStatus(pod *v1.Pod) (*v1.Pod, error) {
	var updatedPod *v1.Pod
	newPodStatus := pod.Status
	podsAPI := nc.clientSet.CoreV1().Pods(pod.Namespace)
	// In case of conflicts, retry using the logic in
	// https://github.com/kubernetes/client-go/blob/v0.21.1/examples/create-update-delete-deployment/main.go#L118-L121
	retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// Retrieve the latest version of Pod before attempting status update
		// RetryOnConflict uses exponential backoff to avoid exhausting the API server
		latestPod, getErr := podsAPI.Get(context.Background(), pod.Name, apis.GetOptions{})
		if getErr != nil {
			// latestPod is not usable here. Assigning its Status could
			// dereference a nil pointer, so abort this update instead.
			log.Log(log.ShimClient).Warn("failed to get latest version of Pod",
				zap.String("namespace", pod.Namespace),
				zap.String("podName", pod.Name),
				zap.Error(getErr))
			return getErr
		}
		latestPod.Status = newPodStatus
		result, updateErr := podsAPI.UpdateStatus(context.Background(), latestPod, apis.UpdateOptions{})
		if updateErr != nil {
			log.Log(log.ShimClient).Warn("failed to update pod status",
				zap.String("namespace", pod.Namespace),
				zap.String("podName", pod.Name),
				zap.Error(updateErr))
			return updateErr
		}
		updatedPod = result
		return nil
	})
	if retryErr != nil {
		log.Log(log.ShimClient).Error("Update pod status failed",
			zap.String("namespace", pod.Namespace),
			zap.String("podName", pod.Name),
			zap.Error(retryErr))
		return pod, retryErr
	}
	log.Log(log.ShimClient).Info("Successfully updated pod status",
		zap.String("namespace", pod.Namespace),
		zap.String("podName", pod.Name),
		zap.Stringer("newStatus", &pod.Status))
	return updatedPod, nil
}