pkg/utils/k8s/k8sutils.go (226 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package k8s
import (
"context"
"fmt"
"net"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
netutil "github.com/elastic/cloud-on-k8s/v3/pkg/utils/net"
)
// DeepCopyObject creates a deep copy of a client.Object.
// This is to get around the limitation of the DeepCopyObject method which returns a runtime.Object.
func DeepCopyObject(obj client.Object) client.Object {
if obj == nil {
return nil
}
if newObj := obj.DeepCopyObject(); newObj != nil {
return newObj.(client.Object) //nolint:forcetypeassert
}
return nil
}
// ToObjectMeta returns an ObjectMeta based on the given NamespacedName.
func ToObjectMeta(namespacedName types.NamespacedName) metav1.ObjectMeta {
return metav1.ObjectMeta{
Namespace: namespacedName.Namespace,
Name: namespacedName.Name,
}
}
// ExtractNamespacedName returns an NamespacedName based on the given Object.
func ExtractNamespacedName(object metav1.Object) types.NamespacedName {
return types.NamespacedName{
Namespace: object.GetNamespace(),
Name: object.GetName(),
}
}
// ObjectExists returns true if the object pointed by ref exists.
// typedReceiver acts as a generic object but must be of the desired object underlying type.
func ObjectExists(c Client, ref types.NamespacedName, typedReceiver client.Object) (bool, error) {
err := c.Get(context.Background(), ref, typedReceiver)
if apierrors.IsNotFound(err) {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}
// IsPodReady checks if both conditions ContainersReady and PodReady of a Pod are true.
func IsPodReady(pod corev1.Pod) bool {
conditionsTrue := 0
for _, cond := range pod.Status.Conditions {
if cond.Status == corev1.ConditionTrue && (cond.Type == corev1.ContainersReady || cond.Type == corev1.PodReady) {
conditionsTrue++
}
}
return conditionsTrue == 2
}
// IsPodRunning returns true if the Pod is in phase running and not terminating.
func IsPodRunning(pod corev1.Pod) bool {
return pod.DeletionTimestamp.IsZero() && pod.Status.Phase == corev1.PodRunning
}
// TerminatingPods filters pods for Pods that are in the process of (graceful) termination.
func TerminatingPods(pods []corev1.Pod) []corev1.Pod {
var terminating []corev1.Pod //nolint:prealloc
for _, p := range pods {
if p.DeletionTimestamp.IsZero() {
continue
}
terminating = append(terminating, p)
}
return terminating
}
// RunningPods filters pods for Pods that are running (and not terminating).
func RunningPods(pods []corev1.Pod) []corev1.Pod {
var running []corev1.Pod
for _, p := range pods {
if IsPodRunning(p) {
running = append(running, p)
}
}
return running
}
// PodsByName returns a map of pod names to pods
func PodsByName(pods []corev1.Pod) map[string]corev1.Pod {
podMap := make(map[string]corev1.Pod, len(pods))
for _, pod := range pods {
podMap[pod.Name] = pod
}
return podMap
}
// PodNames returns the names of the given pods.
func PodNames(pods []corev1.Pod) []string {
names := make([]string, 0, len(pods))
for _, pod := range pods {
names = append(names, pod.Name)
}
return names
}
// GetServiceDNSName returns the fully qualified DNS name for a service along with any external names provided by ingresses.
func GetServiceDNSName(svc corev1.Service) []string {
names := []string{
fmt.Sprintf("%s.%s.svc", svc.Name, svc.Namespace),
fmt.Sprintf("%s.%s", svc.Name, svc.Namespace),
}
if svc.Spec.Type == corev1.ServiceTypeLoadBalancer {
for _, ingress := range svc.Status.LoadBalancer.Ingress {
if ingress.Hostname != "" {
names = append(names, ingress.Hostname)
}
}
}
return names
}
func GetServiceIPAddresses(svc corev1.Service) []net.IP {
var ipAddrs []net.IP
if len(svc.Spec.ExternalIPs) > 0 {
ipAddrs = make([]net.IP, len(svc.Spec.ExternalIPs))
for i, externalIP := range svc.Spec.ExternalIPs {
ipAddrs[i] = netutil.IPToRFCForm(net.ParseIP(externalIP))
}
}
if svc.Spec.Type == corev1.ServiceTypeLoadBalancer {
for _, ingress := range svc.Status.LoadBalancer.Ingress {
if ingress.IP != "" {
ipAddrs = append(ipAddrs, netutil.IPToRFCForm(net.ParseIP(ingress.IP))) //nolint:makezero
}
}
}
return ipAddrs
}
// MaybeEmitErrorEvent emits an event if the error is report-worthy
func MaybeEmitErrorEvent(r record.EventRecorder, err error, obj runtime.Object, reason, message string, args ...interface{}) {
// ignore nil errors and conflict issues
if err == nil || apierrors.IsConflict(err) {
return
}
r.Eventf(obj, corev1.EventTypeWarning, reason, message, args...)
}
// GetSecretEntry returns the value of the secret data for the given key, or nil.
func GetSecretEntry(secret corev1.Secret, key string) []byte {
if secret.Data == nil {
return nil
}
content, exists := secret.Data[key]
if !exists {
return nil
}
return content
}
// GetSecretEntriesCount returns the number of matching keys found in secret.
func GetSecretEntriesCount(secret corev1.Secret, keys ...string) int {
if secret.Data == nil {
return 0
}
var hits int
for _, k := range keys {
if _, exists := secret.Data[k]; exists {
hits++
}
}
return hits
}
// DeleteSecretMatching deletes the Secret matching the provided selectors.
func DeleteSecretMatching(ctx context.Context, c Client, opts ...client.ListOption) error {
var secrets corev1.SecretList
if err := c.List(ctx, &secrets, opts...); err != nil {
return err
}
for _, s := range secrets.Items {
secret := s
if err := c.Delete(ctx, &secret); err != nil && !apierrors.IsNotFound(err) {
return err
}
}
return nil
}
// DeleteSecretIfExists deletes the secret identified by key if exists.
func DeleteSecretIfExists(ctx context.Context, c Client, key types.NamespacedName) error {
var secret corev1.Secret
err := c.Get(ctx, key, &secret)
if err != nil && apierrors.IsNotFound(err) {
return nil
} else if err != nil {
return err
}
err = c.Delete(ctx, &secret)
if err != nil && apierrors.IsNotFound(err) {
return nil
}
return err
}
// DeleteResourceIfExists deletes the provided resource if exists.
func DeleteResourceIfExists(ctx context.Context, c Client, obj client.Object) error {
key := types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
}
if err := c.Get(ctx, key, obj); err != nil {
if apierrors.IsNotFound(err) {
// Resource does not exist.
return nil
}
return err
}
if err := c.Delete(ctx, obj); err != nil && !apierrors.IsNotFound(err) {
return err
}
return nil
}
// PodsMatchingLabels returns Pods from the given namespace matching the given labels.
func PodsMatchingLabels(c Client, namespace string, labels map[string]string) ([]corev1.Pod, error) {
var pods corev1.PodList
if err := c.List(context.Background(), &pods, client.InNamespace(namespace), client.MatchingLabels(labels)); err != nil {
return nil, err
}
return pods.Items, nil
}
func indexOfCtrlRef(owners []metav1.OwnerReference) int {
for index, r := range owners {
if r.Controller != nil && *r.Controller {
return index
}
}
return -1
}
type StorageComparison struct {
Increase bool
Decrease bool
}
// CompareStorageRequests compares storage requests in the given resource requirements.
// It returns a zero-ed StorageComparison in case one of the requests is zero (value not set: comparison not possible).
func CompareStorageRequests(initial corev1.VolumeResourceRequirements, updated corev1.VolumeResourceRequirements) StorageComparison {
initialSize := initial.Requests.Storage()
updatedSize := updated.Requests.Storage()
if initialSize.IsZero() || updatedSize.IsZero() {
return StorageComparison{}
}
switch updatedSize.Cmp(*initialSize) {
case -1: // decrease
return StorageComparison{Decrease: true}
case 1: // increase
return StorageComparison{Increase: true}
default: // same size
return StorageComparison{}
}
}