pkg/k8s/pod/client_wrapper.go (153 lines of code) (raw):
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
package pod
import (
"context"
"fmt"
"github.com/prometheus/client_golang/prometheus"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)
const (
NodeNameSpec = "nodeName"
)
var (
prometheusRegistered = false
annotatePodRequestCallCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "annotate_pod_request_call_count",
Help: "The number of request to annotate pod object",
},
[]string{"annotate_key"},
)
annotatePodRequestErrCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "annotate_pod_request_err_count",
Help: "The number of request that failed to annotate the pod",
},
[]string{"annotate_key"},
)
getPodFromAPIServeCallCount = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "get_pod_from_api_server_call_count",
Help: "The number of requests to get the pod directly from API Server",
},
)
getPodFromAPIServeErrCount = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "get_pod_from_api_server_err_count",
Help: "The number of requests that failed to get the pod directly from API Server",
},
)
)
type PodClientAPIWrapper interface {
GetPod(namespace string, name string) (*v1.Pod, error)
ListPods(nodeName string) (*v1.PodList, error)
AnnotatePod(podNamespace string, podName string, uid types.UID, key string, val string) error
GetPodFromAPIServer(ctx context.Context, namespace string, name string) (*v1.Pod, error)
GetRunningPodsOnNode(nodeName string) ([]v1.Pod, error)
}
type podClientAPIWrapper struct {
// All cache operations for Pod must be done using the data store.
// This data store is maintained using the custom controller
dataStore cache.Indexer
// !!WARNING!!
// Don't use this client for performing Read operation on Pods
client client.Client
// coreV1 for performing Read operations directly from API Server
coreV1 corev1.CoreV1Interface
}
func prometheusRegister() {
prometheusRegistered = true
metrics.Registry.MustRegister(
annotatePodRequestCallCount,
annotatePodRequestErrCount,
getPodFromAPIServeCallCount,
getPodFromAPIServeErrCount)
prometheusRegistered = true
}
func NewPodAPIWrapper(dataStore cache.Indexer, client client.Client,
coreV1 corev1.CoreV1Interface) PodClientAPIWrapper {
if !prometheusRegistered {
prometheusRegister()
}
return &podClientAPIWrapper{
dataStore: dataStore,
client: client,
coreV1: coreV1,
}
}
func (p *podClientAPIWrapper) GetRunningPodsOnNode(nodeName string) ([]v1.Pod, error) {
allPodList, err := p.ListPods(nodeName)
if err != nil {
return nil, err
}
var runningPods []v1.Pod
for _, pod := range allPodList.Items {
if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
// Since we only want the running Pods we will skip adding
// the failed/succeeded Pods to the returned list
continue
}
runningPods = append(runningPods, pod)
}
return runningPods, nil
}
// ListPods lists the pod for a given node name by querying the API server cache
func (p *podClientAPIWrapper) ListPods(nodeName string) (*v1.PodList, error) {
items, err := p.dataStore.ByIndex(NodeNameSpec, nodeName)
if err != nil {
return nil, err
}
podList := &v1.PodList{}
for _, item := range items {
podList.Items = append(podList.Items, *item.(*v1.Pod))
}
return podList, nil
}
// AnnotatePod annotates the pod with the provided key and value
func (p *podClientAPIWrapper) AnnotatePod(podNamespace string, podName string, uid types.UID,
key string, val string) error {
annotatePodRequestCallCount.WithLabelValues(key).Inc()
ctx := context.Background()
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// Get the latest copy of the pod from cache
var pod *v1.Pod
var err error
if pod, err = p.GetPod(podNamespace, podName); err != nil {
return err
}
// Prevent conditions where the Pod with same namespace/name is
// re-created before the older pod is annotated with the key-val
if pod.UID != uid {
return fmt.Errorf("not annotating the Pod with UID %s as the annotation was "+
"intended for Pod with UID %s", pod.UID, uid)
}
newPod := pod.DeepCopy()
newPod.Annotations[key] = val
return p.client.Patch(ctx, newPod, client.MergeFrom(pod))
})
if err != nil {
annotatePodRequestErrCount.WithLabelValues(key).Inc()
}
return err
}
// GetPod returns the pod object using the client cache
func (p *podClientAPIWrapper) GetPod(namespace string, name string) (*v1.Pod, error) {
nsName := types.NamespacedName{
Namespace: namespace,
Name: name,
}.String()
obj, exists, err := p.dataStore.GetByKey(nsName)
if err != nil {
return nil, err
}
if !exists {
return nil, apierrors.NewNotFound(schema.GroupResource{}, nsName)
}
return obj.(*v1.Pod), nil
}
// GetPodFromAPIServer returns the pod details by querying the API Server directly
func (p *podClientAPIWrapper) GetPodFromAPIServer(ctx context.Context, namespace string, name string) (*v1.Pod, error) {
getPodFromAPIServeCallCount.Inc()
pod, err := p.coreV1.Pods(namespace).Get(ctx, name, metav1.GetOptions{
TypeMeta: metav1.TypeMeta{},
ResourceVersion: "",
})
if err != nil {
getPodFromAPIServeErrCount.Inc()
}
return pod, err
}