pkg/controller/elasticsearch/certificates/transport/reconcile.go (195 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package transport
import (
"bytes"
"context"
"reflect"
"strings"
"time"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/annotation"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/certificates"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/maps"
)
// ReconcileTransportCertificatesSecrets reconciles the secret containing transport certificates for all nodes in the
// cluster.
// Secrets which are not used anymore are deleted as part of the downscale process.
func ReconcileTransportCertificatesSecrets(
ctx context.Context,
c k8s.Client,
ca *certificates.CA,
additionalCAs []byte,
es esv1.Elasticsearch,
rotationParams certificates.RotationParams,
) *reconciler.Results {
results := &reconciler.Results{}
// We must create transport certificates for the following StatefulSets:
// - the ones that still exist, even if they have been removed from the Spec
// - the ones that do not exist yet, but will be created in a later step of the reconciliation
actualStatefulSets, err := sset.RetrieveActualStatefulSets(c, k8s.ExtractNamespacedName(&es))
if err != nil {
return results.WithError(err)
}
ssets := actualStatefulSets.Names()
for _, nodeSet := range es.Spec.NodeSets {
ssets.Add(esv1.StatefulSet(es.Name, nodeSet.Name))
}
for ssetName := range ssets {
results.WithResults(reconcileNodeSetTransportCertificatesSecrets(ctx, c, ca, additionalCAs, es, ssetName, rotationParams))
}
return results
}
// DeleteStatefulSetTransportCertificate removes the Secret which contains the transport certificates of a given Statefulset.
func DeleteStatefulSetTransportCertificate(ctx context.Context, client k8s.Client, namespace string, ssetName string) error {
secret := corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: esv1.StatefulSetTransportCertificatesSecret(ssetName),
},
}
return client.Delete(ctx, &secret)
}
// DeleteLegacyTransportCertificate ensures that the former Secret which used to contain the transport certificates is deleted.
func DeleteLegacyTransportCertificate(ctx context.Context, client k8s.Client, es esv1.Elasticsearch) error {
nsn := types.NamespacedName{Namespace: es.Namespace, Name: esv1.LegacyTransportCertsSecretSuffix(es.Name)}
return k8s.DeleteSecretIfExists(ctx, client, nsn)
}
const disabledMarker = "transport.certs.disabled"
// reconcileNodeSetTransportCertificatesSecrets reconciles the secret which contains the transport certificates for
// a given StatefulSet.
func reconcileNodeSetTransportCertificatesSecrets(
ctx context.Context,
c k8s.Client,
ca *certificates.CA,
additionalCAs []byte,
es esv1.Elasticsearch,
ssetName string,
rotationParams certificates.RotationParams,
) *reconciler.Results {
results := &reconciler.Results{}
log := ulog.FromContext(ctx)
// List all the existing Pods in the nodeSet
var pods corev1.PodList
matchLabels := label.NewLabelSelectorForStatefulSetName(es.Name, ssetName)
ns := client.InNamespace(es.Namespace)
if err := c.List(ctx, &pods, matchLabels, ns); err != nil {
return results.WithError(errors.WithStack(err))
}
secret, err := ensureTransportCertificatesSecretExists(ctx, c, es, ssetName)
if err != nil {
return results.WithError(err)
}
// defensive copy of the current secret so we can check whether we need to update later on
currentTransportCertificatesSecret := secret.DeepCopy()
for _, pod := range pods.Items {
if pod.Status.PodIP == "" {
log.Info("Skipping pod because it has no IP yet", "namespace", pod.Namespace, "pod_name", pod.Name)
continue
}
if _, disabled := pod.Annotations[esv1.TransportCertDisabledAnnotationName]; disabled {
delete(secret.Data, PodCertFileName(pod.Name))
delete(secret.Data, PodKeyFileName(pod.Name))
continue
}
if err := ensureTransportCertificatesSecretContentsForPod(
ctx, es, secret, pod, ca, rotationParams,
); err != nil {
return results.WithError(err)
}
certCommonName := buildCertificateCommonName(pod, es)
cert := extractTransportCert(ctx, *secret, pod, certCommonName)
if cert == nil {
return results.WithError(errors.New("no certificate found for pod"))
}
// handle cert expiry via requeue
results.WithReconciliationState(
reconciler.
RequeueAfter(certificates.ShouldRotateIn(time.Now(), cert.NotAfter, rotationParams.RotateBefore)).
ReconciliationComplete(),
)
}
// remove certificates and keys for deleted pods
podsByName := k8s.PodsByName(pods.Items)
keysToPrune := make([]string, 0)
for secretDataKey := range secret.Data {
if secretDataKey == certificates.CAFileName {
// never remove the CA file
continue
}
// get the pod name from the secret key name (the first segment before the ".")
podNameForKey := strings.SplitN(secretDataKey, ".", 2)[0]
if _, ok := podsByName[podNameForKey]; !ok {
// pod no longer exists, so the element is safe to delete.
keysToPrune = append(keysToPrune, secretDataKey)
}
}
if len(keysToPrune) > 0 {
log.Info("Pruning keys from certificates secret", "namespace", es.Namespace, "secret_name", secret.Name, "keys", keysToPrune)
for _, keyToRemove := range keysToPrune {
delete(secret.Data, keyToRemove)
}
}
if es.Spec.Transport.TLS.SelfSignedEnabled() {
delete(secret.Data, disabledMarker)
} else {
// add a marker but leave all the old certs that might exist in the secret in place to ease the transition
// to the disabled state.
secret.Data[disabledMarker] = []byte("true") // contents is irrelevant
}
mayBeUpdateCAFile(secret, ca, additionalCAs)
if !reflect.DeepEqual(secret, currentTransportCertificatesSecret) {
if err := c.Update(ctx, secret); err != nil {
return results.WithError(err)
}
for _, pod := range pods.Items {
annotation.MarkPodAsUpdated(ctx, c, pod)
}
}
return results
}
func mayBeUpdateCAFile(secret *corev1.Secret, ca *certificates.CA, additionalCAs []byte) {
var cas [][]byte
// if the secret contains only the marker file (and maybe an old CA) transport certs are disabled
// and no pod uses them anymore => we don't need the CA
_, transportCertsDisabled := secret.Data[disabledMarker]
secretContainsMarkerAndCAFile := len(secret.Data) <= 2 && transportCertsDisabled
if !secretContainsMarkerAndCAFile {
cas = append(cas, certificates.EncodePEMCert(ca.Cert.Raw))
}
cas = append(cas, additionalCAs)
caBytes := bytes.Join(cas, nil)
if len(caBytes) == 0 {
// no CAs delete and return
delete(secret.Data, certificates.CAFileName)
return
}
// compare with current trusted CA certs.
if !bytes.Equal(caBytes, secret.Data[certificates.CAFileName]) {
secret.Data[certificates.CAFileName] = caBytes
}
}
// ensureTransportCertificatesSecretExists ensures the existence and labels of the Secret that at a later point
// in time will contain the transport certificates for a nodeSet.
func ensureTransportCertificatesSecretExists(
ctx context.Context,
c k8s.Client,
es esv1.Elasticsearch,
ssetName string,
) (*corev1.Secret, error) {
expected := corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: es.Namespace,
Name: esv1.StatefulSetTransportCertificatesSecret(ssetName),
Labels: map[string]string{
// a label showing which es these certificates belongs to
label.ClusterNameLabelName: es.Name,
// label indicating to which StatefulSet these certificates belong
label.StatefulSetNameLabelName: ssetName,
},
},
}
// reconcile the secret resource:
// - create it if it doesn't exist
// - update labels & annotations if they don't match
// - do not touch the existing data as it probably already contains certificates - it will be reconciled later on
var reconciled corev1.Secret
if err := reconciler.ReconcileResource(reconciler.Params{
Context: ctx,
Client: c,
Owner: &es,
Expected: &expected,
Reconciled: &reconciled,
NeedsUpdate: func() bool {
return !maps.IsSubset(expected.Labels, reconciled.Labels) ||
!maps.IsSubset(expected.Annotations, reconciled.Annotations)
},
UpdateReconciled: func() {
reconciled.Labels = maps.Merge(reconciled.Labels, expected.Labels)
reconciled.Annotations = maps.Merge(reconciled.Annotations, expected.Annotations)
},
}); err != nil {
return nil, err
}
// a placeholder secret may have nil entries, create them if needed
if reconciled.Data == nil {
reconciled.Data = make(map[string][]byte)
}
return &reconciled, nil
}