pkg/controller/common/service_control.go (122 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package common
import (
"context"
"reflect"
"go.elastic.co/apm/v2"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/tracing"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/compare"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/maps"
)
func ReconcileService(
ctx context.Context,
c k8s.Client,
expected *corev1.Service,
owner client.Object,
) (*corev1.Service, error) {
span, _ := apm.StartSpan(ctx, "reconcile_service", tracing.SpanTypeApp)
defer span.End()
reconciled := &corev1.Service{}
err := reconciler.ReconcileResource(reconciler.Params{
Context: ctx,
Client: c,
Owner: owner,
Expected: expected,
Reconciled: reconciled,
NeedsRecreate: func() bool {
return needsRecreate(expected, reconciled)
},
NeedsUpdate: func() bool {
return needsUpdate(expected, reconciled)
},
UpdateReconciled: func() {
reconciled.Annotations = expected.Annotations
reconciled.Labels = expected.Labels
reconciled.Spec = expected.Spec
},
})
return reconciled, err
}
func needsRecreate(expected, reconciled *corev1.Service) bool {
applyServerSideValues(expected, reconciled)
// IPFamilies is immutable
if expected.Spec.IPFamilies != nil {
if len(expected.Spec.IPFamilies) != len(reconciled.Spec.IPFamilies) {
return true
}
for i := 0; i < len(expected.Spec.IPFamilies); i++ {
if expected.Spec.IPFamilies[i] != reconciled.Spec.IPFamilies[i] {
return true
}
}
}
// ClusterIP is immutable
if expected.Spec.ClusterIP != reconciled.Spec.ClusterIP {
return true
}
// LoadBalancerClass is immutable once set if target type is load balancer
if expected.Spec.Type == corev1.ServiceTypeLoadBalancer && !reflect.DeepEqual(expected.Spec.LoadBalancerClass, reconciled.Spec.LoadBalancerClass) {
return true
}
return false
}
func needsUpdate(expected *corev1.Service, reconciled *corev1.Service) bool {
applyServerSideValues(expected, reconciled)
// if the specs, labels, or annotations differ, the object should be updated
return !(reflect.DeepEqual(expected.Spec, reconciled.Spec) &&
compare.LabelsAndAnnotationsAreEqual(expected.ObjectMeta, reconciled.ObjectMeta))
}
// applyServerSideValues applies any default that may have been set from the reconciled version.
func applyServerSideValues(expected, reconciled *corev1.Service) {
// skip if the service type changes from something different to the default ClusterIP value
if reconciled.Spec.Type != corev1.ServiceTypeClusterIP && expected.Spec.Type != reconciled.Spec.Type {
return
}
// Type may be defaulted by the api server
if expected.Spec.Type == "" {
expected.Spec.Type = reconciled.Spec.Type
}
// ClusterIPs might not exist in the expected service,
// but might have been set after creation by k8s on the actual resource.
// In such case, we want to use these values for comparison.
// But only if we are not changing the type of service and the api server has assigned an IP
if expected.Spec.Type == reconciled.Spec.Type {
if expected.Spec.ClusterIP == "" {
expected.Spec.ClusterIP = reconciled.Spec.ClusterIP
}
if len(expected.Spec.ClusterIPs) == 0 {
expected.Spec.ClusterIPs = reconciled.Spec.ClusterIPs
}
}
// SessionAffinity may be defaulted by the api server
if expected.Spec.SessionAffinity == "" {
expected.Spec.SessionAffinity = reconciled.Spec.SessionAffinity
}
// same for the target port and node port
if len(expected.Spec.Ports) == len(reconciled.Spec.Ports) {
for i := range expected.Spec.Ports {
if expected.Spec.Ports[i].TargetPort.IntValue() == 0 {
expected.Spec.Ports[i].TargetPort = reconciled.Spec.Ports[i].TargetPort
}
// check if NodePort makes sense for this service type
if hasNodePort(expected.Spec.Type) && expected.Spec.Ports[i].NodePort == 0 {
expected.Spec.Ports[i].NodePort = reconciled.Spec.Ports[i].NodePort
}
}
}
if expected.Spec.HealthCheckNodePort == 0 {
expected.Spec.HealthCheckNodePort = reconciled.Spec.HealthCheckNodePort
}
expected.Annotations = maps.MergePreservingExistingKeys(expected.Annotations, reconciled.Annotations)
expected.Labels = maps.MergePreservingExistingKeys(expected.Labels, reconciled.Labels)
// IPFamily is immutable and cannot be modified so we should retain the existing value from the server if there's no explicit override.
if expected.Spec.IPFamilies == nil {
expected.Spec.IPFamilies = reconciled.Spec.IPFamilies
}
// IPFamilyPolicy is immutable and cannot be modified so we should retain the existing value from the server if there's no explicit override.
if expected.Spec.IPFamilyPolicy == nil {
expected.Spec.IPFamilyPolicy = reconciled.Spec.IPFamilyPolicy
}
// InternalTrafficPolicy may be defaulted by the api server starting K8S v1.22
if expected.Spec.InternalTrafficPolicy == nil {
expected.Spec.InternalTrafficPolicy = reconciled.Spec.InternalTrafficPolicy
}
if expected.Spec.ExternalTrafficPolicy == "" {
expected.Spec.ExternalTrafficPolicy = reconciled.Spec.ExternalTrafficPolicy
}
// LoadBalancerClass may be defaulted by the API server starting K8s v.1.24
if expected.Spec.Type == corev1.ServiceTypeLoadBalancer && expected.Spec.LoadBalancerClass == nil {
expected.Spec.LoadBalancerClass = reconciled.Spec.LoadBalancerClass
}
if expected.Spec.AllocateLoadBalancerNodePorts == nil {
expected.Spec.AllocateLoadBalancerNodePorts = reconciled.Spec.AllocateLoadBalancerNodePorts
}
}
// hasNodePort returns for a given service type, if the service ports have a NodePort or not.
func hasNodePort(svcType corev1.ServiceType) bool {
return svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer
}