pkg/controller/elasticsearch/nodespec/statefulset.go (127 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package nodespec
import (
"context"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/defaults"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/hash"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/keystore"
sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/statefulset"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/network"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/services"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/settings"
es_sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset"
esvolume "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/volume"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
)
// HeadlessServiceName returns the name of the headless service for the given StatefulSet.
func HeadlessServiceName(ssetName string) string {
// just use the sset name
return ssetName
}
// HeadlessService returns a headless service for the given StatefulSet
func HeadlessService(es *esv1.Elasticsearch, ssetName string) corev1.Service {
nsn := k8s.ExtractNamespacedName(es)
ports := []corev1.ServicePort{
{
Name: es.Spec.HTTP.Protocol(),
Protocol: corev1.ProtocolTCP,
Port: network.HTTPPort,
},
}
if es.Spec.RemoteClusterServer.Enabled {
ports = append(ports,
corev1.ServicePort{
Name: services.RemoteClusterServicePortName,
Protocol: corev1.ProtocolTCP,
Port: network.RemoteClusterPort,
},
)
}
return corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: nsn.Namespace,
Name: HeadlessServiceName(ssetName),
Labels: label.NewStatefulSetLabels(nsn, ssetName),
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
ClusterIP: corev1.ClusterIPNone,
Selector: label.NewStatefulSetLabels(nsn, ssetName),
Ports: ports,
// allow nodes to discover themselves via DNS while they are booting up ie. are not ready yet
PublishNotReadyAddresses: true,
},
}
}
func BuildStatefulSet(
ctx context.Context,
client k8s.Client,
es esv1.Elasticsearch,
nodeSet esv1.NodeSet,
cfg settings.CanonicalConfig,
keystoreResources *keystore.Resources,
existingStatefulSets es_sset.StatefulSetList,
setDefaultSecurityContext bool,
policyConfig PolicyConfig,
) (appsv1.StatefulSet, error) {
statefulSetName := esv1.StatefulSet(es.Name, nodeSet.Name)
// ssetSelector is used to match the sset pods
ssetSelector := label.NewStatefulSetLabels(k8s.ExtractNamespacedName(&es), statefulSetName)
// add default PVCs to the node spec only if no user defined PVCs exist
nodeSet.VolumeClaimTemplates = defaults.AppendDefaultPVCs(
nodeSet.VolumeClaimTemplates,
nodeSet.PodTemplate.Spec,
esvolume.DefaultVolumeClaimTemplates...,
)
// build pod template
podTemplate, err := BuildPodTemplateSpec(ctx, client, es, nodeSet, cfg, keystoreResources, setDefaultSecurityContext, policyConfig)
if err != nil {
return appsv1.StatefulSet{}, err
}
// build sset labels on top of the selector
// TODO: inherit user-provided labels and annotations from the CRD?
ssetLabels := make(map[string]string)
for k, v := range ssetSelector {
ssetLabels[k] = v
}
// maybe inherit volumeClaimTemplates ownerRefs from the existing StatefulSet
var existingClaims []corev1.PersistentVolumeClaim
if existingSset, exists := existingStatefulSets.GetByName(statefulSetName); exists {
existingClaims = existingSset.Spec.VolumeClaimTemplates
}
claims := preserveExistingVolumeClaimsOwnerRefs(nodeSet.VolumeClaimTemplates, existingClaims)
sset := appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: es.Namespace,
Name: statefulSetName,
Labels: ssetLabels,
},
Spec: appsv1.StatefulSetSpec{
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
Type: appsv1.OnDeleteStatefulSetStrategyType,
},
// we don't care much about pods creation ordering, and manage deletion ordering ourselves,
// so we're fine with the StatefulSet controller spawning all pods in parallel
PodManagementPolicy: appsv1.ParallelPodManagement,
RevisionHistoryLimit: es.Spec.RevisionHistoryLimit,
// build a headless service per StatefulSet, matching the StatefulSet labels
ServiceName: HeadlessServiceName(statefulSetName),
Selector: &metav1.LabelSelector{
MatchLabels: ssetSelector,
},
Replicas: &nodeSet.Count,
VolumeClaimTemplates: claims,
Template: podTemplate,
},
}
// store a hash of the sset resource in its labels for comparison purposes
sset.Labels = hash.SetTemplateHashLabel(sset.Labels, sset.Spec)
return sset, nil
}
func preserveExistingVolumeClaimsOwnerRefs(
persistentVolumeClaims []corev1.PersistentVolumeClaim,
existingClaims []corev1.PersistentVolumeClaim,
) []corev1.PersistentVolumeClaim {
// before https://github.com/elastic/cloud-on-k8s/pull/4050, we used to set an ownerRef into all claims
// now keep existing ownerReferences for backwards compatibility but don't add new ones
claims := make([]corev1.PersistentVolumeClaim, 0, len(persistentVolumeClaims))
for _, claim := range persistentVolumeClaims {
if existingClaim := sset.GetClaim(existingClaims, claim.Name); existingClaim != nil {
// This claim already exists in the actual resource. Since the volumeClaimTemplates section of
// a StatefulSet is immutable, any modification to it will be rejected in the StatefulSet update.
// This is fine and we let it error-out. It is caught in a user-friendly way by the validating webhook.
//
// However, there is one case where the claim we build may differ from the existing one, that was
// built with a prior version of the operator. If the Elasticsearch apiVersion has changed,
// from eg. `v1beta1` to `v1`, we want to keep the existing ownerRef (pointing to eg. a `v1beta1` owner).
// Having ownerReferences with a "deprecated" apiVersion is fine, and does not prevent resources
// from being garbage collected as expected.
claim.OwnerReferences = existingClaim.OwnerReferences
}
claims = append(claims, claim)
}
return claims
}
// UpdateReplicas updates the given StatefulSet with the given replicas,
// and modifies the template hash label accordingly.
func UpdateReplicas(statefulSet *appsv1.StatefulSet, replicas *int32) {
statefulSet.Spec.Replicas = replicas
statefulSet.Labels = hash.SetTemplateHashLabel(statefulSet.Labels, statefulSet.Spec)
}