pkg/controller/common/statefulset/pod.go (69 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package statefulset import ( "context" "fmt" "strings" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s" ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/stringsutil" ) // PodName returns the name of the pod with the given ordinal for this StatefulSet. func PodName(ssetName string, ordinal int32) string { return fmt.Sprintf("%s-%d", ssetName, ordinal) } // PodNames returns the names of the pods for this StatefulSet, according to the number of replicas. func PodNames(sset appsv1.StatefulSet) []string { names := make([]string, 0, GetReplicas(sset)) for i := int32(0); i < GetReplicas(sset); i++ { names = append(names, PodName(sset.Name, i)) } return names } // PodRevision returns the StatefulSet revision from this pod labels. func PodRevision(pod corev1.Pod) string { return pod.Labels[appsv1.StatefulSetRevisionLabel] } func GetActualPodsForStatefulSet(c k8s.Client, sset types.NamespacedName, labelName string) ([]corev1.Pod, error) { var pods corev1.PodList ns := client.InNamespace(sset.Namespace) matchLabels := client.MatchingLabels(map[string]string{ labelName: sset.Name, }) if err := c.List(context.Background(), &pods, matchLabels, ns); err != nil { return nil, err } return pods.Items, nil } // PodReconciliationDone returns true if actual existing pods match what is specified in the StatefulSetList. // It may return false if there are pods in the process of being: // - created (but not there in our resources cache) // - removed (but still there in our resources cache) // Status of the pods (running, error, etc.) is ignored. func PodReconciliationDone(ctx context.Context, c k8s.Client, statefulSet appsv1.StatefulSet, labelName string) (bool, string, error) { pendingCreations, pendingDeletions, err := PendingPodsForStatefulSet(c, statefulSet, labelName) if err != nil { return false, "", err } if len(pendingCreations) > 0 || len(pendingDeletions) > 0 { ulog.FromContext(ctx).V(1).Info( "Some pods still need to be created/deleted", "namespace", statefulSet.Namespace, "statefulset_name", statefulSet.Name, "pending_creations", pendingCreations, "pending_deletions", pendingDeletions, ) var reason strings.Builder if len(pendingCreations) > 0 { reason.WriteString(fmt.Sprintf(", creations: %s", pendingCreations)) } if len(pendingDeletions) > 0 { reason.WriteString(fmt.Sprintf(", deletions: %s", pendingDeletions)) } return false, reason.String(), nil } return true, "", nil } func PendingPodsForStatefulSet(c k8s.Client, statefulSet appsv1.StatefulSet, labelName string) ([]string, []string, error) { // check all expected pods are there: no more, no less actualPods, err := GetActualPodsForStatefulSet(c, k8s.ExtractNamespacedName(&statefulSet), labelName) if err != nil { return nil, nil, err } actualPodNames := k8s.PodNames(actualPods) expectedPodNames := PodNames(statefulSet) pendingCreations, pendingDeletions := stringsutil.Difference(expectedPodNames, actualPodNames) return pendingCreations, pendingDeletions, nil }